Así, así, a segunda parte. Como se escribiu anteriormente, nel faremos o seguinte:
Escribamos un pequeno cliente para alphaavantage en aiohttp con solicitudes para os puntos finais que necesitamos.
Imos crear un axente que recompilará datos sobre valores e metainformación sobre eles.
Pero isto é o que faremos polo propio proxecto e, en termos de investigación de faust, aprenderemos a escribir axentes que procesen eventos de fluxo de kafka, así como a escribir comandos (clic envoltorio), no noso caso - para mensaxes push manuais ao tema que está a supervisar o axente.
Adestramento
Cliente AlphaVantage
Primeiro, imos escribir un pequeno cliente aiohttp para solicitudes de alfavantage.
A API de AlphaVantage está deseñada de xeito bastante sinxelo e bonito, polo que decidín facer todas as solicitudes a través do método construct_query onde á súa vez hai unha chamada http.
Traio todos os campos a snake_case por comodidade.
Ben, a decoración logger.catch para unha saída de rastrexo fermosa e informativa.
PS Non esquezas engadir o token alphaavantage localmente a config.yml ou exportar a variable de ambiente HORTON_SERVICE_APIKEY. Recibimos unha ficha aquí.
Clase CRUD
Teremos unha colección de valores para almacenar metainformación sobre valores.
Polo de agora teremos a creación de aplicacións máis sinxelas, un pouco máis adiante ampliarémola, non obstante, para non facervos esperar, aquí referencias á clase de aplicación. Tamén che aconsello que botes un ollo á clase de configuración, xa que é responsable da maioría das opcións.
Corpo principal
Axente de recollida e mantemento dunha lista de valores
Entón, primeiro obtemos o obxecto da aplicación faust: é bastante sinxelo. A continuación, declaramos explícitamente un tema para o noso axente... Aquí paga a pena mencionar o que é, cal é o parámetro interno e como se pode organizar de forma diferente.
Temas en kafka, se queremos saber a definición exacta, é mellor ler apagado. documento, ou podes ler compendio en Habré en ruso, onde tamén se reflicte todo con bastante precisión :)
Parámetro interno, descrito bastante ben no documento de faust, permítenos configurar o tema directamente no código, por suposto, isto significa os parámetros proporcionados polos desenvolvedores de faust, por exemplo: retención, política de retención (por defecto eliminar, pero pode configurar compacto), número de particións por tema (puntuaciónsfacer, por exemplo, menos de importancia global aplicacións faust).
En xeral, o axente pode crear un tema xestionado con valores globais, non obstante, gústame declarar todo de forma explícita. Ademais, algúns parámetros (por exemplo, o número de particións ou a política de retención) do tema no anuncio do axente non se poden configurar.
Isto é o que pode parecer sen definir manualmente o tema:
Ben, agora imos describir o que fará o noso axente :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Entón, ao comezo do axente, abrimos unha sesión aiohttp para solicitudes a través do noso cliente. Así, ao iniciar un traballador, cando se inicie o noso axente, abrirase inmediatamente unha sesión: unha, durante todo o tempo que o traballador estea en execución (ou varias, se cambia o parámetro). concorrencia dun axente cunha unidade predeterminada).
A continuación, seguimos o fluxo (colocamos a mensaxe en _, xa que a nós, neste axente, non nos importa o contido) das mensaxes do noso tema, se existen na compensación actual, se non, o noso ciclo agardará a súa chegada. Pois ben, dentro do noso bucle, rexistramos a recepción da mensaxe, obtemos unha lista de valores activos (get_securities só devolve activos por defecto, consulta o código do cliente) e gardámolo na base de datos, comprobando se hai un valor co mesmo ticker e intercambio na base de datos , se o hai, entón (o papel) simplemente se actualizará.
Imos lanzar a nosa creación!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Características PS compoñente web Non vou considerar fausto nos artigos, polo que poñemos a bandeira adecuada.
No noso comando de lanzamento, dixémoslle a Faust onde buscar o obxecto da aplicación e que facer con el (iniciar un traballador) co nivel de saída do rexistro de información. Obtemos a seguinte saída:
Vexamos o conxunto de particións. Como podemos ver, creouse un tema co nome que designamos no código, o número de particións por defecto (8, tomado de particións_tema - parámetro de obxecto da aplicación), xa que non especificamos un valor individual para o noso tema (a través de particións). O axente iniciado no traballador ten asignadas as 8 particións, xa que é a única, pero isto comentarase con máis detalle na parte sobre a agrupación.
Ben, agora podemos ir a outra xanela de terminal e enviar unha mensaxe baleira ao noso tema:
PS usando @ mostramos que estamos enviando unha mensaxe a un tema chamado "coller_valores".
Neste caso, a mensaxe foi á partición 6; podes comprobalo indo a kafdrop on localhost:9000
Indo á xanela do terminal co noso traballador, veremos unha mensaxe feliz enviada usando loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Tamén podemos mirar en mongo (usando Robo3T ou Studio3T) e ver que os títulos están na base de datos:
Non son multimillonario e, polo tanto, conformámonos coa primeira opción de visualización.
Felicidade e alegría: o primeiro axente está listo :)
Axente listo, viva o novo axente!
Si, señores, só percorremos 1/3 do camiño que prepara este artigo, pero non vos desanimedes, porque agora será máis fácil.
Entón, agora necesitamos un axente que recolla metainformación e a poña nun documento de recollida:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Dado que este axente procesará información sobre unha seguridade específica, necesitamos indicar o ticker (símbolo) desta seguridade na mensaxe. Para este fin en faust hai Rexistros — clases que declaran o esquema de mensaxes no tema do axente.
Neste caso, imos a rexistros.pye describe como debería ser a mensaxe deste tema:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Como poderías ter adiviñado, faust usa a anotación de tipo Python para describir o esquema da mensaxe, polo que a versión mínima admitida pola biblioteca é 3.6.
Volvamos ao axente, establecemos os tipos e engadímolo:
Como podes ver, pasamos un novo parámetro cun esquema ao método de inicialización do tema - value_type. Ademais, todo segue o mesmo esquema, polo que non vexo ningún sentido a determe noutra cousa.
Ben, o toque final é engadir unha chamada ao axente de recollida de metainformación para collect_securitites:
Usamos o esquema anunciado previamente para a mensaxe. Neste caso, usei o método .cast xa que non necesitamos esperar o resultado do axente, pero vale a pena mencionar que xeitos enviar unha mensaxe ao tema:
cast - non bloquea porque non espera un resultado. Non pode enviar o resultado a outro tema como mensaxe.
enviar: non bloquea porque non espera un resultado. Podes especificar un axente no tema ao que irá o resultado.
preguntar - agarda un resultado. Podes especificar un axente no tema ao que irá o resultado.
Entón, iso é todo cos axentes para hoxe!
O Dream Team
O último que prometín escribir nesta parte son os comandos. Como se mencionou anteriormente, os comandos en faust son un envoltorio ao redor do clic. De feito, faust simplemente anexa o noso comando personalizado á súa interface cando especifica a tecla -A
Despois dos axentes anunciados en axentes.py engadir unha función cun decorador aplicación.comandochamando ao método publicar у recoller_valores:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Así, se chamamos á lista de comandos, o noso novo comando estará nela:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Podemos usalo como calquera outra persoa, así que reiniciemos o faust worker e comecemos unha colección completa de valores:
> faust -A horton.agents start-collect-securities
Que pasará despois?
Na seguinte parte, tomando como exemplo os restantes axentes, consideraremos o mecanismo de sumidoiro para buscar extremos nos prezos de peche de negociación do ano e o lanzamento cron dos axentes.