Tarefas de fondo sobre Fausto, Parte I: Introdución

Tarefas de fondo sobre Fausto, Parte I: Introdución

Como cheguei a vivir así?

Non hai moito tiven que traballar no backend dun proxecto moi cargado, no que era necesario organizar a execución regular dunha gran cantidade de tarefas en segundo plano con cálculos complexos e solicitudes de servizos de terceiros. O proxecto é asíncrono e antes de chegar eu tiña un mecanismo sinxelo para as tarefas de lanzamento de crons: un bucle que verificaba a hora actual e lanzaba grupos de corrutinas a través de gather; este enfoque resultou ser aceptable ata que houbo decenas e centos de tales corrutinas. , porén, cando o seu número superaba os dous mil, tiven que pensar en organizar unha cola de tarefas normal cun corredor, varios traballadores, etc.

Primeiro decidín probar o apio, que xa usara antes. Debido á natureza asíncrona do proxecto, mergulleime na pregunta e vin un artigo, así como proxecto, creado polo autor do artigo.

Vou dicir isto, o proxecto é moi interesante e funciona con bastante éxito noutras aplicacións do noso equipo, e o propio autor di que foi capaz de poñelo en produción mediante un pool asíncrono. Pero, por desgraza, non me convén moito, como resultou un problema co lanzamento grupal de tarefas (ver. grupo). No momento de escribir cuestión xa está pechado, con todo, os traballos levan un mes. En todo caso, moita sorte ao autor e todo o mellor, xa que xa hai cousas traballando na lib... en xeral, o punto está en min e a ferramenta resultoume húmida. Ademais, algunhas tarefas tiñan 2-3 solicitudes http a diferentes servizos, polo que mesmo ao optimizar tarefas, creamos 4 mil conexións TCP, aproximadamente cada 2 horas - non moi boa... Gustaríame crear unha sesión para un tipo de tarefa ao iniciar os traballadores. Un pouco máis sobre a gran cantidade de solicitudes a través de aiohttp aquí.

Neste sentido, empecei a buscar alternativas e atopouno! Os creadores do apio, en concreto, tal e como eu o entendo Pregúntalle a Solem, foi creado Faust, orixinalmente para o proxecto Robin Hood. Faust está inspirado en Kafka Streams e traballa con Kafka como intermediario, rocksdb tamén se usa para almacenar resultados do traballo dos axentes, e o máis importante é que a biblioteca sexa asíncrona.

Ademais, podes mirar comparación rápida apio e fausto dos creadores deste último: as súas diferenzas, diferenzas entre corredores, execución dunha tarefa elemental. Todo é bastante sinxelo, con todo, unha característica agradable en Faust chama a atención: datos escritos para transmitir ao tema.

Que facemos?

Entón, nunha pequena serie de artigos mostrarei como recompilar datos de tarefas en segundo plano usando Faust. A fonte do noso proxecto de exemplo será, como o nome indica, alphaavantage.co. Demostrarei como escribir axentes (sink, topics, particións), como facer a execución regular (cron), os comandos faust cli máis convenientes (un envoltorio sobre o clic), agrupación sinxela e, ao final, achegaremos un datadog ( traballando fóra da caixa) e intenta algo para ver. Para almacenar os datos recollidos usaremos mongodb e motor para a conexión.

PD A xulgar pola confianza coa que se escribiu o punto sobre a vixilancia, creo que o lector do final do último artigo aínda terá un aspecto así:

Tarefas de fondo sobre Fausto, Parte I: Introdución

Requisitos do proxecto

Debido ao feito de que xa o prometín, fagamos unha pequena lista do que debería ser capaz de facer o servizo:

  1. Cargue títulos e unha visión xeral deles (incluídos beneficios e perdas, balance, fluxo de caixa - do último ano) - regularmente
  2. Cargue datos históricos (para cada ano de negociación, busque valores extremos do prezo de peche da negociación) - regularmente
  3. Carga os datos comerciais máis recentes, regularmente
  4. Carga regularmente unha lista personalizada de indicadores para cada seguridade

Como era de esperar, escollemos un nome para o proxecto desde cero: Horton

Estamos preparando a infraestrutura

O título é certamente forte, non obstante, todo o que tes que facer é escribir unha pequena configuración para docker-compose con kafka (e zookeeper - nun recipiente), kafdrop (se queremos ver as mensaxes nos temas), mongodb. Obtemos [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) da seguinte forma:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Aquí non hai nada complicado. Dous oíntes foron declarados para kafka: un (interno) para o seu uso dentro da rede composta e o segundo (externo) para solicitudes de fóra, polo que o reenviaron fóra. 2181 - porto zookeeper. O resto, creo que está claro.

Elaboración do esqueleto do proxecto

Na versión básica, a estrutura do noso proxecto debería verse así:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Todo o que notei Aínda non o tocamos, só creamos ficheiros baleiros.**

Creamos unha estrutura. Agora imos engadir as dependencias necesarias, escribir a configuración e conectarse a mongodb. Non facilitarei o texto completo dos ficheiros do artigo, para non demoralo, pero facilitarei ligazóns ás versións necesarias.

Imos comezar coas dependencias e meta sobre o proxecto - pyproject.toml

A continuación, comezamos a instalar dependencias e a crear un virtualenv (ou podes crear ti mesmo o cartafol venv e activar o entorno):

pip3 install poetry (если ещё не установлено)
poetry install

Agora imos crear config.yml - Credenciais e onde chamar. Podes colocar inmediatamente os datos para alphaavantage alí. Ben, pasemos a config.py — extraer datos para a aplicación da nosa configuración. Si, confeso que usei a miña biblioteca... sitri.

Cando se conecta a Mongo, todo é bastante sinxelo. anunciado clase cliente para conectar e clase base para cruds, para facilitar a consulta sobre coleccións.

Que pasará despois?

O artigo non é moi longo, xa que aquí só falo de motivación e preparación, así que non me culpes, prometo que a próxima parte terá acción e gráficos.

Entón, nesta próxima parte imos:

  1. Escribamos un pequeno cliente para alphaavantage en aiohttp con solicitudes para os puntos finais que necesitamos.
  2. Imos crear un axente que recompilará datos sobre valores e prezos históricos para eles.

Código do proxecto

Código para esta parte

Fonte: www.habr.com

Engadir un comentario