Comment en suis-je arrivé à vivre ainsi ?
Il n'y a pas si longtemps, j'ai dû travailler sur le backend d'un projet très chargé, dans lequel il fallait organiser l'exécution régulière d'un grand nombre de tâches en arrière-plan avec des calculs complexes et des demandes de services tiers. Le projet est asynchrone et avant mon arrivée, il disposait d'un mécanisme simple pour lancer des tâches cron : une boucle vérifiant l'heure actuelle et lançant des groupes de coroutines via rassembler - cette approche s'est avérée acceptable jusqu'à ce qu'il y ait des dizaines et des centaines de ces coroutines Cependant, lorsque leur nombre dépassait les deux mille, j'ai dû penser à organiser une file d'attente de tâches normale avec un courtier, plusieurs ouvriers, etc.
J’ai d’abord décidé d’essayer le céleri, que j’avais déjà utilisé. En raison de la nature asynchrone du projet, j'ai plongé dans la question et j'ai vu
Je dirai ceci, le projet est très intéressant et fonctionne avec beaucoup de succès dans d'autres applications de notre équipe, et l'auteur lui-même dit qu'il a pu le déployer en production en utilisant un pool asynchrone. Mais malheureusement, cela ne me convenait pas vraiment, comme il s’est avéré
À cet égard, j'ai commencé à chercher Альтернативы et je l'ai trouvé ! Les créateurs du céleri, en particulier, si je comprends bien
Vous pouvez également regarder
Que ferons-nous?
Ainsi, dans une courte série d'articles, je vais vous montrer comment collecter des données à partir de tâches en arrière-plan à l'aide de Faust. La source de notre exemple de projet sera, comme son nom l'indique,
PS À en juger par la confiance avec laquelle le point sur la surveillance a été rédigé, je pense que le lecteur à la fin du dernier article ressemblera toujours à ceci :
Exigences du projet
Étant donné que j’ai déjà promis, dressons une courte liste de ce que le service devrait être capable de faire :
- Téléchargez régulièrement les titres et un aperçu de ceux-ci (y compris les profits et les pertes, le bilan, les flux de trésorerie - pour la dernière année)
- Téléchargez des données historiques (pour chaque année de négociation, recherchez les valeurs extrêmes du cours de clôture) - régulièrement
- Téléchargez régulièrement les dernières données de trading
- Téléchargez régulièrement une liste personnalisée d'indicateurs pour chaque titre
Comme prévu, nous choisissons un nom pour le projet à partir de zéro : Horton
Nous préparons l'infrastructure
Le titre est certainement fort, cependant, tout ce que vous avez à faire est d'écrire une petite configuration pour docker-compose avec kafka (et zookeeper - dans un conteneur), kafdrop (si nous voulons regarder les messages dans les sujets), mongodb. On a [docker-compose.yml](
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
Il n'y a rien de compliqué ici. Deux auditeurs ont été déclarés pour kafka : un (interne) pour une utilisation à l'intérieur du réseau composite et le second (externe) pour les demandes de l'extérieur, ils l'ont donc transmis à l'extérieur. 2181 — port de gardien de zoo. Le reste, je pense, est clair.
Préparer le squelette du projet
Dans la version de base, la structure de notre projet devrait ressembler à ceci :
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*Tout ce que j'ai noté On n'y touche pas encore, on crée juste des fichiers vides.**
Nous avons créé une structure. Ajoutons maintenant les dépendances nécessaires, écrivons la configuration et connectons-nous à mongodb. Je ne fournirai pas le texte intégral des fichiers dans l'article, afin de ne pas le retarder, mais je fournirai des liens vers les versions nécessaires.
Commençons par les dépendances et les méta du projet -
Ensuite, nous commençons à installer les dépendances et à créer un virtualenv (ou vous pouvez créer vous-même le dossier venv et activer l'environnement) :
pip3 install poetry (если ещё не установлено)
poetry install
Maintenant, créons
Lors de la connexion à Mongo, tout est assez simple. annoncé
Qu'est-ce qui va se passer?
L'article n'est pas très long, puisque je ne parle ici que de motivation et de préparation, alors ne me blâmez pas - je vous promets que la prochaine partie aura de l'action et des graphismes.
Ainsi, dans la partie suivante, nous :
- Écrivons un petit client pour alphavantage sur aiohttp avec des requêtes pour les points de terminaison dont nous avons besoin.
- Créons un agent qui collectera des données sur les titres et leurs prix historiques.
Source: habr.com