SÄ, sÄ, andra delen. Som skrivits tidigare kommer vi att göra följande i den:
LÄt oss skriva en liten klient för alphavantage pÄ aiohttp med förfrÄgningar om de slutpunkter vi behöver.
LÄt oss skapa en agent som kommer att samla in data om vÀrdepapper och metainformation om dem.
Men det hÀr Àr vad vi kommer att göra för sjÀlva projektet, och nÀr det gÀller snabbforskning, kommer vi att lÀra oss hur man skriver agenter som bearbetar strömningshÀndelser frÄn kafka, sÄvÀl som hur man skriver kommandon (click wrapper), i vÄrt fall - för manuella push-meddelanden till Àmnet som agenten övervakar.
Utbildning
AlphaVantage-klient
LÄt oss först skriva en liten aiohttp-klient för förfrÄgningar till alphavantage.
AlphaVantage API Àr ganska enkelt och vackert designat, sÄ jag bestÀmde mig för att göra alla förfrÄgningar genom metoden construct_query dÀr det i sin tur finns ett http-anrop.
Jag tar med alla fÀlt till snake_case för komfort.
Jo, logger.catch-dekorationen för vacker och informativ spÄrning.
PS Glöm inte att lÀgga till alphavantage-token lokalt till config.yml, eller exportera miljövariabeln HORTON_SERVICE_APIKEY. Vi fÄr en token hÀr.
CRUD klass
Vi kommer att ha en vÀrdepapperssamling för att lagra metainformation om vÀrdepapper.
För nu kommer vi att ha den enklaste applikationsskapandet, lite senare kommer vi att utöka det, dock för att inte lÄta dig vÀnta, hÀr referenser till App-klass. Jag rÄder dig ocksÄ att ta en titt pÄ instÀllningsklassen, eftersom den Àr ansvarig för de flesta instÀllningarna.
Huvuddelen
Agent för insamling och underhÄll av en lista över vÀrdepapper
SÄ först fÄr vi faust-applikationsobjektet - det Àr ganska enkelt. DÀrefter deklarerar vi uttryckligen ett Àmne för vÄr agent... HÀr Àr det vÀrt att nÀmna vad det Àr, vad den interna parametern Àr och hur detta kan ordnas annorlunda.
Parameter intern, som beskrivs ganska bra i faust-dokumentet, tillÄter oss att konfigurera Àmnet direkt i koden, naturligtvis betyder detta parametrarna som tillhandahÄlls av faust-utvecklarna, till exempel: retention, retention policy (som standard radera, men du kan stÀlla in kompakt), antal partitioner per Àmne (poÀngatt göra till exempel mindre Àn global betydelse applikationer faust).
I allmÀnhet kan agenten skapa ett hanterat Àmne med globala vÀrden, men jag gillar att deklarera allt explicit. Dessutom kan vissa parametrar (till exempel antalet partitioner eller lagringspolicy) för Àmnet i agentannonsen inte konfigureras.
SÄ hÀr kan det se ut utan att manuellt definiera Àmnet:
NÄvÀl, lÄt oss nu beskriva vad vÄr agent kommer att göra :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
SÄ i början av agenten öppnar vi en aiohttp-session för förfrÄgningar genom vÄr klient. SÄlunda, nÀr en arbetare startar, nÀr vÄr agent startas, kommer en session omedelbart att öppnas - en, under hela tiden arbetaren Àr igÄng (eller flera, om du Àndrar parametern samtidighet frÄn en agent med en standardenhet).
DÀrefter följer vi strömmen (vi placerar meddelandet i _, eftersom vi, i denna agent, inte bryr oss om innehÄllet) av meddelanden frÄn vÄrt Àmne, om de finns vid den aktuella offset, annars kommer vÄr cykel att vÀnta pÄ deras ankomst. Tja, inne i vÄr loop loggar vi mottagandet av meddelandet, fÄr en lista över aktiva (get_securities returnerar endast aktiva som standard, se kundkod) vÀrdepapper och sparar det i databasen, kontrollerar om det finns ett vÀrdepapper med samma ticker och utbyte i databasen, om det finns, kommer det (papperet) helt enkelt att uppdateras.
LÄt oss lansera vÄr skapelse!
> docker-compose up -d
... ĐĐ°ĐżŃŃĐș ĐșĐŸĐœŃĐ”ĐčĐœĐ”ŃĐŸĐČ ...
> faust -A horton.agents worker --without-web -l info
PS funktioner webbkomponent Jag kommer inte att övervÀga faust i artiklarna, sÄ vi sÀtter rÀtt flagga.
I vÄrt startkommando berÀttade vi för faust var man skulle leta efter applikationsobjektet och vad man skulle göra med det (starta en arbetare) med informationsloggutdatanivÄn. Vi fÄr följande utdata:
LÄt oss titta pÄ partitionsuppsÀttningen. Som vi kan se skapades ett Àmne med namnet som vi angav i koden, standardantalet partitioner (8, hÀmtat frÄn topic_partitions - application object parameter), eftersom vi inte angav ett individuellt vÀrde för vÄrt Àmne (via partitioner). Den startade agenten i arbetaren tilldelas alla 8 partitioner, eftersom det Àr den enda, men detta kommer att diskuteras mer detaljerat i delen om klustring.
NÄvÀl, nu kan vi gÄ till ett annat terminalfönster och skicka ett tomt meddelande till vÄrt Àmne:
PS anvÀnder @ vi visar att vi skickar ett meddelande till ett Àmne som heter "collect_securities".
I det hÀr fallet gick meddelandet till partition 6 - du kan kontrollera detta genom att gÄ till kafdrop on localhost:9000
NÀr vi gÄr till terminalfönstret med vÄr arbetare kommer vi att se ett glatt meddelande skickat med loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Vi kan ocksÄ titta pÄ mongo (med Robo3T eller Studio3T) och se att vÀrdepapperen finns i databasen:
Jag Àr ingen miljardÀr, och dÀrför nöjer vi oss med det första visningsalternativet.
Lycka och glÀdje - den första agenten Àr redo :)
Agent redo, lÀnge leve den nya agenten!
Ja, mina herrar, vi har bara tÀckt 1/3 av vÀgen som utarbetats av den hÀr artikeln, men var inte avskrÀckta, för nu kommer det att bli lÀttare.
SÄ nu behöver vi en agent som samlar in metainformation och lÀgger in den i ett insamlingsdokument:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Eftersom denna agent kommer att bearbeta information om ett specifikt vĂ€rdepapper, mĂ„ste vi ange tickern (symbolen) för detta vĂ€rdepapper i meddelandet. För detta Ă€ndamĂ„l i faust finns det Register â klasser som deklarerar meddelandeschemat i agentĂ€mnet.
I det hÀr fallet, lÄt oss gÄ till records.pyoch beskriv hur meddelandet för detta Àmne ska se ut:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Som du kanske har gissat anvÀnder faust annoteringen av pythontyp för att beskriva meddelandeschemat, varför den minsta versionen som stöds av biblioteket Àr 3.6.
LÄt oss ÄtergÄ till agenten, stÀlla in typerna och lÀgga till den:
Som du kan se skickar vi en ny parameter med ett schema till Àmnesinitieringsmetoden - value_type. Dessutom följer allt samma schema, sÄ jag ser ingen mening med att uppehÄlla mig vid nÄgot annat.
Tja, sista handen Àr att lÀgga till ett samtal till agenten för insamling av metainformation till collect_securitites:
Vi anvÀnder det tidigare aviserade schemat för meddelandet. I det hÀr fallet anvÀnde jag .cast-metoden eftersom vi inte behöver vÀnta pÄ resultatet frÄn agenten, men det Àr vÀrt att nÀmna att sÀtt skicka ett meddelande till Àmnet:
cast - blockerar inte eftersom den inte förvÀntar sig ett resultat. Du kan inte skicka resultatet till ett annat Àmne som ett meddelande.
skicka - blockerar inte eftersom det inte förvÀntar sig ett resultat. Du kan ange en agent i Àmnet som resultatet ska gÄ till.
frÄga - vÀntar pÄ ett resultat. Du kan ange en agent i Àmnet som resultatet ska gÄ till.
SÄ, det Àr allt med agenter för idag!
Dröm teamet
Det sista jag lovade att skriva i den hÀr delen Àr kommandon. Som nÀmnts tidigare Àr kommandon i faust ett omslag runt klick. Faktum Àr att faust helt enkelt kopplar vÄrt anpassade kommando till dess grÀnssnitt nÀr du anger -A-tangenten
Efter de annonserade agenterna in agents.py lĂ€gg till en funktion med en dekoratör app.commandkallar metoden gjutas Ń collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
SÄledes, om vi anropar listan med kommandon, kommer vÄrt nya kommando att finnas i den:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Vi kan anvÀnda det som alla andra, sÄ lÄt oss starta om faust-arbetaren och pÄbörja en fullfjÀdrad samling av vÀrdepapper:
> faust -A horton.agents start-collect-securities
Vad kommer hÀnda hÀrnÀst?
I nÀsta del, med de ÄterstÄende agenterna som exempel, kommer vi att övervÀga sjunkmekanismen för att söka efter ytterligheter i stÀngningspriserna för handel för Äret och kronlanseringen av agenter.
Det Àr allt för idag! Tack för att du lÀser :)