Tareas previas sobre Fausto, Parte I: Introducción

Tareas previas sobre Fausto, Parte I: Introducción

¿Cómo llegué a vivir así?

No hace mucho tuve que trabajar en el backend de un proyecto muy cargado, en el que era necesario organizar la ejecución regular de una gran cantidad de tareas en segundo plano con cálculos complejos y solicitudes de servicios de terceros. El proyecto es asincrónico y antes de mi llegada, tenía un mecanismo simple para ejecutar tareas de cron: un bucle que verifica la hora actual y lanza grupos de corrutinas a través de recopilación; este enfoque resultó aceptable hasta que hubo docenas y cientos de tales corrutinas. Sin embargo, cuando su número superó los dos mil, tuve que pensar en organizar una cola de tareas normal con un intermediario, varios trabajadores, etc.

Primero decidí probar el apio, que ya había usado antes. Debido a la naturaleza asincrónica del proyecto, me sumergí en la pregunta y vi ArtículoAsí como proyecto, creado por el autor del artículo.

Diré esto, el proyecto es muy interesante y funciona con bastante éxito en otras aplicaciones de nuestro equipo, y el propio autor dice que pudo implementarlo en producción utilizando un grupo asíncrono. Pero, lamentablemente, resultó que no me convenía mucho. problema con lanzamiento grupal de tareas (ver. grupo de XNUMX). En el momento de escribir ya está cerrado, sin embargo, las obras llevan un mes en marcha. En cualquier caso, mucha suerte al autor y todo lo mejor, ya que ya hay cosas funcionando en la biblioteca... en general, el punto está en mí y la herramienta me resultó húmeda. Además, algunas tareas tenían 2-3 solicitudes http a diferentes servicios, por lo que incluso al optimizar las tareas, creamos 4 mil conexiones TCP, aproximadamente cada 2 horas, lo que no es muy bueno... Me gustaría crear una sesión para un tipo de tarea al iniciar trabajadores. Un poco más sobre la gran cantidad de solicitudes vía aiohttp aquí.

En este sentido, comencé a buscar альтернативы ¡y lo encontré! Los creadores del apio, concretamente, según tengo entendido. Pregúntale a Solem, fue creado Faust, originalmente para el proyecto robinhood. Faust está inspirado en Kafka Streams y trabaja con Kafka como intermediario, rocksdb también se utiliza para almacenar resultados del trabajo de los agentes y lo más importante es que la biblioteca es asincrónica.

Además, puedes mirar comparación rápida apio y fausto de los creadores de este último: sus diferencias, diferencias entre corredores, implementación de una tarea elemental. Todo es bastante simple, sin embargo, llama la atención una característica interesante de Fausto: los datos escritos para transmitirlos al tema.

¿Qué vamos a hacer?

Entonces, en una breve serie de artículos, le mostraré cómo recopilar datos de tareas en segundo plano usando Faust. La fuente de nuestro proyecto de ejemplo será, como sugiere el nombre, alfavantage.co. Demostraré cómo escribir agentes (sumidero, temas, particiones), cómo realizar una ejecución regular (cron), los comandos faust cli más convenientes (un contenedor sobre un clic), agrupación simple y, al final, adjuntaremos un datadog ( trabajando fuera de la caja) e intente ver algo. Para almacenar los datos recopilados usaremos mongodb y motor para la conexión.

PD: A juzgar por la confianza con la que se escribió el punto sobre el monitoreo, creo que el lector al final del último artículo todavía se verá así:

Tareas previas sobre Fausto, Parte I: Introducción

Requerimientos del proyecto

Debido a que ya lo prometí, hagamos una pequeña lista de lo que el servicio debería poder hacer:

  1. Cargue valores y una descripción general de ellos (incluidas ganancias y pérdidas, balance, flujo de caja, durante el último año), con regularidad
  2. Cargue datos históricos (para cada año comercial, encuentre valores extremos del precio de cierre de la negociación) - regularmente
  3. Cargue los datos comerciales más recientes, periódicamente
  4. Cargue una lista personalizada de indicadores para cada valor, periódicamente

Como era de esperar, elegimos un nombre para el proyecto desde cero: Horton

Estamos preparando la infraestructura.

El título es ciertamente sólido, sin embargo, todo lo que necesitas hacer es escribir una pequeña configuración para docker-compose con kafka (y zookeeper - en un contenedor), kafdrop (si queremos ver mensajes en temas), mongodb. Obtenemos [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) de la siguiente forma:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Aquí no hay nada complicado en absoluto. Se declararon dos oyentes para Kafka: uno (interno) para uso dentro de la red compuesta y el segundo (externo) para solicitudes desde afuera, por lo que lo reenviaron al exterior. 2181 - puerto de cuidadores del zoológico. El resto, creo, está claro.

Preparando el esqueleto del proyecto.

En la versión básica, la estructura de nuestro proyecto debería verse así:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Todo lo que noté No lo tocamos todavía, simplemente creamos archivos vacíos.**

Creamos una estructura. Ahora agreguemos las dependencias necesarias, escribamos la configuración y nos conectemos a mongodb. No proporcionaré el texto completo de los archivos en el artículo para no retrasarlo, pero proporcionaré enlaces a las versiones necesarias.

Comencemos con las dependencias y meta sobre el proyecto: pyproject.toml

A continuación, comenzamos a instalar dependencias y a crear un virtualenv (o puede crear la carpeta venv usted mismo y activar el entorno):

pip3 install poetry (если ещё не установлено)
poetry install

Ahora vamos a crear configuración.yml - Credenciales y dónde llamar. Allí podrá colocar inmediatamente datos para alphavantage. Bueno, pasemos a configuración.py — extraer datos para la aplicación desde nuestra configuración. Sí, lo confieso, usé mi lib... sitri.

Al conectarse a Mongo, todo es bastante sencillo. Anunciado clase de cliente para conectar y clase base para cruds, para facilitar la realización de consultas sobre cobros.

¿Qué pasará después?

El artículo no es muy largo, ya que aquí sólo hablo de motivación y preparación, así que no me culpen: prometo que la siguiente parte tendrá acción y gráficos.

Entonces, en la siguiente parte nosotros:

  1. Escribamos un pequeño cliente para alphavantage en aiohttp con solicitudes para los puntos finales que necesitamos.
  2. Creemos un agente que recopile datos sobre valores y precios históricos de los mismos.

Código de proyecto

Código para esta parte

Fuente: habr.com

Añadir un comentario