Pra, pra, pjesa e dytë. Siç është shkruar më herët, në të do të bëjmë sa vijon:
Le të shkruajmë një klient të vogël për alphavantage në aiohttp me kërkesa për pikat fundore që na duhen.
Le të krijojmë një agjent që do të mbledhë të dhëna për letrat me vlerë dhe meta informacion mbi to.
Por, kjo është ajo që ne do të bëjmë për vetë projektin, dhe për sa i përket kërkimit të faustit, do të mësojmë se si të shkruajmë agjentë që përpunojnë ngjarjet e transmetimit nga kafka, si dhe si të shkruajmë komanda (klikoni mbështjellës), në rastin tonë - për mesazhe shtytëse manuale për temën që po monitoron agjenti.
API AlphaVantage është projektuar mjaft thjesht dhe bukur, kështu që vendosa t'i bëj të gjitha kërkesat përmes metodës construct_query ku nga ana tjetër ka një thirrje http.
I sjell të gjitha fushat në snake_case për lehtësi.
Epo, dekorimi i logger.catch për rezultate të bukura dhe informative të gjurmës.
PS Mos harroni të shtoni token alphavantage në nivel lokal në config.yml, ose të eksportoni variablin e mjedisit HORTON_SERVICE_APIKEY. Ne marrim një shenjë këtu.
Klasa CRUD
Ne do të kemi një koleksion letrash me vlerë për të ruajtur meta informacionet rreth letrave me vlerë.
Tani për tani do të kemi krijimin më të thjeshtë aplikacioni, pak më vonë do ta zgjerojmë, megjithatë, për të mos ju mbajtur në pritje, këtu referencat në klasën e aplikacionit. Unë gjithashtu ju këshilloj të hidhni një sy në klasën e cilësimeve, pasi ajo është përgjegjëse për shumicën e cilësimeve.
Pjesa kryesore
Agjent për mbledhjen dhe mbajtjen e listës së letrave me vlerë
Pra, së pari marrim objektin e aplikimit faust - është mjaft e thjeshtë. Më pas, ne deklarojmë në mënyrë eksplicite një temë për agjentin tonë... Këtu vlen të përmendet se çfarë është, cili është parametri i brendshëm dhe si mund të rregullohet ndryshe.
Parametri i brendshëm, i përshkruar mjaft mirë në dokumentin faust, na lejon të konfigurojmë temën drejtpërdrejt në kod, natyrisht, kjo do të thotë parametrat e ofruar nga zhvilluesit e faustit, për shembull: mbajtje, politika e mbajtjes (si parazgjedhje fshini, por mund të vendosni kompakt), numri i ndarjeve për temë (ndarësepër të bërë, për shembull, më pak se rëndësi globale aplikacionet faust).
Në përgjithësi, agjenti mund të krijojë një temë të menaxhuar me vlera globale, megjithatë, më pëlqen të deklaroj gjithçka në mënyrë eksplicite. Përveç kësaj, disa parametra (për shembull, numri i ndarjeve ose politika e ruajtjes) të temës në reklamën e agjentit nuk mund të konfigurohen.
Ja se si mund të duket pa e përcaktuar manualisht temën:
Epo, tani le të përshkruajmë se çfarë do të bëjë agjenti ynë :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Pra, në fillim të agjentit, ne hapim një sesion aiohttp për kërkesat përmes klientit tonë. Kështu, kur filloni një punëtor, kur agjenti ynë hapet, menjëherë do të hapet një seancë - një, për të gjithë kohën që punëtori po funksionon (ose disa, nëse ndryshoni parametrin konkurencë nga një agjent me një njësi të paracaktuar).
Më pas, ndjekim rrjedhën (e vendosim mesazhin në _, meqenëse ne, në këtë agjent, nuk na intereson përmbajtja) e mesazheve nga tema jonë, nëse ato ekzistojnë në kompensimin aktual, përndryshe cikli ynë do të presë mbërritjen e tyre. Epo, brenda qarkut tonë, ne regjistrojmë marrjen e mesazhit, marrim një listë të letrave me vlerë aktive (get_securities kthehen vetëm aktive si parazgjedhje, shiko kodin e klientit) dhe e ruajmë atë në bazën e të dhënave, duke kontrolluar nëse ka një vlerë me të njëjtin tregues dhe shkëmbim në bazën e të dhënave, nëse ka, atëherë ai (letra) thjesht do të përditësohet.
Le të nisim krijimin tonë!
> docker-compose up -d
... ĐĐ°ĐżŃŃĐș ĐșĐŸĐœŃĐ”ĐčĐœĐ”ŃĐŸĐČ ...
> faust -A horton.agents worker --without-web -l info
Karakteristikat e PS komponent web Nuk do ta konsideroj Faustin në artikuj, ndaj vendosëm flamurin e duhur.
Në komandën tonë të nisjes, ne i thamë Faustit se ku të kërkonte objektin e aplikacionit dhe çfarë të bënte me të (nisni një punëtor) me nivelin e daljes së regjistrit të informacionit. Ne marrim daljen e mëposhtme:
Le të shohim grupin e ndarjes. Siç mund ta shohim, u krijua një temë me emrin që caktuam në kod, numrin e paracaktuar të ndarjeve (8, marrë nga tema_ndarjet - parametri i objektit të aplikacionit), pasi nuk specifikuam një vlerë individuale për temën tonë (nëpërmjet ndarjeve). Agjentit të lëshuar në punëtor i caktohen të 8 ndarjet, pasi është e vetmja, por kjo do të diskutohet më në detaje në pjesën rreth grupimit.
Epo, tani mund të shkojmë në një dritare tjetër terminali dhe të dërgojmë një mesazh bosh në temën tonë:
PS duke pĂ«rdorur @ ne tregojmĂ« se po i dĂ«rgojmĂ« njĂ« mesazh njĂ« teme tĂ« quajtur âmbledh_vleratâ.
Në këtë rast, mesazhi shkoi në ndarjen 6 - mund ta kontrolloni këtë duke shkuar te kafdrop on localhost:9000
Duke shkuar në dritaren e terminalit me punonjësin tonë, do të shohim një mesazh të lumtur të dërguar duke përdorur loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Ne gjithashtu mund të shikojmë në mongo (duke përdorur Robo3T ose Studio3T) dhe të shohim që letrat me vlerë janë në bazën e të dhënave:
Unë nuk jam miliarder dhe për këtë arsye jemi të kënaqur me opsionin e parë të shikimit.
Lumturia dhe gëzimi - agjenti i parë është gati :)
Agjenti gati, rroftë agjenti i ri!
Po, zotërinj, ne kemi kaluar vetëm 1/3 e rrugës së përgatitur nga ky artikull, por mos u dekurajoni, sepse tani do të jetë më e lehtë.
Pra, tani ne kemi nevojë për një agjent që mbledh meta informacion dhe e vendos atë në një dokument grumbullimi:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
MeqenĂ«se ky agjent do tĂ« pĂ«rpunojĂ« informacione pĂ«r njĂ« siguri specifike, ne duhet tĂ« tregojmĂ« shenjĂ«n (simbolin) e kĂ«saj sigurie nĂ« mesazh. PĂ«r kĂ«tĂ« qĂ«llim nĂ« Faust ekzistojnĂ« tĂ« dhĂ«na â klasa qĂ« deklarojnĂ« skemĂ«n e mesazheve nĂ« temĂ«n e agjentit.
Në këtë rast, le të shkojmë në regjistron.pydhe përshkruani se si duhet të duket mesazhi për këtë temë:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Siç mund ta keni marrë me mend, Faust përdor shënimin e tipit python për të përshkruar skemën e mesazhit, prandaj versioni minimal i mbështetur nga biblioteka është 3.6.
Le të kthehemi te agjenti, të vendosim llojet dhe ta shtojmë atë:
Siç mund ta shihni, ne kalojmë një parametër të ri me një skemë në metodën e inicializimit të temës - value_type. Më tej, gjithçka ndjek të njëjtën skemë, kështu që nuk shoh asnjë kuptim të ndalem në ndonjë gjë tjetër.
Epo, prekja e fundit është të shtoni një telefonatë në agjentin e mbledhjes së informacionit meta për collect_securites:
Ne përdorim skemën e shpallur më parë për mesazhin. Në këtë rast kam përdorur metodën .cast pasi nuk kemi nevojë të presim rezultatin nga agjenti, por vlen të theksohet se mënyra dërgoni një mesazh në temë:
cast - nuk bllokon sepse nuk pret rezultat. Ju nuk mund ta dërgoni rezultatin në një temë tjetër si mesazh.
dërgo - nuk bllokon sepse nuk pret rezultat. Ju mund të specifikoni një agjent në temën në të cilën do të shkojë rezultati.
pyet - pret për një rezultat. Ju mund të specifikoni një agjent në temën në të cilën do të shkojë rezultati.
Pra, kjo është e gjitha me agjentët për sot!
Ekipi i Ă«ndrrave
Gjëja e fundit që premtova të shkruaj në këtë pjesë janë komandat. Siç u përmend më herët, komandat në faust janë një mbështjellës rreth klikimit. Në fakt, Faust thjesht bashkon komandën tonë të personalizuar në ndërfaqen e tij kur specifikon tastin -A
Pasi agjentĂ«t e shpallur nĂ« agjentĂ«t.py shtoni njĂ« funksion me njĂ« dekorues app.komandĂ«duke thirrur metodĂ«n hedh Ń mbledh_titujt:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Kështu, nëse thërrasim listën e komandave, komanda jonë e re do të jetë në të:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Ne mund ta përdorim atë si kushdo tjetër, kështu që le të rifillojmë faust worker dhe të fillojmë një koleksion të plotë të letrave me vlerë:
> faust -A horton.agents start-collect-securities
ĂfarĂ« do tĂ« ndodhĂ« mĂ« pas?
Në pjesën tjetër, duke përdorur agjentët e mbetur si shembull, ne do të shqyrtojmë mekanizmin e lavamanit për kërkimin e ekstremeve në çmimet e mbylljes së tregtimit për vitin dhe nisjen e agjentëve.
PS Në pjesën e fundit më pyetën për kafkën faust dhe konfluent (çfarë veçorish ka konfluenti?). Duket se konfluenti është më funksional në shumë mënyra, por fakti është se Faust nuk ka mbështetje të plotë të klientit për konfluent - kjo rrjedh nga përshkrimet e kufizimeve të klientit në doc.