Tarefas em segundo plano no Faust, Parte II: Agentes e equipes

Tarefas em segundo plano no Faust, Parte II: Agentes e equipes

Índice analítico

  1. Parte I: Introdução

  2. Parte II: Agentes e Equipes

o que você está fazendo aqui?

Então, então, a segunda parte. Conforme escrito anteriormente, nele faremos o seguinte:

  1. Vamos escrever um pequeno cliente para alphavantage em aiohttp com solicitações para os endpoints que precisamos.

  2. Vamos criar um agente que coletará dados sobre títulos e metainformações sobre eles.

Mas, isso é o que faremos para o projeto em si, e em termos de pesquisa rápida, aprenderemos como escrever agentes que processam eventos de fluxo do kafka, bem como escrever comandos (click wrapper), no nosso caso - para mensagens push manuais para o tópico que o agente está monitorando.

Treinamento

Cliente AlphaVantage

Primeiro, vamos escrever um pequeno cliente aiohttp para solicitações ao alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Na verdade, tudo fica claro:

  1. A API AlphaVantage é bastante simples e lindamente projetada, então decidi fazer todas as solicitações através do método construct_query onde por sua vez há uma chamada http.

  2. Eu trago todos os campos para snake_case Por conveniência.

  3. Bem, a decoração logger.catch para uma saída de traceback bonita e informativa.

PS Não se esqueça de adicionar o token alphavantage localmente ao config.yml ou exportar a variável de ambiente HORTON_SERVICE_APIKEY. Recebemos um token aqui.

Classe CRUD

Teremos uma coleção de títulos para armazenar metainformações sobre títulos.

banco de dados/segurança.py

Na minha opinião, não há necessidade de explicar nada aqui, e a classe base em si é bastante simples.

adquirir aplicativo()

Vamos adicionar uma função para criar um objeto de aplicativo em aplicativo.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Por enquanto teremos a criação de aplicativos mais simples, um pouco mais tarde iremos expandi-la, porém, para não deixar vocês esperando, aqui referências para a classe App. Recomendo também dar uma olhada na classe de configurações, pois ela é responsável pela maior parte das configurações.

Parte principal

Agente para coleta e manutenção de uma lista de títulos

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Então, primeiro obtemos o objeto de aplicação fausto - é bastante simples. A seguir, declaramos explicitamente um tópico para nosso agente... Aqui vale a pena mencionar o que é, qual é o parâmetro interno e como isso pode ser organizado de forma diferente.

  1. Tópicos em kafka, se quisermos saber a definição exata, é melhor ler desligado. documento, ou você pode ler abstrato no Habré em russo, onde tudo também é refletido com bastante precisão :)

  2. Parâmetro interno, descrito muito bem no documento do faust, nos permite configurar o tópico diretamente no código, claro, isso significa os parâmetros fornecidos pelos desenvolvedores do faust, por exemplo: retenção, política de retenção (por padrão, exclua, mas você pode definir compacto), número de partições por tópico (pontuaçõesfazer, por exemplo, menos de significado global aplicações faust).

  3. Em geral, o agente pode criar um tópico gerenciado com valores globais, porém gosto de declarar tudo explicitamente. Além disso, alguns parâmetros (por exemplo, o número de partições ou a política de retenção) do tópico no anúncio do agente não podem ser configurados.

    Veja como seria sem definir manualmente o tópico:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Bem, agora vamos descrever o que nosso agente fará :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Assim, no início do agente, abrimos uma sessão aiohttp para solicitações através do nosso cliente. Assim, ao iniciar um trabalhador, quando nosso agente for lançado, uma sessão será imediatamente aberta - uma, durante todo o tempo em que o trabalhador estiver em execução (ou várias, se você alterar o parâmetro simultaneidade de um agente com uma unidade padrão).

A seguir, seguimos o fluxo (colocamos a mensagem em _, já que nós, neste agente, não nos importamos com o conteúdo) das mensagens do nosso tópico, se existirem no deslocamento atual, caso contrário nosso ciclo aguardará sua chegada. Pois bem, dentro do nosso loop, registramos o recebimento da mensagem, obtemos uma lista de títulos ativos (get_securities retorna apenas ativos por padrão, veja o código do cliente) títulos e salvamos no banco de dados, verificando se existe um título com o mesmo ticker e troca no banco de dados, se houver, então ele (o papel) será simplesmente atualizado.

Vamos lançar nossa criação!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Recursos PS componente web Não considerarei Faust nos artigos, por isso colocamos a bandeira apropriada.

Em nosso comando de inicialização, informamos ao Faust onde procurar o objeto do aplicativo e o que fazer com ele (iniciar um trabalhador) com o nível de saída do log de informações. Obtemos a seguinte saída:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Está vivo!!!

Vejamos o conjunto de partições. Como podemos ver, foi criado um tópico com o nome que designamos no código, o número padrão de partições (8, retirado de topic_partitions - parâmetro do objeto de aplicação), pois não especificamos um valor individual para nosso tópico (via partições). O agente iniciado no trabalhador recebe todas as 8 partições, já que é o único, mas isso será discutido com mais detalhes na parte sobre clustering.

Bem, agora podemos ir para outra janela do terminal e enviar uma mensagem vazia para o nosso tópico:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS usando @ mostramos que estamos enviando uma mensagem para um tópico denominado “collect_securities”.

Neste caso, a mensagem foi para a partição 6 - você pode verificar isso acessando o kafdrop em localhost:9000

Indo para a janela do terminal com nosso trabalhador, veremos uma mensagem feliz enviada usando loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Também podemos olhar no mongo (usando Robo3T ou Studio3T) e ver se os títulos estão no banco de dados:

Não sou bilionário e, portanto, estamos satisfeitos com a primeira opção de visualização.

Tarefas em segundo plano no Faust, Parte II: Agentes e equipesTarefas em segundo plano no Faust, Parte II: Agentes e equipes

Felicidade e alegria - o primeiro agente está pronto :)

Agente pronto, viva o novo agente!

Sim, senhores, percorremos apenas 1/3 do caminho preparado neste artigo, mas não desanimem, pois agora será mais fácil.

Então agora precisamos de um agente que colete metainformações e as coloque em um documento de coleta:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Como este agente irá processar informações sobre um título específico, precisamos indicar o ticker (símbolo) deste título na mensagem. Para este propósito em fausto existem GRAVAÇÕES — classes que declaram o esquema de mensagens no tópico do agente.

Neste caso, vamos para registros.py e descreva como deve ser a mensagem deste tópico:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Como você deve ter adivinhado, faust usa a anotação de tipo python para descrever o esquema da mensagem, e é por isso que a versão mínima suportada pela biblioteca é 3.6.

Vamos voltar ao agente, definir os tipos e adicioná-lo:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Como você pode ver, passamos um novo parâmetro com um esquema para o método de inicialização do tópico - value_type. Além disso, tudo segue o mesmo esquema, então não vejo sentido em insistir em mais nada.

Bem, o toque final é adicionar uma chamada ao agente de coleta de meta informações para collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Usamos o esquema anunciado anteriormente para a mensagem. Neste caso utilizei o método .cast pois não precisamos esperar o resultado do agente, mas vale ressaltar que maneiras envie uma mensagem para o tópico:

  1. cast - não bloqueia porque não espera resultado. Você não pode enviar o resultado para outro tópico como mensagem.

  2. send - não bloqueia porque não espera resultado. Você pode especificar um agente no tópico para o qual irá o resultado.

  3. pergunte - espera por um resultado. Você pode especificar um agente no tópico para o qual irá o resultado.

Então, isso é tudo com os agentes por hoje!

A equipa de sonho

A última coisa que prometi escrever nesta parte são comandos. Como mencionado anteriormente, os comandos em faust são um wrapper em torno do clique. Na verdade, faust simplesmente anexa nosso comando personalizado à sua interface ao especificar a chave -A

Depois dos agentes anunciados em agentes.py adicione uma função com um decorador app.commandchamando o método casto у coletar_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Assim, se chamarmos a lista de comandos, nela estará nosso novo comando:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Podemos usá-lo como qualquer outra pessoa, então vamos reiniciar o faust trabalhador e começar uma coleção completa de títulos:

> faust -A horton.agents start-collect-securities

O que vai acontecer a seguir?

Na próxima parte, usando como exemplo os demais agentes, consideraremos o mecanismo sink para busca de extremos nos preços de fechamento das negociações do ano e no cron de lançamento dos agentes.

Isso é tudo por hoje! Obrigado por ler :)

Código para esta parte

Tarefas em segundo plano no Faust, Parte II: Agentes e equipes

PS Na última parte me perguntaram sobre fausto e kafka confluente (quais recursos o confluente possui?). Parece que o confluent é mais funcional em muitos aspectos, mas o fato é que o faust não tem suporte completo ao cliente para o confluent - isso decorre de descrições das restrições do cliente no documento.

Fonte: habr.com

Adicionar um comentário