Altså, anden del. Som skrevet tidligere vil vi i den gøre følgende:
Lad os skrive en lille klient til alphavantage på aiohttp med anmodninger om de endepunkter, vi har brug for.
Lad os oprette en agent, der vil indsamle data om værdipapirer og metaoplysninger om dem.
Men det er, hvad vi vil gøre for selve projektet, og med hensyn til faust research, vil vi lære, hvordan man skriver agenter, der behandler stream-hændelser fra kafka, samt hvordan man skriver kommandoer (click wrapper), i vores tilfælde - for manuelle push-meddelelser til det emne, som agenten overvåger.
Træning
AlphaVantage klient
Lad os først skrive en lille aiohttp-klient for anmodninger om alphavantage.
AlphaVantage API er ganske enkelt og smukt designet, så jeg besluttede at lave alle anmodninger gennem metoden construct_query hvor der til gengæld er et http-kald.
Jeg bringer alle markerne til snake_case for komfort.
Nå, logger.catch-dekorationen til smuk og informativ traceback-output.
PS Glem ikke at tilføje alphavantage-tokenet lokalt til config.yml, eller eksportere miljøvariablen HORTON_SERVICE_APIKEY. Vi modtager et token her.
CRUD klasse
Vi vil have en værdipapirsamling til at gemme metaoplysninger om værdipapirer.
For nu vil vi have den enkleste applikationsoprettelse, lidt senere udvider vi den dog for ikke at lade dig vente, her referencer til App-klasse. Jeg råder dig også til at tage et kig på indstillingsklassen, da den er ansvarlig for de fleste af indstillingerne.
Hoveddelen
Agent for indsamling og vedligeholdelse af en liste over værdipapirer
Så først får vi faust-applikationsobjektet - det er ret simpelt. Dernæst erklærer vi eksplicit et emne for vores agent... Her er det værd at nævne, hvad det er, hvad den interne parameter er, og hvordan dette kan arrangeres anderledes.
Emner i kafka, hvis vi vil kende den nøjagtige definition, er det bedre at læse af. dokument, eller du kan læse kompendium på Habré på russisk, hvor alt også afspejles ret præcist :)
Intern parameter, som er ganske godt beskrevet i faust-dokumentet, giver os mulighed for at konfigurere emnet direkte i koden, det betyder selvfølgelig de parametre, som faust-udviklerne har leveret, for eksempel: retention, retention policy (som standard slette, men du kan indstille kompakt), antal partitioner pr. emne (scoresat gøre for eksempel mindre end global betydning applikationer faust).
Generelt kan agenten oprette et administreret emne med globale værdier, dog kan jeg godt lide at erklære alt eksplicit. Derudover kan nogle parametre (f.eks. antallet af partitioner eller opbevaringspolitik) for emnet i agentannoncen ikke konfigureres.
Sådan kan det se ud uden manuelt at definere emnet:
Nå, lad os nu beskrive, hvad vores agent vil gøre :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Så i begyndelsen af agenten åbner vi en aiohttp-session for anmodninger gennem vores klient. Når en arbejder startes, når vores agent startes, vil der straks blive åbnet en session - én, for hele tiden arbejderen kører (eller flere, hvis du ændrer parameteren samtidighed fra en agent med en standardenhed).
Dernæst følger vi strømmen (vi placerer beskeden i _, da vi i denne agent er ligeglade med indholdet) af meddelelser fra vores emne, hvis de eksisterer ved den aktuelle offset, ellers vil vores cyklus vente på deres ankomst. Nå, inde i vores løkke logger vi modtagelsen af meddelelsen, får en liste over aktive (get_securities returnerer kun aktive som standard, se klientkode) værdipapirer og gemmer det i databasen, tjekker om der er et værdipapir med samme ticker og udveksling i databasen, hvis der er, så vil den (papiret) simpelthen blive opdateret.
Lad os lancere vores skabelse!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS funktioner web-komponent Jeg vil ikke overveje faust i artiklerne, så vi sætter det passende flag.
I vores startkommando fortalte vi faust, hvor man skulle lede efter applikationsobjektet, og hvad man skulle gøre med det (start en arbejder) med informationslogoutputniveauet. Vi får følgende output:
Lad os se på partitionssættet. Som vi kan se, blev der oprettet et emne med det navn, som vi angav i koden, standardantallet af partitioner (8, taget fra emne_partitioner - application object parameter), da vi ikke specificerede en individuel værdi for vores emne (via partitioner). Den lancerede agent i arbejderen er tildelt alle 8 partitioner, da den er den eneste, men dette vil blive diskuteret mere detaljeret i delen om klyngedannelse.
Nå, nu kan vi gå til et andet terminalvindue og sende en tom besked til vores emne:
PS bruger @ vi viser, at vi sender en besked til et emne med navnet "collect_securities".
I dette tilfælde gik beskeden til partition 6 - du kan tjekke dette ved at gå til kafdrop on localhost:9000
Går vi til terminalvinduet med vores arbejder, vil vi se en glad besked sendt med loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Vi kan også kigge på mongo (ved hjælp af Robo3T eller Studio3T) og se, at værdipapirerne er i databasen:
Jeg er ikke milliardær, og derfor nøjes vi med den første visningsmulighed.
Lykke og glæde - den første agent er klar :)
Agent klar, længe leve den nye agent!
Ja, mine herrer, vi har kun dækket 1/3 af den vej, som denne artikel har forberedt, men bliv ikke afskrækket, for nu bliver det nemmere.
Så nu har vi brug for en agent, der indsamler metainformation og sætter den i et indsamlingsdokument:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Da denne agent vil behandle oplysninger om et specifikt værdipapir, er vi nødt til at angive ticker (symbol) for dette værdipapir i meddelelsen. Til dette formål i faust er der Records — klasser, der erklærer meddelelsesskemaet i agentemnet.
I dette tilfælde, lad os gå til records.pyog beskriv hvordan budskabet til dette emne skal se ud:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Som du måske har gættet, bruger faust annoteringen af python-typen til at beskrive meddelelsesskemaet, hvorfor den minimumsversion, der understøttes af biblioteket, er 3.6.
Lad os vende tilbage til agenten, indstille typerne og tilføje det:
Som du kan se, sender vi en ny parameter med et skema til emneinitialiseringsmetoden - værdi_type. Yderligere følger alt det samme skema, så jeg kan ikke se nogen mening i at dvæle ved andet.
Nå, den sidste touch er at tilføje et opkald til metainformationsindsamlingsagenten til collect_securitites:
Vi bruger den tidligere udmeldte ordning til beskeden. I dette tilfælde brugte jeg .cast-metoden, da vi ikke behøver at vente på resultatet fra agenten, men det er værd at nævne, at måder send en besked til emnet:
cast - blokerer ikke, fordi den ikke forventer et resultat. Du kan ikke sende resultatet til et andet emne som en besked.
send - blokerer ikke, fordi den ikke forventer et resultat. Du kan angive en agent i emnet, som resultatet skal gå til.
spørge - venter på et resultat. Du kan angive en agent i emnet, som resultatet skal gå til.
Så det er alt med agenter for i dag!
Drømmeholdet
Det sidste, jeg lovede at skrive i denne del, er kommandoer. Som tidligere nævnt er kommandoer i faust en indpakning omkring klik. Faktisk knytter faust blot vores brugerdefinerede kommando til dens grænseflade, når han angiver -A-tasten
Efter de annoncerede agenter ind agents.py tilføje en funktion med en dekoratør app.kommandokalder metoden støbt у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Så hvis vi kalder listen over kommandoer, vil vores nye kommando være i den:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Vi kan bruge det som alle andre, så lad os genstarte faust-arbejderen og begynde en fuldgyldig samling af værdipapirer:
> faust -A horton.agents start-collect-securities
Hvad sker der nu?
I den næste del, ved at bruge de resterende agenter som eksempel, vil vi overveje synkemekanismen til at søge efter ekstremer i årets lukkepriser for handel og lanceringen af agenter.