Takže druhá časť. Ako bolo napísané vyššie, v ňom urobíme nasledovné:
Napíšme malého klienta pre alphavantage na aiohttp s požiadavkami na koncové body, ktoré potrebujeme.
Vytvorme agenta, ktorý bude zbierať údaje o cenných papieroch a metainformácie o nich.
Ale to je to, čo urobíme pre samotný projekt a v rámci faustovho výskumu sa naučíme písať agentov, ktorí spracovávajú streamované udalosti z kafka, ako aj písať príkazy (click wrapper), v našom prípade - pre manuálne push správy na tému, ktorú agent monitoruje.
Tréning
Klient AlphaVantage
Najprv napíšme malého klienta aiohttp pre požiadavky na alphavantage.
AlphaVantage API je celkom jednoducho a krásne navrhnuté, takže som sa rozhodol všetky požiadavky vykonať prostredníctvom tejto metódy construct_query kde zasa prebieha volanie http.
Prinášam všetky polia snake_case pre pohodlie.
Dekorácia logger.catch pre krásny a informatívny výstup sledovania.
PS Nezabudnite pridať token alphavantage lokálne do config.yml alebo exportovať premennú prostredia HORTON_SERVICE_APIKEY. Dostávame token tu.
trieda CRUD
Budeme mať zbierku cenných papierov na ukladanie meta informácií o cenných papieroch.
Zatiaľ budeme mať najjednoduchšiu tvorbu aplikácie, o niečo neskôr ju rozšírime, aby ste však nenechali čakať, tu referencie do triedy aplikácií. Odporúčam vám tiež pozrieť sa na triedu nastavení, pretože je zodpovedná za väčšinu nastavení.
Najprv teda získame objekt aplikácie faust - je to celkom jednoduché. Ďalej explicitne deklarujeme tému pre nášho agenta... Tu stojí za zmienku, čo to je, aký je interný parameter a ako sa to dá zariadiť inak.
Témy v kafke, ak chceme poznať presnú definíciu, je lepšie si ju prečítať vypnuté. dokument, alebo si môžete prečítať abstraktné na Habré v ruštine, kde je všetko aj celkom presne odzrkadlené :)
Interný parameter, celkom dobre popísaný vo faust doc, nám umožňuje nakonfigurovať tému priamo v kóde, samozrejme to znamená parametre poskytnuté vývojármi faust, napr.: retencia, retenčná politika (štandardne mazať, ale môžete kompaktné), počet oddielov na tému (skórerobiť napríklad menej ako celosvetový význam aplikácie faust).
Vo všeobecnosti môže agent vytvoriť spravovanú tému s globálnymi hodnotami, rád však všetko deklarujem explicitne. Okrem toho niektoré parametre (napríklad počet oddielov alebo politika uchovávania) témy v reklame agenta nemožno nakonfigurovať.
Takto by to mohlo vyzerať bez manuálneho definovania témy:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Takže na začiatku agenta otvoríme reláciu aiohttp pre požiadavky prostredníctvom nášho klienta. Pri spustení pracovníka sa teda pri spustení nášho agenta okamžite otvorí relácia – jedna, po celú dobu spustenia pracovníka (alebo niekoľko, ak zmeníte parameter súbežnosť od agenta s predvolenou jednotkou).
Ďalej sledujeme prúd (správu umiestnime do _, keďže nám v tomto agentovi nezáleží na obsahu) správ z našej témy, ak existujú pri aktuálnom posune, inak náš cyklus počká na ich príchod. Vo vnútri našej slučky zaprotokolujeme prijatie správy, získame zoznam aktívnych (štandardne sa get_securities vracia iba aktívny, pozri kód klienta) cenných papierov a uložíme ho do databázy, pričom skontrolujeme, či existuje cenný papier s rovnakým tickerom a výmena v databáze, ak existuje, potom sa (papier) jednoducho aktualizuje.
Poďme spustiť našu tvorbu!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Vlastnosti PS webový komponent V článkoch nebudem uvažovať nad faustom, preto nastavujeme príslušný príznak.
V našom príkaze na spustenie sme faustovi povedali, kde má hľadať objekt aplikácie a čo s ním robiť (spustiť pracovníka) s úrovňou výstupu info logu. Získame nasledujúci výstup:
Pozrime sa na sadu oddielov. Ako vidíme, bola vytvorená téma s názvom, ktorý sme určili v kóde, predvoleným počtom oddielov (8, prevzaté z topic_partitions - parameter aplikačného objektu), keďže sme nešpecifikovali individuálnu hodnotu pre našu tému (cez oddiely). Spustený agent v robotovi má priradených všetkých 8 oddielov, keďže je jediný, ale o tom bude podrobnejšie popísané v časti o klastrovaní.
Teraz môžeme prejsť do iného okna terminálu a poslať prázdnu správu našej téme:
Použitie PS @ ukazujeme, že posielame správu téme s názvom „collect_securities“.
V tomto prípade sa správa dostala na oddiel 6 - môžete to skontrolovať tak, že prejdete na kafdrop on localhost:9000
Keď prejdeme do okna terminálu s naším pracovníkom, uvidíme šťastnú správu odoslanú pomocou loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Môžeme sa tiež pozrieť do mongo (pomocou Robo3T alebo Studio3T) a vidieť, že cenné papiere sú v databáze:
Nie som miliardár, a preto sme spokojní s prvou možnosťou zobrazenia.
Šťastie a radosť - prvý agent je pripravený :)
Agent pripravený, nech žije nový agent!
Áno, páni, prešli sme len 1/3 cesta pripraveného týmto článkom, ale nenechajte sa odradiť, pretože teraz to bude jednoduchšie.
Takže teraz potrebujeme agenta, ktorý zbiera meta informácie a vkladá ich do zberného dokumentu:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Keďže tento agent bude spracovávať informácie o konkrétnom cennom papieri, musíme v správe uviesť ticker (symbol) tohto zabezpečenia. Na tento účel vo faust existujú Evidencia — triedy, ktoré deklarujú schému správ v téme agenta.
V tomto prípade poďme na záznamy.pya popíšte, ako by mala vyzerať správa pre túto tému:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Ako ste možno uhádli, faust používa anotáciu typu python na popis schémy správ, a preto je minimálna verzia podporovaná knižnicou 3.6.
Vráťme sa k agentovi, nastavte typy a pridajte ho:
Ako vidíte, metóde inicializácie témy odovzdávame nový parameter so schémou - typ_hodnoty. Ďalej sa všetko riadi rovnakou schémou, takže nevidím dôvod zaoberať sa niečím iným.
Posledným krokom je pridanie hovoru agentovi zhromažďovania meta informácií na collect_securitites:
Pre správu používame predtým oznámenú schému. V tomto prípade som použil metódu .cast, pretože nemusíme čakať na výsledok od agenta, ale stojí za zmienku, že spôsoby poslať správu k téme:
obsadenie - neblokuje, pretože neočakáva výsledok. Výsledok nemôžete poslať do inej témy ako správu.
odoslať - neblokuje, pretože neočakáva výsledok. V téme môžete určiť agenta, ku ktorému sa dostane výsledok.
opýtať sa - čaká na výsledok. V téme môžete určiť agenta, ku ktorému sa dostane výsledok.
Takže, to je na dnes všetko s agentmi!
Tím snov
Posledná vec, ktorú som sľúbil napísať v tejto časti, sú príkazy. Ako už bolo spomenuté, príkazy vo fauste sú obalom okolo kliknutia. V skutočnosti faust pri zadávaní klávesu -A jednoducho pripojí náš vlastný príkaz k svojmu rozhraniu
Po ohlásených agentoch v agenti.py pridať funkciu s dekoratérom app.commandvolanie metódy obsadenie у zbierať_cenné papiere:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Ak teda zavoláme zoznam príkazov, náš nový príkaz v ňom bude:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Môžeme ho použiť ako ktokoľvek iný, takže reštartujme faust workera a začnime s plnohodnotnou zbierkou cenných papierov:
> faust -A horton.agents start-collect-securities
čo bude ďalej?
V ďalšej časti sa na príklade zvyšných agentov pozrieme na sink mechanizmus na hľadanie extrémov v záverečných cenách obchodovania na daný rok a spustenie cronu agentov.
PS Pod poslednou časťou sa ma pýtali na faust a splývajúcu kafku (aké vlastnosti má confluent?). Zdá sa, že confluent je v mnohých smeroch funkčnejší, ale faktom je, že faust nemá plnú klientsku podporu pre confluent - vyplýva to z popisy klientských obmedzení v doc.