Do, do, la dua parto. Kiel skribite antaŭe, en ĝi ni faros la jenon:
Ni skribu malgrandan klienton por alphaavantage sur aiohttp kun petoj por la finpunktoj, kiujn ni bezonas.
Ni kreu agenton, kiu kolektos datumojn pri valorpaperoj kaj metainformojn pri ili.
Sed, ĉi tion ni faros por la projekto mem, kaj laŭ faust-esplorado, ni lernos kiel skribi agentojn, kiuj prilaboras fluajn eventojn de kafka, kaj ankaŭ kiel skribi komandojn (klaku envolvaĵon), en nia kazo - por manaj puŝomesaĝoj al la temo kiun la agento kontrolas.
Trejnado
AlphaVantage Kliento
Unue, ni skribu malgrandan aiohttp klienton por petoj al alphaavantage.
Nuntempe ni havos la plej simplan kreadon de aplikaĵo, iom poste ni vastigos ĝin, tamen, por ne atendigi vin, ĉi tie referencoj al App-klaso. Mi ankaŭ konsilas al vi rigardi la klason de agordoj, ĉar ĝi respondecas pri la plej multaj el la agordoj.
Ĉefa parto
Agento por kolekti kaj konservi liston de valorpaperoj
Do, unue ni ricevas la faust-aplikaĵon - ĝi estas sufiĉe simpla. Poste, ni eksplicite deklaras temon por nia agento... Ĉi tie indas mencii kio ĝi estas, kio estas la interna parametro kaj kiel ĉi tio povas esti aranĝita alimaniere.
Temoj en kafka, se ni volas scii la ĝustan difinon, estas pli bone legi for. dokumento, aŭ vi povas legi kompendio sur Habré en la rusa, kie ĉio estas ankaŭ sufiĉe precize reflektita :)
Interna parametro, sufiĉe bone priskribita en la faust-dokumento, permesas al ni agordi la temon rekte en la kodo, kompreneble, tio signifas la parametrojn provizitajn de la faust-programistoj, ekzemple: reteno, retenpolitiko (defaŭlte forigi, sed vi povas agordi kompakta), nombro da sekcioj per temo (disdonojfari, ekzemple, malpli ol tutmonda signifo aplikoj faust).
Ĝenerale, la agento povas krei administritan temon kun tutmondaj valoroj, tamen mi ŝatas deklari ĉion eksplicite. Krome, kelkaj parametroj (ekzemple, la nombro da sekcioj aŭ retenpolitiko) de la temo en la agentanonco ne povas esti agordita.
Jen kiel ĝi povus aspekti sen permane difini la temon:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Do, komence de la agento, ni malfermas aiohttp-sesion por petoj per nia kliento. Tiel, kiam oni komencas laboriston, kiam nia agento estas lanĉita, sesio tuj estos malfermita - unu, dum la tuta tempo, kiam la laboristo funkcias (aŭ pluraj, se vi ŝanĝas la parametron). samtempeco de agento kun defaŭlta unuo).
Poste, ni sekvas la fluon (ni metas la mesaĝon enen _, ĉar ni, en ĉi tiu agento, ne zorgas pri la enhavo) de mesaĝoj de nia temo, se ili ekzistas ĉe la nuna ofseto, alie nia ciklo atendos ilian alvenon. Nu, ene de nia buklo, ni registras la ricevon de la mesaĝo, ricevas liston de aktivaj (get_securities revenas nur aktivaj defaŭlte, vidu klientkodon) kaj konservas ĝin en la datumbazo, kontrolante ĉu ekzistas sekureco kun la sama ticker kaj interŝanĝo en la datumbazo , se ekzistas, tiam ĝi (la papero) simple estos ĝisdatigita.
Ni lanĉu nian kreaĵon!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Trajtoj TTT-komponento Mi ne konsideros faust en la artikoloj, do ni starigas la taŭgan flagon.
En nia lanĉa komando, ni diris al faust, kie serĉi la aplikaĵobjekton kaj kion fari kun ĝi (komencu laboriston) kun la nivelo de eligo de informoj. Ni ricevas la sekvan eligon:
Ni rigardu la dispartigon. Kiel ni povas vidi, temo estis kreita kun la nomo, kiun ni indikis en la kodo, la defaŭlta nombro da sekcioj (8, prenita de temo_dispartigoj - aplika objekta parametro), ĉar ni ne specifis individuan valoron por nia temo (per sekcioj). La lanĉita agento en la laboristo ricevas ĉiujn 8 sekciojn, ĉar ĝi estas la sola, sed ĉi tio estos diskutita pli detale en la parto pri clustering.
Nu, nun ni povas iri al alia fina fenestro kaj sendi malplenan mesaĝon al nia temo:
PS uzante @ ni montras, ke ni sendas mesaĝon al temo nomita "kolekto_sekuraĵoj".
En ĉi tiu kazo, la mesaĝo iris al sekcio 6 - vi povas kontroli tion irante al kafdrop on localhost:9000
Irante al la fina fenestro kun nia laboristo, ni vidos feliĉan mesaĝon senditan uzante loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Ni ankaŭ povas rigardi en mongo (uzante Robo3T aŭ Studio3T) kaj vidi, ke la valorpaperoj estas en la datumbazo:
Mi ne estas miliardulo, kaj tial ni kontentiĝas pri la unua spektada opcio.
Feliĉo kaj ĝojo - la unua agento estas preta :)
Agento preta, vivu la nova agento!
Jes, sinjoroj, ni kovris nur 1/3 de la vojo preparita de ĉi tiu artikolo, sed ne malkuraĝiĝu, ĉar nun estos pli facile.
Do nun ni bezonas agenton, kiu kolektas metainformojn kaj metas ĝin en kolektodokumenton:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Ĉar ĉi tiu agento prilaboros informojn pri specifa sekureco, ni devas indiki la ticker (simbolo) de ĉi tiu sekureco en la mesaĝo. Tiucele en faust ekzistas rekordoj — klasoj kiuj deklaras la mesaĝskemon en la agenta temo.
En ĉi tiu kazo, ni iru al records.pykaj priskribu kiel devus aspekti la mesaĝo por ĉi tiu temo:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Kiel vi eble divenis, faust uzas la python-tipan komentadon por priskribi la mesaĝskemon, tial la minimuma versio subtenata de la biblioteko estas 3.6.
Ni revenu al la agento, agordu la tipojn kaj aldonu ĝin:
Kiel vi povas vidi, ni transdonas novan parametron kun skemo al la temo-komenciga metodo - value_type. Plue, ĉio sekvas la saman skemon, do mi vidas nenian signifon enloĝi pri io alia.
Nu, la fina tuŝo estas aldoni alvokon al la metainformkolekta agento por collect_securitites:
Ni uzas la antaŭe anoncitan skemon por la mesaĝo. En ĉi tiu kazo, mi uzis la metodon .cast ĉar ni ne bezonas atendi la rezulton de la agento, sed menciindas tion. manieroj sendu mesaĝon al la temo:
cast - ne blokas ĉar ĝi ne atendas rezulton. Vi ne povas sendi la rezulton al alia temo kiel mesaĝo.
sendi - ne blokas ĉar ĝi ne atendas rezulton. Vi povas specifi agenton en la temo al kiu la rezulto iros.
demandi — atendas rezulton. Vi povas specifi agenton en la temo al kiu la rezulto iros.
Do, tio estas ĉio kun agentoj por hodiaŭ!
La sonĝteamo
La lasta afero, kiun mi promesis skribi en ĉi tiu parto, estas komandoj. Kiel menciite pli frue, komandoj en faust estas envolvaĵo ĉirkaŭ klako. Fakte, faust simple ligas nian kutiman komandon al sia interfaco kiam oni specifas la -A-klavon
Post la anoncitaj agentoj en agentoj.py aldonu funkcion kun dekoraciisto app.komandonomante la metodon ero у kolekti_paperojn:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Tiel, se ni vokas la liston de komandoj, nia nova komando estos en ĝi:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Ni povas uzi ĝin kiel iu ajn alia, do ni rekomencu la faust-laboriston kaj komencu plenan kolekton de valorpaperoj:
> faust -A horton.agents start-collect-securities
Kio okazos poste?
En la sekva parto, uzante la ceterajn agentojn kiel ekzemplon, ni konsideros la sinkan mekanismon por serĉi ekstremojn en la fermaj prezoj de komerco por la jaro kaj la cron-lanĉo de agentoj.