Takže druhá část. Jak bylo napsáno dříve, v něm provedeme následující:
Pojďme napsat malého klienta pro alphavantage na aiohttp s požadavky na koncové body, které potřebujeme.
Vytvořme agenta, který bude sbírat data o cenných papírech a metainformace o nich.
Ale to je to, co uděláme pro samotný projekt a z hlediska faustového výzkumu se naučíme psát agenty, kteří zpracovávají streamové události z kafka, a také jak psát příkazy (click wrapper), v našem případě - pro ruční push zprávy k tématu, které agent sleduje.
Trénink
Klient AlphaVantage
Nejprve napíšeme malého klienta aiohttp pro požadavky na alphavantage.
AlphaVantage API je poměrně jednoduše a krásně navrženo, takže jsem se rozhodl všechny požadavky provádět prostřednictvím této metody construct_query kde je zase volání http.
Přináším všechna pole snake_case pro pohodlí.
Dekorace logger.catch pro krásný a informativní výstup zpětného sledování.
PS Nezapomeňte přidat token alphavantage lokálně do config.yml nebo exportovat proměnnou prostředí HORTON_SERVICE_APIKEY. Dostáváme žeton zde.
třída CRUD
Budeme mít sbírku cenných papírů k ukládání metainformací o cenných papírech.
Prozatím budeme mít nejjednodušší tvorbu aplikace, o něco později ji rozšíříme, nicméně abyste nečekali, zde Reference do třídy aplikací. Také vám doporučuji podívat se na třídu nastavení, protože je zodpovědná za většinu nastavení.
Hlavní část
Zmocněnec pro shromažďování a vedení seznamu cenných papírů
Nejprve tedy získáme objekt aplikace faust - je to docela jednoduché. Dále výslovně deklarujeme téma pro našeho agenta... Zde stojí za zmínku, co to je, jaký je interní parametr a jak to lze zařídit jinak.
Témata v kafce, pokud chceme znát přesnou definici, je lepší si ji přečíst vypnuto. dokument, nebo si můžete přečíst abstraktní na Habré v ruštině, kde se také vše odráží celkem přesně :)
Parametr interní, celkem dobře popsaný ve faust doc, nám umožňuje konfigurovat téma přímo v kódu, samozřejmě to znamená parametry poskytnuté vývojáři faust, např.: retence, retenční politika (ve výchozím nastavení smazat, ale lze nastavit kompaktní), počet oddílů na téma (skóredělat například méně než globální význam aplikace faust).
Obecně platí, že agent může vytvořit spravované téma s globálními hodnotami, nicméně rád vše deklaruji explicitně. Navíc některé parametry (například počet oddílů nebo zásady uchovávání) tématu v inzerci agenta nelze konfigurovat.
Zde je návod, jak by to mohlo vypadat bez ručního definování tématu:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Na začátku agenta tedy otevřeme relaci aiohttp pro požadavky prostřednictvím našeho klienta. Při spouštění pracovníka se tedy při spuštění našeho agenta okamžitě otevře relace – jedna, po celou dobu běhu pracovníka (nebo několik, pokud změníte parametr souběžnost od agenta s výchozí jednotkou).
Dále sledujeme stream (umístíme zprávu do _, jelikož nás v tomto agentu nezajímá obsah) zpráv z našeho tématu, pokud existují v aktuálním offsetu, jinak náš cyklus počká na jejich příchod. No, uvnitř naší smyčky zaprotokolujeme příjem zprávy, získáme seznam aktivních (get_securities vrací pouze aktivní ve výchozím nastavení, viz kód klienta) cenných papírů a uložíme jej do databáze, zkontrolujeme, zda existuje cenný papír se stejným tickerem a výměna v databázi, pokud existuje, pak se (papír) jednoduše aktualizuje.
Pusťme se do našeho výtvoru!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Vlastnosti PS webová komponenta Nebudu v článcích uvažovat o faustovi, proto nastavíme příslušný příznak.
V našem příkazu ke spuštění jsme faustovi řekli, kde hledat objekt aplikace a co s ním dělat (spustit pracovníka) s výstupní úrovní info logu. Získáme následující výstup:
Podívejme se na sadu oddílů. Jak vidíme, bylo vytvořeno téma s názvem, který jsme určili v kódu, výchozím počtem oddílů (8, převzato z téma_oddíly - parametr aplikačního objektu), protože jsme nezadali individuální hodnotu pro naše téma (přes oddíly). Spuštěnému agentovi v workeru je přiřazeno všech 8 oddílů, protože je jediný, ale o tom bude podrobněji pojednáno v části o shlukování.
Nyní můžeme přejít do jiného okna terminálu a odeslat prázdnou zprávu do našeho tématu:
Použití PS @ ukazujeme, že posíláme zprávu tématu s názvem „collect_securities“.
V tomto případě se zpráva přesunula na oddíl 6 - můžete to zkontrolovat přechodem na kafdrop on localhost:9000
Když přejdeme do okna terminálu s naším pracovníkem, uvidíme šťastnou zprávu odeslanou pomocí loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Můžeme se také podívat do mongo (pomocí Robo3T nebo Studio3T) a zjistit, že cenné papíry jsou v databázi:
Nejsem miliardář, a proto se spokojíme s první možností sledování.
Štěstí a radost - první agent je připraven :)
Agent připraven, ať žije nový agent!
Ano, pánové, prošli jsme pouze 1/3 cesty připravené tímto článkem, ale nenechte se odradit, protože teď to bude jednodušší.
Nyní tedy potřebujeme agenta, který shromažďuje meta informace a vkládá je do sbírkového dokumentu:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Protože tento agent bude zpracovávat informace o konkrétním zabezpečení, musíme ve zprávě uvést ticker (symbol) tohoto zabezpečení. K tomuto účelu ve faust existují Evidence — třídy, které deklarují schéma zpráv v tématu agenta.
V tomto případě pojďme na záznamy.pya popište, jak by zpráva pro toto téma měla vypadat:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Jak jste možná uhodli, faust používá k popisu schématu zprávy anotaci typu python, a proto je minimální verze podporovaná knihovnou 3.6.
Vraťme se k agentovi, nastavte typy a přidejte jej:
Jak vidíte, předáme metodě inicializace tématu nový parametr se schématem - typ_hodnoty. Dále se vše řídí stejným schématem, takže nevidím smysl zabývat se něčím jiným.
Posledním krokem je přidání volání agenta pro shromažďování meta informací, aby collect_securitites:
Pro zprávu používáme dříve oznámené schéma. V tomto případě jsem použil metodu .cast, protože nemusíme čekat na výsledek od agenta, ale stojí za zmínku, že způsoby poslat zprávu k tématu:
cast - neblokuje, protože neočekává výsledek. Výsledek nemůžete odeslat do jiného tématu jako zprávu.
odeslat - neblokuje, protože neočekává výsledek. V tématu můžete určit agenta, ke kterému bude výsledek převeden.
zeptat se - čeká na výsledek. V tématu můžete určit agenta, ke kterému bude výsledek převeden.
Tak, to je pro dnešek s agenty vše!
Tým snů
Poslední věc, kterou jsem slíbil napsat v tomto díle, jsou příkazy. Jak již bylo zmíněno dříve, příkazy ve faust jsou obalem kolem kliknutí. Ve skutečnosti faust jednoduše připojí náš vlastní příkaz ke svému rozhraní při zadání klávesy -A
Po ohlášení agentů v agenti.py přidat funkci s dekoratérem app.commandvolání metody obsazení у sbírat_ cenné papíry:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Pokud tedy zavoláme seznam příkazů, náš nový příkaz v něm bude:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Můžeme to použít jako kdokoli jiný, takže restartujme faust worker a začněme plnohodnotnou sbírku cenných papírů:
> faust -A horton.agents start-collect-securities
Co se stane příště?
V další části se na příkladu zbývajících agentů podíváme na sink mechanismus pro hledání extrémů v závěrečných cenách obchodování pro daný rok a spuštění cronu agentů.
PS V minulém díle se mě ptali na faust a splývající kafku (jaké vlastnosti má confluent?). Zdá se, že confluent je v mnoha ohledech funkčnější, ale faktem je, že faust nemá plnou klientskou podporu pro confluent - vyplývá to z popisy klientských omezení v doc.