Tâches de base sur Faust, Partie I : Introduction

Tâches de base sur Faust, Partie I : Introduction

Comment en suis-je arrivé à vivre ainsi ?

Il n'y a pas si longtemps, j'ai dû travailler sur le backend d'un projet très chargé, dans lequel il fallait organiser l'exécution régulière d'un grand nombre de tâches en arrière-plan avec des calculs complexes et des demandes de services tiers. Le projet est asynchrone et avant mon arrivée, il disposait d'un mécanisme simple pour lancer des tâches cron : une boucle vérifiant l'heure actuelle et lançant des groupes de coroutines via rassembler - cette approche s'est avérée acceptable jusqu'à ce qu'il y ait des dizaines et des centaines de ces coroutines Cependant, lorsque leur nombre dépassait les deux mille, j'ai dû penser à organiser une file d'attente de tâches normale avec un courtier, plusieurs ouvriers, etc.

J’ai d’abord décidé d’essayer le céleri, que j’avais déjà utilisé. En raison de la nature asynchrone du projet, j'ai plongé dans la question et j'ai vu статьюAussi bien que projet, créé par l'auteur de l'article.

Je dirai ceci, le projet est très intéressant et fonctionne avec beaucoup de succès dans d'autres applications de notre équipe, et l'auteur lui-même dit qu'il a pu le déployer en production en utilisant un pool asynchrone. Mais malheureusement, cela ne me convenait pas vraiment, comme il s’est avéré problème avec lancement groupé de tâches (voir. groupe). Au moment de la rédaction aide est déjà fermé, mais les travaux durent depuis un mois. En tout cas, bonne chance à l'auteur et bonne chance, car il y a déjà des choses qui fonctionnent sur la lib... en général, le problème est en moi et l'outil s'est avéré humide pour moi. De plus, certaines tâches avaient 2-3 requêtes http vers différents services, donc même lors de l'optimisation des tâches, nous créons 4 2 connexions TCP, environ toutes les XNUMX heures - pas très bon... J'aimerais créer une session pour un type de tâche lors du démarrage des travailleurs. Un peu plus sur le grand nombre de requêtes via aiohttp ici.

À cet égard, j'ai commencé à chercher Альтернативы et je l'ai trouvé ! Les créateurs du céleri, en particulier, si je comprends bien Demandez à Solem, a été créé Faust, à l'origine pour le projet Robinhood. Faust s'inspire de Kafka Streams et travaille avec Kafka en tant que courtier, rocksdb est également utilisé pour stocker les résultats du travail des agents, et le plus important est que la bibliothèque soit asynchrone.

Vous pouvez également regarder comparaison rapide céleri et faust des créateurs de ce dernier : leurs différences, différences entre courtiers, mise en œuvre d'une tâche élémentaire. Tout est assez simple, cependant, une fonctionnalité intéressante de Faust attire l'attention - des données saisies à transmettre au sujet.

Que ferons-nous?

Ainsi, dans une courte série d'articles, je vais vous montrer comment collecter des données à partir de tâches en arrière-plan à l'aide de Faust. La source de notre exemple de projet sera, comme son nom l'indique, alphavantage.co. Je vais montrer comment écrire des agents (récepteur, sujets, partitions), comment effectuer une exécution régulière (cron), les commandes Faust cli les plus pratiques (un wrapper sur clic), un clustering simple, et à la fin nous attacherons un datadog ( travailler hors de la boîte) et essayez de voir quelque chose. Pour stocker les données collectées, nous utiliserons mongodb et motor pour la connexion.

PS À en juger par la confiance avec laquelle le point sur la surveillance a été rédigé, je pense que le lecteur à la fin du dernier article ressemblera toujours à ceci :

Tâches de base sur Faust, Partie I : Introduction

Exigences du projet

Étant donné que j’ai déjà promis, dressons une courte liste de ce que le service devrait être capable de faire :

  1. Téléchargez régulièrement les titres et un aperçu de ceux-ci (y compris les profits et les pertes, le bilan, les flux de trésorerie - pour la dernière année)
  2. Téléchargez des données historiques (pour chaque année de négociation, recherchez les valeurs extrêmes du cours de clôture) - régulièrement
  3. Téléchargez régulièrement les dernières données de trading
  4. Téléchargez régulièrement une liste personnalisée d'indicateurs pour chaque titre

Comme prévu, nous choisissons un nom pour le projet à partir de zéro : Horton

Nous préparons l'infrastructure

Le titre est certainement fort, cependant, tout ce que vous avez à faire est d'écrire une petite configuration pour docker-compose avec kafka (et zookeeper - dans un conteneur), kafdrop (si nous voulons regarder les messages dans les sujets), mongodb. On a [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) de la forme suivante :

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Il n'y a rien de compliqué ici. Deux auditeurs ont été déclarés pour kafka : un (interne) pour une utilisation à l'intérieur du réseau composite et le second (externe) pour les demandes de l'extérieur, ils l'ont donc transmis à l'extérieur. 2181 — port de gardien de zoo. Le reste, je pense, est clair.

Préparer le squelette du projet

Dans la version de base, la structure de notre projet devrait ressembler à ceci :

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Tout ce que j'ai noté On n'y touche pas encore, on crée juste des fichiers vides.**

Nous avons créé une structure. Ajoutons maintenant les dépendances nécessaires, écrivons la configuration et connectons-nous à mongodb. Je ne fournirai pas le texte intégral des fichiers dans l'article, afin de ne pas le retarder, mais je fournirai des liens vers les versions nécessaires.

Commençons par les dépendances et les méta du projet - pyproject.toml

Ensuite, nous commençons à installer les dépendances et à créer un virtualenv (ou vous pouvez créer vous-même le dossier venv et activer l'environnement) :

pip3 install poetry (если ещё не установлено)
poetry install

Maintenant, créons config.yml - Informations d'identification et où frapper. Vous pouvez immédiatement y placer des données pour alphavantage. Eh bien, passons à config.py — extraire les données de l'application de notre configuration. Oui, je l'avoue, j'ai utilisé ma lib - sitri.

Lors de la connexion à Mongo, tout est assez simple. annoncé classe de clients pour se connecter et classe de base pour les cruds, pour faciliter les requêtes sur les collections.

Qu'est-ce qui va se passer?

L'article n'est pas très long, puisque je ne parle ici que de motivation et de préparation, alors ne me blâmez pas - je vous promets que la prochaine partie aura de l'action et des graphismes.

Ainsi, dans la partie suivante, nous :

  1. Écrivons un petit client pour alphavantage sur aiohttp avec des requêtes pour les points de terminaison dont nous avons besoin.
  2. Créons un agent qui collectera des données sur les titres et leurs prix historiques.

Code de projet

Code pour cette pièce

Source: habr.com

Ajouter un commentaire