Tako, tako, drugi del. Kot smo že napisali, bomo v njem storili naslednje:
Napišimo majhen odjemalec za alphavantage na aiohttp z zahtevami za končne točke, ki jih potrebujemo.
Ustvarimo agenta, ki bo zbiral podatke o vrednostnih papirjih in meta informacije o njih.
Ampak, to je tisto, kar bomo naredili za sam projekt in v smislu raziskovanja fausta se bomo naučili pisati agente, ki obdelujejo tokovne dogodke iz kafke, pa tudi pisati ukaze (click wrapper), v našem primeru - za ročna potisna sporočila na temo, ki jo spremlja agent.
Izobraževanje
Odjemalec AlphaVantage
Najprej napišimo majhen odjemalec aiohttp za zahteve za alphavantage.
AlphaVantage API je precej preprosto in lepo oblikovan, zato sem se odločil, da vse zahteve opravim prek metode construct_query kjer je nato klic http.
Prinesem vsa polja snake_case za udobje.
No, dekoracija logger.catch za čudovit in informativen izpis sledenja.
PS Ne pozabite dodati žetona alphavantage lokalno v config.yml ali izvoziti spremenljivko okolja HORTON_SERVICE_APIKEY. Prejmemo žeton tukaj.
razred CRUD
Imeli bomo zbirko vrednostnih papirjev za shranjevanje metainformacij o vrednostnih papirjih.
Zaenkrat bomo imeli najenostavnejšo izdelavo aplikacije, malo kasneje jo bomo razširili, da pa ne boste čakali tukaj reference v App-razred. Svetujem vam tudi, da si ogledate razred nastavitev, saj je odgovoren za večino nastavitev.
Glavni organ
Agent za zbiranje in vzdrževanje seznama vrednostnih papirjev
Torej, najprej dobimo objekt aplikacije faust - to je precej preprosto. Nato eksplicitno deklariramo temo za našega agenta ... Tukaj je vredno omeniti, kaj to je, kaj je notranji parameter in kako je to mogoče drugače urediti.
Teme v Kafki, če želimo vedeti natančno definicijo, je bolje prebrati izklopljeno. dokument, ali lahko preberete nabornik na Habréju v ruščini, kjer se vse odraža tudi precej natančno :)
Notranji parameter, ki je precej dobro opisan v faust dokumentu, nam omogoča, da temo konfiguriramo neposredno v kodi, seveda to pomeni parametre, ki so jih zagotovili razvijalci faust, na primer: zadrževanje, politika zadrževanja (privzeto izbriši, vendar lahko nastavite kompaktna), število particij na temo (rezultatinarediti, na primer, manj kot globalna vrednost aplikacije faust).
Na splošno lahko agent ustvari upravljano temo z globalnimi vrednostmi, vendar rad vse deklariram izrecno. Poleg tega nekaterih parametrov (na primer števila particij ali pravilnika zadrževanja) teme v oglasu posrednika ni mogoče konfigurirati.
Takole bi lahko izgledalo brez ročne definicije teme:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Torej, na začetku agenta odpremo sejo aiohttp za zahteve prek našega odjemalca. Tako se ob zagonu delavca, ko se zažene naš agent, takoj odpre seja - ena, za ves čas, ko se delavec izvaja (ali več, če spremenite parameter sočasnost od agenta s privzeto enoto).
Nato sledimo toku (sporočilo postavimo v _, saj nas v tem agentu ne zanima vsebina) sporočil iz naše teme, če obstajajo na trenutnem odmiku, sicer bo naš cikel počakal na njihov prihod. No, znotraj naše zanke zabeležimo prejem sporočila, pridobimo seznam aktivnih (get_securities privzeto vrne le aktivne, glejte kodo odjemalca) vrednostnih papirjev in ga shranimo v zbirko podatkov ter preverimo, ali obstaja vrednostni papir z isto oznako in izmenjavo v bazi podatkov , če obstaja, potem bo (papir) preprosto posodobljen.
Zaženimo naše ustvarjanje!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Lastnosti PS spletna komponenta Fausta v člankih ne bom upošteval, zato smo postavili ustrezno zastavico.
V našem ukazu za zagon smo Faustu povedali, kje naj išče objekt aplikacije in kaj naj z njim naredi (zažene delavca) z izhodno ravnjo dnevnika informacij. Dobimo naslednji rezultat:
Poglejmo nabor particij. Kot lahko vidimo, je bila ustvarjena tema z imenom, ki smo ga določili v kodi, privzeto število particij (8, vzeto iz topic_partitions - parameter objekta aplikacije), saj nismo podali posamezne vrednosti za našo temo (preko particij). Zagnanemu agentu v delavcu je dodeljenih vseh 8 particij, saj je edini, a o tem bomo podrobneje razpravljali v delu o združevanju v gruče.
No, zdaj lahko gremo v drugo okno terminala in pošljemo prazno sporočilo naši temi:
Uporaba PS @ pokažemo, da pošiljamo sporočilo v temo z imenom “collect_securities”.
V tem primeru je sporočilo šlo na particijo 6 - to lahko preverite tako, da obiščete kafdrop localhost:9000
Ko gremo z našim delavcem v okno terminala, bomo videli veselo sporočilo, poslano z uporabo loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Pogledamo lahko tudi v mongo (z uporabo Robo3T ali Studio3T) in vidimo, da so vrednostni papirji v bazi podatkov:
Nisem milijarder, zato smo zadovoljni s prvo možnostjo ogleda.
Sreča in veselje - prvi agent je pripravljen :)
Agent pripravljen, naj živi novi agent!
Da, gospodje, prehodili smo le 1/3 poti, ki jo je pripravil ta članek, vendar naj vas ne obupa, saj bo zdaj lažje.
Zdaj potrebujemo posrednika, ki zbira meta informacije in jih vnaša v zbirni dokument:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Ker bo ta agent obdelal informacije o določenem vrednostnem papirju, moramo v sporočilu navesti oznako (simbol) tega vrednostnega papirja. V ta namen so v faustu zapisi — razredi, ki deklarirajo shemo sporočila v temi posrednika.
V tem primeru pojdimo na records.pyin opišite, kako naj izgleda sporočilo za to temo:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Kot ste morda uganili, Faust uporablja opombo tipa python za opis sporočilne sheme, zato je najmanjša različica, ki jo podpira knjižnica 3.6.
Kot lahko vidite, posredujemo nov parameter s shemo metodi inicializacije teme - value_type. Nadalje, vse poteka po isti shemi, zato ne vidim smisla, da bi se ukvarjal s čim drugim.
No, zadnji dotik je dodajanje klica agentu za zbiranje meta informacij za collect_securitites:
Za sporočilo uporabljamo predhodno napovedano shemo. V tem primeru sem uporabil metodo .cast, saj nam ni treba čakati na rezultat agenta, vendar velja omeniti, da načine pošlji sporočilo na temo:
cast - ne blokira, ker ne pričakuje rezultata. Rezultata ne morete poslati v drugo temo kot sporočilo.
pošlji - ne blokira, ker ne pričakuje rezultata. V temi lahko določite agenta, ki mu bo šel rezultat.
vprašaj - čaka na rezultat. V temi lahko določite agenta, ki mu bo šel rezultat.
Torej, to je vse z agenti za danes!
Sanjska ekipa
Zadnja stvar, ki sem jo obljubil napisati v tem delu, so ukazi. Kot smo že omenili, so ukazi v faustu ovoj okrog klika. Pravzaprav faust preprosto pripne naš ukaz po meri svojemu vmesniku, ko podaja ključ -A
Po napovedanih agentih v agenti.py dodajte funkcijo z okrasiteljem app.commandklicanje metode lite у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Torej, če pokličemo seznam ukazov, bo naš novi ukaz v njem:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Uporabljamo ga lahko kot kogarkoli drugega, zato znova zaženimo faust workerja in začnimo polno zbiranje vrednostnih papirjev:
> faust -A horton.agents start-collect-securities
Kaj bo potem?
V naslednjem delu bomo na primeru preostalih agentov obravnavali mehanizem ponora za iskanje ekstremov v zaključnih cenah trgovanja za leto in zagon agentov cron.
PS Pod zadnjim delom so me spraševali o faustu in sotočni kafki (kakšne lastnosti ima konfluent?). Zdi se, da je confluent bolj funkcionalen v mnogih pogledih, toda dejstvo je, da faust nima popolne podpore odjemalcev za confluent - to izhaja iz opise omejitev strank v dok.