Дакле, тако, други део. Као што је раније написано, у њему ћемо урадити следеће:
Хајде да напишемо мали клијент за алпхавантаге на аиохттп са захтевима за крајње тачке које су нам потребне.
Хајде да направимо агента који ће прикупљати податке о хартијама од вредности и мета информације о њима.
Али, то је оно што ћемо урадити за сам пројекат, а у смислу фауст истраживања, научићемо како да напишемо агенте који обрађују стреам догађаје из кафке, као и како да напишемо команде (клик омотач), у нашем случају - за ручне пусх поруке на тему коју агент надгледа.
Обука
АлпхаВантаге Цлиент
Прво, хајде да напишемо мали аиохттп клијент за захтеве за алпхавантаге.
АлпхаВантаге АПИ је прилично једноставно и лепо дизајниран, па сам одлучио да све захтеве упутим путем методе construct_query где заузврат постоји хттп позив.
Доносим сва поља snake_case ради лакшег.
Па, декорација логгер.цатцх за леп и информативан излаз за праћење.
ПС Не заборавите да додате алпхавантаге токен локално у цонфиг.имл, или извезите променљиву окружења HORTON_SERVICE_APIKEY. Добијамо жетон овде.
ЦРУД класа
Имаћемо колекцију хартија од вредности за чување мета информација о хартијама од вредности.
За сада ћемо имати најједноставнију израду апликације, мало касније ћемо је проширити, међутим, да не бисмо чекали, ево референце у Апп-класу. Такође препоручујем да погледате класу подешавања, пошто је она одговорна за већину подешавања.
Главни део
Агент за прикупљање и вођење листе хартија од вредности
Дакле, прво добијамо објекат Фауст апликације - прилично је једноставно. Затим експлицитно декларишемо тему за нашег агента... Овде је вредно поменути шта је то, шта је интерни параметар и како се то може другачије уредити.
Теме у кафки, ако желимо да знамо тачну дефиницију, боље је прочитати ван. документ, или можете читати компендијум на Хабреу на руском, где се све такође прилично тачно одражава :)
Параметар интерни, који је прилично добро описан у фауст документу, омогућава нам да конфигуришемо тему директно у коду, наравно, то подразумева параметре које су обезбедили Фауст програмери, на пример: задржавање, политика задржавања (подразумевано брисање, али можете подесити компактан), број партиција по теми (резултатиучинити, на пример, мање од глобалног значаја апликације Фауст).
Генерално, агент може да креира управљану тему са глобалним вредностима, међутим, ја волим да све експлицитно изјавим. Поред тога, неки параметри (на пример, број партиција или политика задржавања) теме у огласу агента не могу да се конфигуришу.
Ево како би то могло да изгледа без ручног дефинисања теме:
Па, хајде да сада опишемо шта ће наш агент урадити :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Дакле, на почетку агента отварамо аиохттп сесију за захтеве преко нашег клијента. Дакле, при покретању радника, када се наш агент покрене, одмах ће се отворити сесија - једна, за све време рада радника (или неколико, ако промените параметар подударност од агента са подразумеваном јединицом).
Затим пратимо ток (стављамо поруку у _, пошто нас, у овом агенту, није брига за садржај) порука из наше теме, ако постоје на тренутном офсету, иначе ће наш циклус чекати њихов долазак. Па, унутар наше петље евидентирамо пријем поруке, добијамо листу активних (гет_сецурити враћа само активну подразумевано, види клијентски код) хартија од вредности и чувамо је у бази података, проверавамо да ли постоји хартија са истим тикером и размена у бази података, ако постоји, онда ће се она (папир) једноставно ажурирати.
Хајде да покренемо нашу креацију!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
ПС Феатурес веб компонента Фауста нећу разматрати у чланцима, па смо поставили одговарајућу заставу.
У нашој команди за покретање, рекли смо Фаусту где да тражи објекат апликације и шта да уради са њим (покрене радника) са нивоом излаза инфо дневника. Добијамо следећи излаз:
Хајде да погледамо скуп партиција. Као што видимо, креирана је тема са именом које смо навели у коду, подразумеваним бројем партиција (8, преузето из топиц_партитионс - параметар објекта апликације), пошто нисмо навели појединачну вредност за нашу тему (преко партиција). Покренутом агенту у воркер-у је додељено свих 8 партиција, пошто је једини, али ће о томе бити детаљније у делу о кластеровању.
Па, сада можемо отићи до другог прозора терминала и послати празну поруку нашој теми:
ПС користећи @ показујемо да шаљемо поруку теми под називом „цоллецт_сецуритиес“.
У овом случају, порука је отишла на партицију 6 - ово можете проверити тако што ћете отићи на кафдроп он localhost:9000
Одласком до прозора терминала са нашим радником, видећемо срећну поруку послату помоћу логуру:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Такође можемо да погледамо монго (користећи Робо3Т или Студио3Т) и видимо да су хартије од вредности у бази података:
Нисам милијардер и зато смо задовољни првом опцијом гледања.
Срећа и радост - први агент је спреман :)
Агент спреман, живео нови агент!
Да, господо, прешли смо само 1/3 пута припремљеног овим чланком, али немојте се обесхрабрити, јер ће сада бити лакше.
Дакле, сада нам је потребан агент који прикупља мета информације и ставља их у документ за прикупљање:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Пошто ће овај агент обрадити информације о одређеној безбедности, потребно је да у поруци наведемо ознаку (симбол) овог обезбеђења. За ову сврху у Фаусту постоје Плоче — класе које декларишу шему порука у теми агента.
У овом случају, идемо на рецордс.пии опишите како би порука за ову тему требало да изгледа:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Као што сте можда претпоставили, Фауст користи напомену типа питхон да опише шему поруке, због чега је минимална верзија коју библиотека подржава 3.6.
Као што видите, методу иницијализације теме преносимо нови параметар са шемом - валуе_типе. Даље, све иде по истој шеми, тако да не видим смисла да се задржавам на било чему другом.
Па, последњи додир је додавање позива агенту за прикупљање мета информација за цоллецт_сецуриттес:
Користимо претходно најављену шему за поруку. У овом случају користио сам метод .цаст јер не треба да чекамо резултат од агента, али вреди напоменути да начине пошаљите поруку на тему:
цаст - не блокира јер не очекује резултат. Не можете послати резултат на другу тему као поруку.
послати - не блокира јер не очекује резултат. Можете одредити агента у теми на коју ће ићи резултат.
питати - чека резултат. Можете одредити агента у теми на коју ће ићи резултат.
Дакле, то је све са агентима за данас!
Тим снова
Последње што сам обећао да ћу написати у овом делу су команде. Као што је раније поменуто, команде у Фаусту су омотач око клика. У ствари, фауст једноставно повезује нашу прилагођену команду свом интерфејсу када наведе -А кључ
Након што су најављени агенти у агенти.пи додајте функцију са декоратером апп.цоммандпозивање методе бацити у цоллецт_сецуритес:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Дакле, ако позовемо листу команди, наша нова команда ће бити у њој:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Можемо да га користимо као и било ко други, па хајде да поново покренемо фауст радника и започнемо пуну колекцију хартија од вредности:
> faust -A horton.agents start-collect-securities
Шта ће се даље дешавати?
У наредном делу, користећи преостале агенте као пример, размотрићемо механизам понора за тражење екстрема у ценама затварања трговања за годину и црон лансирање агената.
ПС У последњем делу су ме питали о Фаусту и конфлуентној кафки (које карактеристике има конфлуент?). Чини се да је конфлуент функционалнији на много начина, али чињеница је да Фауст нема потпуну клијентску подршку за конфлуент – ово следи из описи ограничења клијената у док.