Nii, nii, teine osa. Nagu varem kirjutatud, teeme selles järgmist:
Kirjutame aiohttp-le alfavantage väikekliendi koos meile vajalike lõpp-punktide päringutega.
Loome agendi, kes kogub andmeid väärtpaberite kohta ja metainfot nende kohta.
Kuid seda me teeme projekti enda jaoks ja fausti uurimise osas õpime, kuidas kirjutada agente, mis töötlevad sündmusi kafkast, samuti kuidas kirjutada käske (klõpsake ümbrist), meie puhul - käsitsi tõukesõnumite jaoks teemale, mida agent jälgib.
Koolitus
AlphaVantage klient
Kõigepealt kirjutame alfavantage taotluste jaoks väikese aiohttp-kliendi.
AlphaVantage API on üsna lihtsalt ja kaunilt kujundatud, nii et otsustasin esitada kõik taotlused selle meetodi kaudu construct_query kus omakorda on http-kõne.
Toon kõik põllud kohale snake_case mugavuseks.
Noh, kaunistus logger.catch kauni ja informatiivse jälgimisväljundi jaoks.
PS Ärge unustage lisada saidile config.yml kohapeal alfavantage-märki või eksportida keskkonnamuutujat HORTON_SERVICE_APIKEY. Saame märgi siin.
CRUD klass
Meil on väärtpaberikogu, et salvestada väärtpaberite metainfot.
Praegu on meil kõige lihtsam rakenduste loomine, veidi hiljem laiendame seda, kuid et mitte jätta teid ootama, siis siin viited App-klassi. Samuti soovitan teil vaadata seadete klassi, kuna see vastutab enamiku seadete eest.
Peamine osa
Agent väärtpaberite nimekirja kogumiseks ja pidamiseks
Niisiis, kõigepealt saame kiire rakenduse objekti - see on üsna lihtne. Järgmisena deklareerime oma agendi jaoks selgesõnaliselt teema... Siin tasub mainida, mis see on, mis on sisemine parameeter ja kuidas seda teisiti korraldada.
Kafka-keelsed teemad, kui tahame täpset määratlust teada, on parem lugeda väljas. dokumentvõi saate lugeda abstraktne venekeelsel Habrel, kus kõik ka päris täpselt kajastatud :)
Sisemine parameeter, mis on faust doc-is üsna hästi kirjeldatud, võimaldab meil teemat otse koodis seadistada, see tähendab muidugi fausti arendajate antud parameetreid, näiteks: retention, retention policy (vaikimisi kustutada, aga saab määrata kompaktne), partitsioonide arv teema kohta (hindedteha näiteks vähem kui globaalset tähtsust rakendused faust).
Üldiselt saab agent luua globaalsete väärtustega hallatava teema, kuid mulle meeldib kõike selgelt deklareerida. Lisaks ei saa agendikuulutuses oleva teema mõningaid parameetreid (nt partitsioonide arv või säilituspoliitika) seadistada.
Ilma teemat käsitsi määratlemata võib see välja näha järgmine:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Seega avame agendi alguses aiohttp-seansi oma kliendi kaudu päringute jaoks. Seega avatakse töötaja käivitamisel meie agendi käivitamisel kohe seanss – üks, kogu töötaja töötamise ajaks (või mitu, kui muudate parameetrit samaaegsus vaikeüksusega agendilt).
Järgmisena järgime voogu (paneme sõnumi sisse _, kuna meid selles agendis ei huvita oma teema sõnumite sisu, kui need on praegusel nihkel olemas, vastasel juhul ootab meie tsükkel nende saabumist. Noh, oma tsükli sees logime sõnumi vastuvõtmise, saame aktiivsete (get_securities tagastab vaikimisi ainult aktiivsed, vaata kliendi koodi) väärtpaberite nimekirja ja salvestame selle andmebaasi, kontrollides, kas seal on sama märgiga väärtpaberit ja vahetada andmebaasis , kui on, siis seda (paberit) lihtsalt uuendatakse.
Paneme oma loomingu käima!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS funktsioonid veebikomponent Fausti ma artiklites arvesse ei võta, seega panime sobiva lipu.
Käivituskäskluses ütlesime faust infologi väljundtasemega, kust rakendusobjekti otsida ja mida sellega teha (käivitada töötaja). Saame järgmise väljundi:
Vaatame partitsioonikomplekti. Nagu näeme, loodi teema nimega, mille koodis määrasime, partitsioonide vaikearv (8, võetud teema_partitsioonid - rakenduse objekti parameeter), kuna me ei määranud oma teema jaoks individuaalset väärtust (sektsioonide kaudu). Töötajas käivitatud agendile on määratud kõik 8 partitsiooni, kuna see on ainuke, kuid sellest räägitakse üksikasjalikumalt klasterdamist käsitlevas osas.
Noh, nüüd saame minna teise terminali aknasse ja saata meie teemale tühja sõnumi:
PS kasutamine @ näitame, et saadame sõnumi teemale "koguda_väärtpaberid".
Sel juhul läks teade partitsioonile 6 – seda saate kontrollida, minnes kafdrop on localhost:9000
Oma töötajaga terminali aknasse minnes näeme loguru abil saadetud rõõmsat teadet:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Samuti saame vaadata mongot (kasutades Robo3T või Studio3T) ja näha, et väärtpaberid on andmebaasis:
Ma ei ole miljardär ja seetõttu oleme esimese vaatamisvõimalusega rahul.
Õnn ja rõõm - esimene agent on valmis :)
Agent valmis, elagu uus agent!
Jah, härrased, oleme läbinud vaid 1/3 selle artikliga ette valmistatud teest, kuid ärge heitke meelt, sest nüüd on see lihtsam.
Nüüd vajame agenti, kes kogub metateavet ja paneb selle kogumisdokumenti:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Kuna see agent töötleb teavet konkreetse väärtpaberi kohta, peame sõnumis märkima selle väärtpaberi märgi (sümboli). Selleks on faustis olemas Andmed — klassid, mis deklareerivad agendi teemas sõnumiskeemi.
Sel juhul lähme edasi records.pyja kirjeldage, milline peaks selle teema sõnum välja nägema:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Nagu arvata võis, kasutab Faust sõnumiskeemi kirjeldamiseks pythoni tüüpi annotatsiooni, mistõttu on teegi poolt toetatud minimaalne versioon 3.6.
Pöördume tagasi agendi juurde, määrake tüübid ja lisage see:
Nagu näete, edastame teema initsialiseerimise meetodile uue parameetri koos skeemiga - väärtus_tüüp. Lisaks käib kõik sama skeemi järgi, nii et ma ei näe mõtet millegi muu kallal peatuda.
Noh, viimane lihv on lisada metateabe kogumise agendile call_securitites:
Sõnumi jaoks kasutame varem välja kuulutatud skeemi. Sel juhul kasutasin .cast meetodit, kuna me ei pea ootama agendi tulemust, kuid tasub mainida, et viisil saada sõnum teemale:
valatud - ei blokeeri, sest ei oota tulemust. Tulemust ei saa sõnumina teisele teemale saata.
saada – ei blokeeri, sest ei oota tulemust. Teemas saate määrata agendi, kellele tulemus suunatakse.
küsi - ootab tulemust. Teemas saate määrata agendi, kellele tulemus suunatakse.
Tänaseks on agentidega kõik!
Unistuste meeskond
Viimane asi, mida ma sellesse ossa lubasin kirjutada, on käsud. Nagu varem mainitud, on fausti käsud klõpsamise ümber. Tegelikult lisab Faust klahvi -A määramisel lihtsalt meie kohandatud käsu oma liidesele
Pärast agentide väljakuulutamist agents.py lisa dekoraatoriga funktsioon app.commandkutsudes meetodit koo у koguda_väärtpabereid:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Seega, kui kutsume välja käskude loendi, on meie uus käsk selles:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Saame seda kasutada nagu iga teinegi, nii et taaskäivitagem Faust Worker ja alustame täieõigusliku väärtpaberite kogumisega:
> faust -A horton.agents start-collect-securities
Mis saab edasi?
Järgmises osas vaatleme ülejäänud agentide näitel vajumismehhanismi aasta tehingute sulgemishindades äärmuste otsimiseks ja agentide kroonu käivitamiseks.