Així doncs, la segona part. Com s'ha escrit anteriorment, en ell farem el següent:
Escrivim un petit client per alphavantage a aiohttp amb sol·licituds per als punts finals que necessitem.
Creem un agent que recopilarà dades sobre valors i metainformació sobre ells.
Però això és el que farem per al projecte en si, i pel que fa a la recerca de Faust, aprendrem a escriure agents que processen esdeveniments de flux des de kafka, així com a escriure ordres (embolcall de clic), en el nostre cas: per als missatges push manuals al tema que l'agent està supervisant.
Entrenament
Client AlphaVantage
En primer lloc, escrivim un petit client aiohttp per a sol·licituds a alphaavantage.
L'API AlphaVantage està dissenyada de manera senzilla i bonica, així que vaig decidir fer totes les sol·licituds mitjançant el mètode construct_query on al seu torn hi ha una trucada http.
Porto tots els camps a snake_case per comoditat.
Bé, la decoració logger.catch per a una sortida de traçament bonica i informativa.
PS No us oblideu d'afegir localment el testimoni alphaavantage a config.yml o exportar la variable d'entorn HORTON_SERVICE_APIKEY. Rebem un testimoni aquí.
Classe CRUD
Tindrem una col·lecció de valors per emmagatzemar metainformació sobre valors.
De moment tindrem la creació de l'aplicació més senzilla, una mica més endavant l'ampliarem, però, per no fer-vos esperar, aquí referències a la classe d'aplicacions. També us recomano que feu una ullada a la classe de configuració, ja que és responsable de la majoria de la configuració.
La part principal
Agent de recollida i manteniment d'una llista de valors
Per tant, primer obtenim l'objecte d'aplicació faust: és bastant senzill. A continuació, declarem explícitament un tema per al nostre agent... Aquí val la pena esmentar què és, quin és el paràmetre intern i com es pot organitzar de manera diferent.
Temes en kafka, si volem saber la definició exacta, millor llegir apagat. document, o pots llegir compendi a Habré en rus, on tot es reflecteix també amb força precisió :)
Paràmetre intern, descrit força bé al document de faust, ens permet configurar el tema directament al codi, és clar, això vol dir els paràmetres proporcionats pels desenvolupadors de faust, per exemple: retenció, política de retenció (per defecte esborra, però podeu configurar compacte), nombre de particions per tema (puntuacionsfer, per exemple, menys de importància global aplicacions faust).
En general, l'agent pot crear un tema gestionat amb valors globals, però m'agrada declarar-ho tot explícitament. A més, no es poden configurar alguns paràmetres (per exemple, el nombre de particions o la política de retenció) del tema de l'anunci de l'agent.
A continuació es mostra com podria semblar sense definir manualment el tema:
Bé, ara anem a descriure què farà el nostre agent :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Així doncs, a l'inici de l'agent, obrim una sessió aiohttp per a sol·licituds a través del nostre client. Així, quan s'inicia un treballador, quan s'inicia el nostre agent, immediatament s'obrirà una sessió: una, durant tot el temps que el treballador està en execució (o diverses, si canvieu el paràmetre). simultaneïtat d'un agent amb una unitat per defecte).
A continuació, seguim el flux (hi col·loquem el missatge _, ja que a nosaltres, en aquest agent, no ens importa el contingut) dels missatges del nostre tema, si existeixen al desplaçament actual, en cas contrari el nostre cicle esperarà la seva arribada. Bé, dins del nostre bucle, registrem la recepció del missatge, obtenim una llista de valors actius (get_securities retorna només actius per defecte, vegeu el codi del client) i el desem a la base de dades, comprovant si hi ha un títol amb el mateix ticker i intercanvi a la base de dades, si n'hi ha, simplement s'actualitzarà (el paper).
Llencem la nostra creació!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Característiques de PS component web No consideraré faust als articles, així que posem la bandera adequada.
A la nostra comanda de llançament, vam dir a Faust on buscar l'objecte de l'aplicació i què fer-hi (llançar un treballador) amb el nivell de sortida del registre d'informació. Obtenim la següent sortida:
Vegem el conjunt de particions. Com podem veure, es va crear un tema amb el nom que hem designat al codi, el nombre de particions per defecte (8, extret de particions_tema - paràmetre d'objecte d'aplicació), ja que no hem especificat un valor individual per al nostre tema (mitjançant particions). A l'agent llançat al treballador se li assignen les 8 particions, ja que és l'única, però això es tractarà amb més detall a la part sobre agrupació.
Bé, ara podem anar a una altra finestra de terminal i enviar un missatge buit al nostre tema:
PS utilitzant @ mostrem que estem enviant un missatge a un tema anomenat "collect_securities".
En aquest cas, el missatge va anar a la partició 6; podeu comprovar-ho si aneu a kafdrop on localhost:9000
Anant a la finestra del terminal amb el nostre treballador, veurem un missatge feliç enviat mitjançant loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
També podem mirar a mongo (utilitzant Robo3T o Studio3T) i veure que els valors es troben a la base de dades:
No sóc multimilionari i, per tant, ens conformem amb la primera opció de visualització.
Felicitat i alegria: el primer agent està preparat :)
Agent preparat, visca el nou agent!
Sí, senyors, només hem recorregut 1/3 del camí que ens ha preparat aquest article, però no us desanimau, perquè ara serà més fàcil.
Per tant, ara necessitem un agent que reculli metainformació i la introdueixi en un document de recollida:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Com que aquest agent processarà informació sobre una seguretat concreta, hem d'indicar el ticker (símbol) d'aquesta seguretat al missatge. Per a això en faust n'hi ha arxius — classes que declaren l'esquema de missatges al tema de l'agent.
En aquest cas, anem a records.pyi descriu com hauria de ser el missatge d'aquest tema:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Com haureu endevinat, faust utilitza l'anotació de tipus Python per descriure l'esquema del missatge, per això la versió mínima admesa per la biblioteca és 3.6.
Com podeu veure, passem un nou paràmetre amb un esquema al mètode d'inicialització del tema - value_type. A més, tot segueix el mateix esquema, així que no veig cap sentit a detenir-me en res més.
Bé, el toc final és afegir una trucada a l'agent de recollida d'informació meta a collect_securitites:
Utilitzem l'esquema anunciat anteriorment per al missatge. En aquest cas, he utilitzat el mètode .cast ja que no hem d'esperar el resultat de l'agent, però val la pena esmentar que maneres envia un missatge al tema:
cast: no bloqueja perquè no espera un resultat. No podeu enviar el resultat a un altre tema com a missatge.
enviar - no bloqueja perquè no espera un resultat. Podeu especificar un agent al tema al qual anirà el resultat.
preguntar - espera un resultat. Podeu especificar un agent al tema al qual anirà el resultat.
Així doncs, això és tot amb els agents d'avui!
L'equip dels somnis
L'últim que vaig prometre escriure en aquesta part són les ordres. Com s'ha esmentat anteriorment, les ordres de faust són un embolcall al voltant del clic. De fet, faust simplement adjunta la nostra ordre personalitzada a la seva interfície quan especifica la clau -A
Després dels agents anunciats agents.py afegir una funció amb un decorador app.commandcridant el mètode emetre у cobrar_valors:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Així, si cridem a la llista d'ordres, la nostra nova ordre hi estarà:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Podem utilitzar-lo com qualsevol altra persona, així que reiniciem el treballador de faust i comencem una col·lecció completa de valors:
> faust -A horton.agents start-collect-securities
Què passarà després?
A la següent part, prenent com a exemple la resta d'agents, considerarem el mecanisme d'amortització per a la recerca d'extrems en els preus de tancament de la negociació de l'any i el llançament cron d'agents.