Så, så, den andre delen. Som skrevet tidligere, i den vil vi gjøre følgende:
La oss skrive en liten klient for alphavantage på aiohttp med forespørsler om endepunktene vi trenger.
La oss lage en agent som vil samle inn data om verdipapirer og metainformasjon om dem.
Men, dette er hva vi skal gjøre for selve prosjektet, og når det gjelder faust-forskning, vil vi lære å skrive agenter som behandler strømmehendelser fra kafka, samt hvordan man skriver kommandoer (click wrapper), i vårt tilfelle - for manuelle push-meldinger til emnet som agenten overvåker.
Trening
AlphaVantage-klient
La oss først skrive en liten aiohttp-klient for forespørsler til alphavantage.
AlphaVantage API er ganske enkelt og vakkert designet, så jeg bestemte meg for å gjøre alle forespørsler gjennom metoden construct_query hvor det igjen er et http-kall.
Jeg tar med alle feltene til snake_case for enkelhets skyld.
Vel, logger.catch-dekorasjonen for vakker og informativ tilbakesporing.
PS Ikke glem å legge til alphavantage-tokenet lokalt til config.yml, eller eksporter miljøvariabelen HORTON_SERVICE_APIKEY. Vi mottar en token her.
CRUD klasse
Vi vil ha en verdipapirsamling for å lagre metainformasjon om verdipapirer.
Foreløpig vil vi ha den enkleste applikasjonsopprettingen, litt senere vil vi utvide den, men for ikke å la deg vente, her referanser til App-klassen. Jeg anbefaler deg også å ta en titt på innstillingsklassen, siden den er ansvarlig for de fleste innstillingene.
Hoveddel
Agent for innsamling og vedlikehold av en liste over verdipapirer
Så først får vi faust-applikasjonsobjektet - det er ganske enkelt. Deretter erklærer vi eksplisitt et emne for vår agent... Her er det verdt å nevne hva det er, hva den interne parameteren er og hvordan dette kan ordnes annerledes.
Emner i kafka, hvis vi vil vite den nøyaktige definisjonen, er det bedre å lese av. dokument, eller du kan lese kompendium på Habré på russisk, hvor alt også reflekteres ganske nøyaktig :)
Intern parameter, beskrevet ganske godt i faust-dokumentet, lar oss konfigurere emnet direkte i koden, selvfølgelig betyr dette parametrene gitt av faust-utviklerne, for eksempel: oppbevaring, oppbevaringspolicy (som standard slette, men du kan angi kompakt), antall partisjoner per emne (skårerå gjøre for eksempel mindre enn global betydning applikasjoner faust).
Generelt kan agenten lage et administrert emne med globale verdier, men jeg liker å erklære alt eksplisitt. I tillegg kan enkelte parametere (for eksempel antall partisjoner eller oppbevaringspolicy) for emnet i agentannonsen ikke konfigureres.
Vel, la oss nå beskrive hva agenten vår vil gjøre :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Så i begynnelsen av agenten åpner vi en aiohttp-sesjon for forespørsler gjennom vår klient. Når du starter en arbeider, når agenten vår er lansert, vil en økt umiddelbart åpnes - én, for hele tiden arbeideren kjører (eller flere, hvis du endrer parameteren samtidighet fra en agent med en standardenhet).
Deretter følger vi strømmen (vi legger inn meldingen _, siden vi, i denne agenten, ikke bryr oss om innholdet) av meldinger fra emnet vårt, hvis de eksisterer med gjeldende forskyvning, ellers vil syklusen vår vente på deres ankomst. Vel, inne i løkken vår logger vi mottaket av meldingen, får en liste over aktive (get_securities returnerer bare aktive som standard, se klientkode) verdipapirer og lagrer det i databasen, sjekker om det er et verdipapir med samme ticker og utveksling i databasen, hvis det er det, vil den (papiret) ganske enkelt bli oppdatert.
La oss lansere vår kreasjon!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS-funksjoner web-komponent Jeg vil ikke vurdere faust i artiklene, så vi setter riktig flagg.
I lanseringskommandoen vår fortalte vi faust hvor de skulle se etter applikasjonsobjektet og hva de skulle gjøre med det (starte en arbeider) med informasjonsloggutdatanivået. Vi får følgende utgang:
La oss se på partisjonssettet. Som vi kan se, ble det opprettet et emne med navnet som vi angav i koden, standard antall partisjoner (8, hentet fra emne_partisjoner - applikasjonsobjektparameter), siden vi ikke spesifiserte en individuell verdi for emnet vårt (via partisjoner). Den lanserte agenten i arbeideren er tildelt alle 8 partisjoner, siden den er den eneste, men dette vil bli diskutert mer detaljert i delen om klynging.
Vel, nå kan vi gå til et annet terminalvindu og sende en tom melding til emnet vårt:
PS bruker @ vi viser at vi sender en melding til et emne som heter "collect_securities".
I dette tilfellet gikk meldingen til partisjon 6 - du kan sjekke dette ved å gå til kafdrop on localhost:9000
Når vi går til terminalvinduet med arbeideren vår, vil vi se en glad melding sendt med loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Vi kan også se på mongo (ved å bruke Robo3T eller Studio3T) og se at verdipapirene er i databasen:
Jeg er ingen milliardær, og derfor er vi fornøyd med det første visningsalternativet.
Lykke og glede - den første agenten er klar :)
Agent klar, lenge leve den nye agenten!
Ja, mine herrer, vi har bare dekket 1/3 av veien utarbeidet av denne artikkelen, men ikke mist motet, for nå blir det lettere.
Så nå trenger vi en agent som samler inn metainformasjon og legger den inn i et innsamlingsdokument:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Siden denne agenten vil behandle informasjon om et bestemt verdipapir, må vi angi ticker (symbol) for dette verdipapiret i meldingen. For dette formålet i faust er det Records — klasser som erklærer meldingsskjemaet i agentemnet.
I dette tilfellet, la oss gå til records.pyog beskriv hvordan meldingen for dette emnet skal se ut:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Som du kanskje har gjettet, bruker faust merknaden python-type for å beskrive meldingsskjemaet, og det er grunnen til at minimumsversjonen som støttes av biblioteket er 3.6.
La oss gå tilbake til agenten, angi typene og legge den til:
Som du kan se, sender vi en ny parameter med et skjema til emneinitialiseringsmetoden - verdi_type. Videre følger alt det samme opplegget, så jeg ser ikke noe poeng i å dvele ved noe annet.
Vel, den siste detaljen er å legge til et kall til metainformasjonsinnsamlingsagenten til collect_securitites:
Vi bruker tidligere annonsert ordning for meldingen. I dette tilfellet brukte jeg .cast-metoden siden vi ikke trenger å vente på resultatet fra agenten, men det er verdt å nevne at måter å send en melding til emnet:
cast - blokkerer ikke fordi den ikke forventer et resultat. Du kan ikke sende resultatet til et annet emne som en melding.
send - blokkerer ikke fordi den ikke forventer et resultat. Du kan spesifisere en agent i emnet som resultatet skal gå til.
spør - venter på et resultat. Du kan spesifisere en agent i emnet som resultatet skal gå til.
Så, det er alt med agenter for i dag!
Drømmelaget
Det siste jeg lovet å skrive i denne delen er kommandoer. Som nevnt tidligere, er kommandoer i faust en omslag rundt klikk. Faktisk knytter faust ganske enkelt vår egendefinerte kommando til grensesnittet når den spesifiserer -A-tasten
Etter de annonserte agentene inn agents.py legg til en funksjon med en dekoratør app.kommandokaller metoden kastet у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Derfor, hvis vi kaller listen over kommandoer, vil vår nye kommando være i den:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Vi kan bruke det som alle andre, så la oss starte faust-arbeideren på nytt og starte en fullverdig samling av verdipapirer:
> faust -A horton.agents start-collect-securities
Hva vil skje videre?
I neste del, med de gjenværende agentene som eksempel, vil vi se på synkemekanismen for å søke etter ytterpunkter i sluttkursene for handel for året og kronlanseringen av agenter.