Então, então, a segunda parte. Conforme escrito anteriormente, nele faremos o seguinte:
Vamos escrever um pequeno cliente para alphavantage em aiohttp com solicitações para os endpoints que precisamos.
Vamos criar um agente que coletará dados sobre títulos e metainformações sobre eles.
Mas, isso é o que faremos para o projeto em si, e em termos de pesquisa rápida, aprenderemos como escrever agentes que processam eventos de fluxo do kafka, bem como escrever comandos (click wrapper), no nosso caso - para mensagens push manuais para o tópico que o agente está monitorando.
Treinamento
Cliente AlphaVantage
Primeiro, vamos escrever um pequeno cliente aiohttp para solicitações ao alphavantage.
A API AlphaVantage é bastante simples e lindamente projetada, então decidi fazer todas as solicitações através do método construct_query onde por sua vez há uma chamada http.
Eu trago todos os campos para snake_case Por conveniência.
Bem, a decoração logger.catch para uma saída de traceback bonita e informativa.
PS Não se esqueça de adicionar o token alphavantage localmente ao config.yml ou exportar a variável de ambiente HORTON_SERVICE_APIKEY. Recebemos um token aqui.
Classe CRUD
Teremos uma coleção de títulos para armazenar metainformações sobre títulos.
Por enquanto teremos a criação de aplicativos mais simples, um pouco mais tarde iremos expandi-la, porém, para não deixar vocês esperando, aqui referências para a classe App. Recomendo também dar uma olhada na classe de configurações, pois ela é responsável pela maior parte das configurações.
Parte principal
Agente para coleta e manutenção de uma lista de títulos
Então, primeiro obtemos o objeto de aplicação fausto - é bastante simples. A seguir, declaramos explicitamente um tópico para nosso agente... Aqui vale a pena mencionar o que é, qual é o parâmetro interno e como isso pode ser organizado de forma diferente.
Tópicos em kafka, se quisermos saber a definição exata, é melhor ler desligado. documento, ou você pode ler abstrato no Habré em russo, onde tudo também é refletido com bastante precisão :)
Parâmetro interno, descrito muito bem no documento do faust, nos permite configurar o tópico diretamente no código, claro, isso significa os parâmetros fornecidos pelos desenvolvedores do faust, por exemplo: retenção, política de retenção (por padrão, exclua, mas você pode definir compacto), número de partições por tópico (pontuaçõesfazer, por exemplo, menos de significado global aplicações faust).
Em geral, o agente pode criar um tópico gerenciado com valores globais, porém gosto de declarar tudo explicitamente. Além disso, alguns parâmetros (por exemplo, o número de partições ou a política de retenção) do tópico no anúncio do agente não podem ser configurados.
Bem, agora vamos descrever o que nosso agente fará :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Assim, no início do agente, abrimos uma sessão aiohttp para solicitações através do nosso cliente. Assim, ao iniciar um trabalhador, quando nosso agente for lançado, uma sessão será imediatamente aberta - uma, durante todo o tempo em que o trabalhador estiver em execução (ou várias, se você alterar o parâmetro simultaneidade de um agente com uma unidade padrão).
A seguir, seguimos o fluxo (colocamos a mensagem em _, já que nós, neste agente, não nos importamos com o conteúdo) das mensagens do nosso tópico, se existirem no deslocamento atual, caso contrário nosso ciclo aguardará sua chegada. Pois bem, dentro do nosso loop, registramos o recebimento da mensagem, obtemos uma lista de títulos ativos (get_securities retorna apenas ativos por padrão, veja o código do cliente) títulos e salvamos no banco de dados, verificando se existe um título com o mesmo ticker e troca no banco de dados, se houver, então ele (o papel) será simplesmente atualizado.
Vamos lançar nossa criação!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Recursos PS componente web Não considerarei Faust nos artigos, por isso colocamos a bandeira apropriada.
Em nosso comando de inicialização, informamos ao Faust onde procurar o objeto do aplicativo e o que fazer com ele (iniciar um trabalhador) com o nível de saída do log de informações. Obtemos a seguinte saída:
Vejamos o conjunto de partições. Como podemos ver, foi criado um tópico com o nome que designamos no código, o número padrão de partições (8, retirado de topic_partitions - parâmetro do objeto de aplicação), pois não especificamos um valor individual para nosso tópico (via partições). O agente iniciado no trabalhador recebe todas as 8 partições, já que é o único, mas isso será discutido com mais detalhes na parte sobre clustering.
Bem, agora podemos ir para outra janela do terminal e enviar uma mensagem vazia para o nosso tópico:
PS usando @ mostramos que estamos enviando uma mensagem para um tópico denominado “collect_securities”.
Neste caso, a mensagem foi para a partição 6 - você pode verificar isso acessando o kafdrop em localhost:9000
Indo para a janela do terminal com nosso trabalhador, veremos uma mensagem feliz enviada usando loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Também podemos olhar no mongo (usando Robo3T ou Studio3T) e ver se os títulos estão no banco de dados:
Não sou bilionário e, portanto, estamos satisfeitos com a primeira opção de visualização.
Felicidade e alegria - o primeiro agente está pronto :)
Agente pronto, viva o novo agente!
Sim, senhores, percorremos apenas 1/3 do caminho preparado neste artigo, mas não desanimem, pois agora será mais fácil.
Então agora precisamos de um agente que colete metainformações e as coloque em um documento de coleta:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Como este agente irá processar informações sobre um título específico, precisamos indicar o ticker (símbolo) deste título na mensagem. Para este propósito em fausto existem GRAVAÇÕES — classes que declaram o esquema de mensagens no tópico do agente.
Neste caso, vamos para registros.pye descreva como deve ser a mensagem deste tópico:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Como você deve ter adivinhado, faust usa a anotação de tipo python para descrever o esquema da mensagem, e é por isso que a versão mínima suportada pela biblioteca é 3.6.
Vamos voltar ao agente, definir os tipos e adicioná-lo:
Como você pode ver, passamos um novo parâmetro com um esquema para o método de inicialização do tópico - value_type. Além disso, tudo segue o mesmo esquema, então não vejo sentido em insistir em mais nada.
Bem, o toque final é adicionar uma chamada ao agente de coleta de meta informações para collect_securitites:
Usamos o esquema anunciado anteriormente para a mensagem. Neste caso utilizei o método .cast pois não precisamos esperar o resultado do agente, mas vale ressaltar que maneiras envie uma mensagem para o tópico:
cast - não bloqueia porque não espera resultado. Você não pode enviar o resultado para outro tópico como mensagem.
send - não bloqueia porque não espera resultado. Você pode especificar um agente no tópico para o qual irá o resultado.
pergunte - espera por um resultado. Você pode especificar um agente no tópico para o qual irá o resultado.
Então, isso é tudo com os agentes por hoje!
A equipa de sonho
A última coisa que prometi escrever nesta parte são comandos. Como mencionado anteriormente, os comandos em faust são um wrapper em torno do clique. Na verdade, faust simplesmente anexa nosso comando personalizado à sua interface ao especificar a chave -A
Depois dos agentes anunciados em agentes.py adicione uma função com um decorador app.commandchamando o método casto у coletar_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Assim, se chamarmos a lista de comandos, nela estará nosso novo comando:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Podemos usá-lo como qualquer outra pessoa, então vamos reiniciar o faust trabalhador e começar uma coleção completa de títulos:
> faust -A horton.agents start-collect-securities
O que vai acontecer a seguir?
Na próxima parte, usando como exemplo os demais agentes, consideraremos o mecanismo sink para busca de extremos nos preços de fechamento das negociações do ano e no cron de lançamento dos agentes.