Dus, sa, it twadde diel. Lykas earder skreaun, sille wy it folgjende dwaan:
Litte wy in lyts kliïnt skriuwe foar alphavantage op aiohttp mei oanfragen foar de einpunten dy't wy nedich binne.
Litte wy in agint oanmeitsje dy't gegevens sil sammelje oer weardepapieren en meta-ynformaasje oer har.
Mar, dit is wat wy sille dwaan foar it projekt sels, en yn termen fan faust-ûndersyk sille wy leare hoe't jo aginten skriuwe dy't stream-eveneminten ferwurkje fan kafka, lykas hoe't jo kommando's skriuwe (klik wrapper), yn ús gefal - foar hânmjittich push-berjochten nei it ûnderwerp dat de agint kontrolearret.
Tarieding fan
AlphaVantage Client
Litte wy earst in lyts aiohttp-kliïnt skriuwe foar fersiken om alphavantage.
De AlphaVantage API is frij ienfâldich en prachtich ûntworpen, dus ik besleat alle oanfragen fia de metoade te meitsjen construct_query wêr't op syn beurt in http-oprop is.
Ik bring alle fjilden nei snake_case foar gemak.
No, de logger.catch-dekoraasje foar prachtige en ynformative traceback-útfier.
PS Ferjit net om de alphavantage token lokaal ta te foegjen oan config.yml, of de omjouwingsfariabele te eksportearjen HORTON_SERVICE_APIKEY. Wy krije in token hjir.
CRUD klasse
Wy sille in samling fan weardepapieren hawwe om meta-ynformaasje oer weardepapieren op te slaan.
Foar no sille wy de ienfâldichste oanmeitsjen fan applikaasjes hawwe, in bytsje letter sille wy it lykwols útwreidzje om jo net te wachtsjen, hjir referinsjes oan App-klasse. Ik advisearje jo ek om de ynstellingsklasse te besjen, om't it ferantwurdlik is foar de measte ynstellings.
Haadpart
Agent foar it sammeljen en ûnderhâlden fan in list fan weardepapieren
Dat, earst krije wy it faust-applikaasjeobjekt - it is frij ienfâldich. Dêrnei ferklearje wy eksplisyt in ûnderwerp foar ús agent ... Hjir is it wurdich te neamen wat it is, wat de ynterne parameter is en hoe't dit oars kin wurde regele.
Underwerpen yn kafka, as wy de krekte definysje witte wolle, is it better om te lêzen út. dokumint, of jo kinne lêze kompendium op Habré yn it Russysk, wêr't alles ek frij presys werjûn wurdt :)
Yn 't algemien kin de agint in beheard ûnderwerp oanmeitsje mei globale wearden, lykwols wol ik alles eksplisyt ferklearje. Derneist kinne guon parameters (bygelyks it oantal partysjes of behâldbelied) fan it ûnderwerp yn 'e agent-advertinsje net konfigureare wurde.
Hjir is hoe't it der útsjen kin sûnder it ûnderwerp manuell te definiearjen:
No, litte wy no beskriuwe wat ús agent sil dwaan :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Dat, oan it begjin fan 'e agint, iepenje wy in aiohttp-sesje foar oanfragen fia ús kliïnt. Sa, by it starten fan in arbeider, as ús agint wurdt lansearre, sil fuortendaliks in sesje iepene wurde - ien, foar de hiele tiid dat de arbeider rint (of ferskate, as jo de parameter feroarje gelikensens fan in agint mei in standertienheid).
Dêrnei folgje wy de stream (wy pleatse it berjocht yn _, om't wy, yn dizze agint, net skele oer de ynhâld) fan berjochten fan ús ûnderwerp, as se besteane op 'e hjoeddeistige offset, oars sil ús syklus wachtsje op har komst. No, yn ús lus registrearje wy de ûntfangst fan it berjocht, krije in list mei aktive (get_securities jout allinich aktyf standert, sjoch kliïntkoade) weardepapieren en bewarje it yn 'e databank, kontrolearje oft d'r in feiligens is mei deselde ticker en útwikseling yn 'e databank , as der is, dan sil it (it papier) gewoan bywurke wurde.
Litte wy ús skepping lansearje!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Features web komponint Ik sil gjin faust beskôgje yn 'e artikels, dus wy sette de passende flagge.
Yn ús startkommando hawwe wy faust ferteld wêr't it applikaasjeobjekt te sykjen en wat dermei te dwaan (in arbeider starte) mei it útfiernivo fan ynfolog. Wy krije de folgjende útfier:
Litte wy nei de partysjeset sjen. Sa't wy sjen kinne, is in ûnderwerp makke mei de namme dy't wy yn 'e koade oantsjutten, it standert oantal partysjes (8, nommen út topic_partysjes - applikaasje-objektparameter), om't wy gjin yndividuele wearde hawwe opjûn foar ús ûnderwerp (fia partysjes). De lansearre agint yn 'e arbeider wurdt tawiisd oan alle 8 partysjes, om't it de ienige is, mar dit sil yn mear detail besprutsen wurde yn it diel oer klustering.
No, no kinne wy nei in oar terminalfinster gean en in leech berjocht nei ús ûnderwerp stjoere:
PS brûke @ wy litte sjen dat wy in berjocht stjoere nei in ûnderwerp mei de namme "collect_securities".
Yn dit gefal gie it berjocht nei partysje 6 - jo kinne dit kontrolearje troch te gean nei kafdrop op localhost:9000
Gean nei it terminalfinster mei ús arbeider, sille wy in lokkich berjocht sjen ferstjoerd mei loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Wy kinne ek nei mongo sjen (mei Robo3T of Studio3T) en sjen dat de weardepapieren yn 'e database binne:
Ik bin gjin miljardêr, en dêrom binne wy tefreden mei de earste besjen opsje.
Lok en freugde - de earste agent is klear :)
Agent klear, lang libje de nije agent!
Ja, hearen, wy hawwe mar 1/3 fan it paad dat troch dit artikel taret is ôfsletten, mar wês net ûntmoedige, want no sil it makliker wurde.
Dat no hawwe wy in agint nedich dy't meta-ynformaasje sammelet en it yn in samlingdokumint set:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Sûnt dizze agint sil ferwurkje ynformaasje oer in spesifike feiligens, wy moatte oanjaan de ticker (symboal) fan dizze feiligens yn it berjocht. Foar dit doel yn faust der binne records - klassen dy't it berjochtskema ferklearje yn it agentûnderwerp.
Yn dit gefal, litte wy gean nei records.pyen beskriuw hoe't it berjocht foar dit ûnderwerp der útsjen moat:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Lykas jo miskien hawwe rieden, brûkt faust de annotaasje fan it python-type om it berjochtskema te beskriuwen, dat is de reden wêrom't de minimale ferzje dy't troch de bibleteek wurdt stipe is 3.6.
Litte wy weromgean nei de agint, set de typen yn en foegje it ta:
Sa't jo sjen kinne, jouwe wy in nije parameter mei in skema troch nei de metoade foar inisjalisaasje fan ûnderwerp - value_type. Fierder folget alles itselde skema, dus ik sjoch gjin nut om op wat oars te stean.
No, de lêste touch is om in oprop ta te foegjen oan de agint foar it sammeljen fan meta-ynformaasje om collect_securitites:
Wy brûke it earder oankundige skema foar it berjocht. Yn dit gefal haw ik de .cast-metoade brûkt, om't wy net hoege te wachtsjen op it resultaat fan 'e agint, mar it is it neamen wurdich dat manieren stjoer in berjocht nei it ûnderwerp:
cast - blokkearret net om't it gjin resultaat ferwachtet. Jo kinne it resultaat net as berjocht nei in oar ûnderwerp stjoere.
ferstjoere - blokkearret net om't it gjin resultaat ferwachtet. Jo kinne in agint opjaan yn it ûnderwerp wêr't it resultaat nei sil gean.
freegje - wachtet op in resultaat. Jo kinne in agint opjaan yn it ûnderwerp wêr't it resultaat nei sil gean.
Dat, dat is alles mei aginten foar hjoed!
It dreamteam
It lêste wat ik tasein yn dit diel te skriuwen is kommando's. Lykas earder neamd, binne kommando's yn faust in wrapper om klik. Feitlik hechtet faust ús oanpaste kommando gewoan oan har ynterface by it opjaan fan de -A-kaai
Nei de oankundige aginten yn agents.py add in funksje mei in decorator app.kommandoropt de metoade cast у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Dus, as wy de list mei kommando's neame, sil ús nije kommando deryn wêze:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Wy kinne it brûke lykas elkenien, dus litte wy de faust-arbeider opnij starte en in folsleine kolleksje fan weardepapieren begjinne:
> faust -A horton.agents start-collect-securities
Wat sil d'rnei barre?
Yn it folgjende diel, mei de oerbleaune aginten as foarbyld, sille wy it sinkmeganisme beskôgje foar it sykjen nei ekstremen yn 'e slutingsprizen fan hannel foar it jier en de cron-lansearring fan aginten.