¿Cómo llegué a vivir así?
No hace mucho tuve que trabajar en el backend de un proyecto muy cargado, en el que era necesario organizar la ejecución regular de una gran cantidad de tareas en segundo plano con cálculos complejos y solicitudes de servicios de terceros. El proyecto es asincrónico y antes de mi llegada, tenía un mecanismo simple para ejecutar tareas de cron: un bucle que verifica la hora actual y lanza grupos de corrutinas a través de recopilación; este enfoque resultó aceptable hasta que hubo docenas y cientos de tales corrutinas. Sin embargo, cuando su número superó los dos mil, tuve que pensar en organizar una cola de tareas normal con un intermediario, varios trabajadores, etc.
Primero decidí probar el apio, que ya había usado antes. Debido a la naturaleza asincrónica del proyecto, me sumergí en la pregunta y vi
Diré esto, el proyecto es muy interesante y funciona con bastante éxito en otras aplicaciones de nuestro equipo, y el propio autor dice que pudo implementarlo en producción utilizando un grupo asíncrono. Pero, lamentablemente, resultó que no me convenía mucho.
En este sentido, comencé a buscar альтернативы ¡y lo encontré! Los creadores del apio, concretamente, según tengo entendido.
Además, puedes mirar
¿Qué vamos a hacer?
Entonces, en una breve serie de artículos, le mostraré cómo recopilar datos de tareas en segundo plano usando Faust. La fuente de nuestro proyecto de ejemplo será, como sugiere el nombre,
PD: A juzgar por la confianza con la que se escribió el punto sobre el monitoreo, creo que el lector al final del último artículo todavía se verá así:
Requerimientos del proyecto
Debido a que ya lo prometí, hagamos una pequeña lista de lo que el servicio debería poder hacer:
- Cargue valores y una descripción general de ellos (incluidas ganancias y pérdidas, balance, flujo de caja, durante el último año), con regularidad
- Cargue datos históricos (para cada año comercial, encuentre valores extremos del precio de cierre de la negociación) - regularmente
- Cargue los datos comerciales más recientes, periódicamente
- Cargue una lista personalizada de indicadores para cada valor, periódicamente
Como era de esperar, elegimos un nombre para el proyecto desde cero: Horton
Estamos preparando la infraestructura.
El título es ciertamente sólido, sin embargo, todo lo que necesitas hacer es escribir una pequeña configuración para docker-compose con kafka (y zookeeper - en un contenedor), kafdrop (si queremos ver mensajes en temas), mongodb. Obtenemos [docker-compose.yml](
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
Aquí no hay nada complicado en absoluto. Se declararon dos oyentes para Kafka: uno (interno) para uso dentro de la red compuesta y el segundo (externo) para solicitudes desde afuera, por lo que lo reenviaron al exterior. 2181 - puerto de cuidadores del zoológico. El resto, creo, está claro.
Preparando el esqueleto del proyecto.
En la versión básica, la estructura de nuestro proyecto debería verse así:
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*Todo lo que noté No lo tocamos todavía, simplemente creamos archivos vacíos.**
Creamos una estructura. Ahora agreguemos las dependencias necesarias, escribamos la configuración y nos conectemos a mongodb. No proporcionaré el texto completo de los archivos en el artículo para no retrasarlo, pero proporcionaré enlaces a las versiones necesarias.
Comencemos con las dependencias y meta sobre el proyecto:
A continuación, comenzamos a instalar dependencias y a crear un virtualenv (o puede crear la carpeta venv usted mismo y activar el entorno):
pip3 install poetry (если ещё не установлено)
poetry install
Ahora vamos a crear
Al conectarse a Mongo, todo es bastante sencillo. Anunciado
¿Qué pasará después?
El artículo no es muy largo, ya que aquí sólo hablo de motivación y preparación, así que no me culpen: prometo que la siguiente parte tendrá acción y gráficos.
Entonces, en la siguiente parte nosotros:
- Escribamos un pequeño cliente para alphavantage en aiohttp con solicitudes para los puntos finales que necesitamos.
- Creemos un agente que recopile datos sobre valores y precios históricos de los mismos.
Fuente: habr.com