So, so, die tweede deel. Soos vroeër geskryf, sal ons die volgende doen:
Kom ons skryf 'n klein kliënt vir alphavantage op aiohttp met versoeke vir die eindpunte wat ons benodig.
Kom ons skep 'n agent wat data oor sekuriteite en meta-inligting daaroor sal insamel.
Maar dit is wat ons vir die projek self sal doen, en in terme van faustnavorsing, sal ons leer hoe om agente te skryf wat stroomgebeure vanaf kafka verwerk, asook hoe om opdragte (click wrapper) te skryf, in ons geval - vir handmatige drukboodskappe na die onderwerp wat die agent monitor.
Opleiding
AlphaVantage kliënt
Kom ons skryf eers 'n klein aiohttp-kliënt vir versoeke om alphavantage.
Die AlphaVantage API is eenvoudig en pragtig ontwerp, so ek het besluit om alle versoeke deur die metode te rig construct_query waar daar weer 'n http-oproep is.
Ek bring al die velde na snake_case vir troos.
Wel, die logger.catch-versiering vir pragtige en insiggewende naspooruitset.
NS Moenie vergeet om die alphavantage-token plaaslik by config.yml by te voeg, of die omgewingsveranderlike uit te voer nie HORTON_SERVICE_APIKEY. Ons ontvang 'n teken hier.
CRUD klas
Ons sal 'n sekuriteitsversameling hê om meta-inligting oor sekuriteite te stoor.
Vir nou sal ons die eenvoudigste toepassingskepping hê, 'n bietjie later sal ons dit egter uitbrei om nie te laat wag nie, hier verwysings na App-klas. Ek raai jou ook aan om na die instellingsklas te kyk, aangesien dit verantwoordelik is vir die meeste van die instellings.
Hoofliggaam
Agent vir die insameling en instandhouding van 'n lys van sekuriteite
So, eers kry ons die faust-toepassingsvoorwerp - dit is redelik eenvoudig. Vervolgens verklaar ons uitdruklik 'n onderwerp vir ons agent ... Hier is dit die moeite werd om te noem wat dit is, wat die interne parameter is en hoe dit anders gereël kan word.
Onderwerpe in kafka, as ons die presiese definisie wil weet, is dit beter om te lees af. dokument, of jy kan lees kompendium op Habré in Russies, waar alles ook redelik akkuraat weerspieël word :)
Interne parameter, wat redelik goed beskryf word in die faust doc, stel ons in staat om die onderwerp direk in die kode op te stel, natuurlik, dit beteken die parameters wat deur die faust-ontwikkelaars verskaf word, byvoorbeeld: retensie, retensiebeleid (by verstek verwyder, maar jy kan stel compact), aantal partisies per onderwerp (tellingsom byvoorbeeld minder te doen as wêreldwye betekenis toepassings faust).
Oor die algemeen kan die agent 'n bestuurde onderwerp met globale waardes skep, maar ek hou daarvan om alles uitdruklik te verklaar. Daarbenewens kan sommige parameters (byvoorbeeld die aantal partisies of retensiebeleid) van die onderwerp in die agentadvertensie nie opgestel word nie.
Hier is hoe dit kan lyk sonder om die onderwerp handmatig te definieer:
Wel, laat ons nou beskryf wat ons agent sal doen :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Dus, aan die begin van die agent, maak ons 'n aiohttp-sessie oop vir versoeke deur ons kliënt. Dus, wanneer 'n werker begin word, wanneer ons agent geloods word, sal 'n sessie onmiddellik oopgemaak word - een, vir die hele tyd wat die werker aan die gang is (of verskeie, as jy die parameter verander gelyktydigheid van 'n agent met 'n verstekeenheid).
Volgende volg ons die stroom (ons plaas die boodskap in _, aangesien ons, in hierdie agent, nie omgee vir die inhoud) van boodskappe uit ons onderwerp, as hulle by die huidige afwyking bestaan, anders sal ons siklus wag vir hul aankoms. Wel, binne ons lus teken ons die ontvangs van die boodskap aan, kry 'n lys van aktiewe (get_securities gee slegs aktief by verstek terug, sien kliëntkode) sekuriteite en stoor dit in die databasis, en kyk of daar 'n sekuriteit is met dieselfde tikker en ruil in die databasis , as daar is, dan sal dit (die vraestel) eenvoudig opgedateer word.
Kom ons begin ons skepping!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Kenmerke web komponent Ek sal nie faust in die artikels oorweeg nie, so ons stel die toepaslike vlag.
In ons bekendstellingsbevel het ons vir faust gesê waar om na die toepassingsvoorwerp te soek en wat om daarmee te doen (begin 'n werker) met die infolog-uitsetvlak. Ons kry die volgende uitset:
Kom ons kyk na die partisiestel. Soos ons kan sien, is 'n onderwerp geskep met die naam wat ons in die kode aangewys het, die verstek aantal partisies (8, geneem uit onderwerp_partisies - toepassingsobjekparameter), aangesien ons nie 'n individuele waarde vir ons onderwerp gespesifiseer het nie (via partisies). Aan die geloodsde agent in die werker word al 8 partisies toegewys, aangesien dit die enigste een is, maar dit sal in meer besonderhede in die deel oor groepering bespreek word.
Wel, nou kan ons na 'n ander terminale venster gaan en 'n leë boodskap na ons onderwerp stuur:
PS gebruik @ ons wys dat ons 'n boodskap stuur na 'n onderwerp genaamd "versamel_sekuriteite".
In hierdie geval het die boodskap na partisie 6 gegaan - jy kan dit nagaan deur na kafdrop aan te gaan localhost:9000
As ons saam met ons werker na die terminale venster gaan, sal ons 'n gelukkige boodskap sien wat met loguru gestuur word:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Ons kan ook na mongo kyk (met behulp van Robo3T of Studio3T) en sien dat die sekuriteite in die databasis is:
Ek is nie 'n miljardêr nie, en daarom is ons tevrede met die eerste kykopsie.
Geluk en vreugde - die eerste agent is gereed :)
Agent gereed, lank lewe die nuwe agent!
Ja, here, ons het net 1/3 van die pad afgelê wat deur hierdie artikel voorberei is, maar moenie moedeloos wees nie, want nou sal dit makliker wees.
So nou het ons 'n agent nodig wat meta-inligting insamel en dit in 'n versamelingsdokument plaas:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Aangesien hierdie agent inligting oor 'n spesifieke sekuriteit sal verwerk, moet ons die tikker (simbool) van hierdie sekuriteit in die boodskap aandui. Vir hierdie doel in faust is daar Rekords — klasse wat die boodskapskema in die agentonderwerp verklaar.
In hierdie geval, kom ons gaan na rekords.pyen beskryf hoe die boodskap vir hierdie onderwerp moet lyk:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Soos jy dalk geraai het, gebruik faust die python-tipe annotasie om die boodskapskema te beskryf, en daarom is die minimum weergawe wat deur die biblioteek ondersteun word 3.6.
Kom ons keer terug na die agent, stel die tipes in en voeg dit by:
Soos u kan sien, gee ons 'n nuwe parameter met 'n skema deur na die onderwerp-inisialiseringsmetode - waarde_tipe. Verder volg alles dieselfde skema, so ek sien geen sin daarin om by enigiets anders stil te staan nie.
Wel, die finale aanraking is om 'n oproep by die meta-inligtingversamelingsagent te voeg na collect_securitites:
Ons gebruik die voorheen aangekondigde skema vir die boodskap. In hierdie geval het ek die .cast-metode gebruik aangesien ons nie hoef te wag vir die uitslag van die agent nie, maar dit is die moeite werd om te noem dat maniere stuur 'n boodskap na die onderwerp:
cast - blokkeer nie omdat dit nie 'n resultaat verwag nie. Jy kan nie die resultaat as 'n boodskap na 'n ander onderwerp stuur nie.
stuur - blokkeer nie omdat dit nie 'n resultaat verwag nie. Jy kan 'n agent in die onderwerp spesifiseer waarna die resultaat sal gaan.
vra - wag vir 'n uitslag. Jy kan 'n agent in die onderwerp spesifiseer waarna die resultaat sal gaan.
So, dit is alles met agente vir vandag!
Die droomspan
Die laaste ding wat ek belowe het om in hierdie deel te skryf, is opdragte. Soos vroeër genoem, is opdragte in faust 'n omhulsel rondom klik. Trouens, faust heg eenvoudig ons persoonlike opdrag aan sy koppelvlak wanneer die -A-sleutel gespesifiseer word
Nadat die aangekondigde agente in agents.py voeg 'n funksie met 'n versierder by app.opdragroep die metode gooi у versamel_sekuriteite:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Dus, as ons die lys van opdragte noem, sal ons nuwe opdrag daarin wees:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Ons kan dit gebruik soos enigiemand anders, so kom ons begin die faust-werker weer en begin met 'n volwaardige versameling van sekuriteite:
> faust -A horton.agents start-collect-securities
Wat sal volgende gebeur?
In die volgende deel, met die oorblywende agente as 'n voorbeeld, sal ons die sinkmeganisme oorweeg om na uiterstes te soek in die sluitingspryse van verhandeling vir die jaar en die kroonbekendstelling van agente.