Tarefas básicas sobre Fausto, Parte I: Introdução

Tarefas básicas sobre Fausto, Parte I: Introdução

Como vim viver assim?

Há pouco tempo tive que trabalhar no backend de um projeto altamente carregado, no qual era necessário organizar a execução regular de um grande número de tarefas em segundo plano com cálculos complexos e solicitações de serviços de terceiros. O projeto é assíncrono e antes de eu chegar, ele tinha um mecanismo simples para iniciar tarefas do cron: um loop verificando a hora atual e lançando grupos de corrotinas via coleta - essa abordagem acabou sendo aceitável até que houvesse dezenas e centenas dessas corrotinas , porém, quando seu número ultrapassou dois mil, tive que pensar em organizar uma fila normal de tarefas com um corretor, vários trabalhadores e assim por diante.

Primeiro decidi experimentar o Aipo, que já tinha usado antes. Devido à natureza assíncrona do projeto, mergulhei na questão e vi статьюBem como projeto, criado pelo autor do artigo.

Direi uma coisa, o projeto é muito interessante e funciona com bastante sucesso em outras aplicações de nossa equipe, e o próprio autor diz que conseguiu implementá-lo em produção usando um pool assíncrono. Mas, infelizmente, isso realmente não me agradou, como descobri problema com lançamento em grupo de tarefas (ver. grupo). No momento em que escrevo emitem já está fechado, porém, as obras já duram um mês. De qualquer forma, boa sorte ao autor e tudo de bom, pois já existem coisas funcionando na lib... em geral, a questão está em mim e a ferramenta acabou ficando úmida para mim. Além disso, algumas tarefas tinham de 2 a 3 solicitações HTTP para serviços diferentes, portanto, mesmo na otimização de tarefas, criamos 4 mil conexões TCP, aproximadamente a cada 2 horas - não muito bom... Gostaria de criar uma sessão para um tipo de tarefa ao iniciar trabalhadores. Um pouco mais sobre o grande número de solicitações via aiohttp aqui.

Nesse sentido, comecei a procurar alternativas e encontrei! Os criadores do aipo, especificamente, pelo que entendi Pergunte a Solem, foi criado Faust, originalmente para o projeto robinhood. Faust se inspira no Kafka Streams e trabalha com Kafka como corretor, o rocksdb também é usado para armazenar resultados do trabalho dos agentes, e o mais importante é que a biblioteca é assíncrona.

Além disso, você pode olhar comparação rápida aipo e fausto dos criadores deste último: suas diferenças, diferenças entre corretores, implementação de uma tarefa elementar. Tudo é bastante simples, porém, um recurso bacana do faust chama a atenção - dados digitados para transmissão ao tema.

O que nós fazemos?

Então, em uma pequena série de artigos, mostrarei como coletar dados de tarefas em segundo plano usando o Faust. A fonte do nosso projeto de exemplo será, como o nome sugere, alphavantage.co. Demonstrarei como escrever agentes (sink, tópicos, partições), como fazer execução regular (cron), os comandos faust cli mais convenientes (um wrapper sobre clique), clustering simples e, no final, anexaremos um datadog ( trabalhando fora da caixa) e tente ver algo. Para armazenar os dados coletados usaremos mongodb e motor para conexão.

PS A julgar pela confiança com que foi escrito o ponto sobre monitoramento, acho que o leitor do final do último artigo ainda ficará mais ou menos assim:

Tarefas básicas sobre Fausto, Parte I: Introdução

Requisitos do projeto

Pelo que já prometi, vamos fazer uma pequena lista do que o serviço deve ser capaz de fazer:

  1. Carregue títulos e uma visão geral deles (incluindo lucros e perdas, balanço patrimonial, fluxo de caixa - do último ano) - regularmente
  2. Carregue dados históricos (para cada ano de negociação, encontre valores extremos do preço de fechamento da negociação) - regularmente
  3. Carregue os dados comerciais mais recentes - regularmente
  4. Carregue regularmente uma lista personalizada de indicadores para cada título

Como esperado, escolhemos um nome para o projeto do zero: horton

Estamos preparando a infraestrutura

O título é certamente forte, no entanto, tudo o que você precisa fazer é escrever uma pequena configuração para docker-compose com kafka (e zookeeper - em um contêiner), kafdrop (se quisermos ver as mensagens nos tópicos), mongodb. Nós temos [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) do seguinte formato:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Não há nada complicado aqui. Dois ouvintes foram declarados para o kafka: um (interno) para uso dentro da rede composta e o segundo (externo) para solicitações externas, para que eles o encaminhassem para fora. 2181 — porto do zookeeper. O resto, penso eu, está claro.

Preparando o esqueleto do projeto

Na versão básica, a estrutura do nosso projeto deve ficar assim:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Tudo que eu anotei Ainda não tocamos nisso, apenas criamos arquivos vazios.**

Criamos uma estrutura. Agora vamos adicionar as dependências necessárias, escrever a configuração e conectar ao mongodb. Não fornecerei o texto completo dos arquivos do artigo, para não atrasá-lo, mas fornecerei links para as versões necessárias.

Vamos começar com dependências e meta sobre o projeto - pyproject.toml

A seguir, começamos a instalar dependências e criar um virtualenv (ou você mesmo pode criar a pasta venv e ativar o ambiente):

pip3 install poetry (если ещё не установлено)
poetry install

Agora vamos criar configuração.yml - Credenciais e onde bater. Você pode colocar imediatamente os dados do Alphavantage lá. Bem, vamos passar para config.py — extraia dados para o aplicativo de nossa configuração. Sim, confesso, usei minha lib - sitri.

Ao conectar-se ao Mongo, tudo é bastante simples. anunciado classe de cliente para conectar e classe básica para cruds, para facilitar a consulta de coleções.

O que vai acontecer a seguir?

O artigo não é muito longo, pois aqui estou falando apenas de motivação e preparação, então não me culpe – prometo que a próxima parte terá ação e gráficos.

Então, nesta próxima parte nós:

  1. Vamos escrever um pequeno cliente para alphavantage em aiohttp com solicitações para os endpoints que precisamos.
  2. Vamos criar um agente que irá coletar dados sobre títulos e preços históricos para eles.

Código do projeto

Código para esta parte

Fonte: habr.com

Adicionar um comentário