Hintergrundaufgaben zu Faust, Teil I: Einleitung

Hintergrundaufgaben zu Faust, Teil I: Einleitung

Wie kam ich dazu, so zu leben?

Vor nicht allzu langer Zeit musste ich im Backend eines hochbelasteten Projekts arbeiten, bei dem es darum ging, die regelmäßige Ausführung einer Vielzahl von Hintergrundaufgaben mit komplexen Berechnungen und Anfragen für Dienste Dritter zu organisieren. Das Projekt ist asynchron und hatte, bevor ich kam, einen einfachen Mechanismus zum Starten von Cron-Aufgaben: eine Schleife, die die aktuelle Zeit überprüft und Gruppen von Coroutinen über Gathering startet – dieser Ansatz erwies sich als akzeptabel, bis es Dutzende und Hunderte solcher Coroutinen gab Als ihre Zahl jedoch zweitausend überstieg, musste ich darüber nachdenken, eine normale Aufgabenwarteschlange mit einem Makler, mehreren Arbeitern usw. zu organisieren.

Zuerst beschloss ich, Sellerie auszuprobieren, den ich zuvor verwendet hatte. Aufgrund der asynchronen Natur des Projekts habe ich mich mit der Frage beschäftigt und gesehen ArtikelSowie Projekt, erstellt vom Autor des Artikels.

Ich muss sagen, das Projekt ist sehr interessant und funktioniert in anderen Anwendungen unseres Teams recht erfolgreich, und der Autor selbst sagt, dass er es mithilfe eines asynchronen Pools in die Produktion einführen konnte. Aber leider hat es mir nicht wirklich gepasst, wie sich herausstellte Problem mit Gruppenstart von Aufgaben (siehe. Gruppe). Zum Zeitpunkt des Schreibens Problem ist bereits geschlossen, die Arbeiten dauern jedoch schon seit einem Monat an. Auf jeden Fall viel Glück für den Autor und alles Gute, da es bereits funktionierende Dinge in der Bibliothek gibt... Im Allgemeinen liegt der Punkt bei mir und das Tool hat sich für mich als feucht herausgestellt. Darüber hinaus hatten einige Aufgaben 2-3 HTTP-Anfragen an verschiedene Dienste, sodass wir selbst bei der Optimierung von Aufgaben ungefähr alle 4 Stunden 2 TCP-Verbindungen erstellen – nicht sehr gut ... Ich würde gerne eine Sitzung für einen Typ erstellen Aufgabe beim Start von Arbeitern. Etwas mehr über die große Anzahl an Anfragen über aiohttp hier.

Diesbezüglich begann ich zu suchen Alternativen und habe es gefunden! Genauer gesagt die Schöpfer von Sellerie, wie ich es verstehe Fragen Sie Solem, wurde erstellt Faust, ursprünglich für das Projekt Robinhood. Faust ist von Kafka Streams inspiriert und arbeitet mit Kafka als Broker zusammen, rocksdb wird auch zum Speichern von Ergebnissen aus der Arbeit von Agenten verwendet und das Wichtigste ist, dass die Bibliothek asynchron ist.

Sie können auch nachschauen schneller Vergleich Sellerie und Faust von den Schöpfern der letzteren: ihre Unterschiede, Unterschiede zwischen Maklern, Umsetzung einer elementaren Aufgabe. Alles ist ganz einfach, aber eine nette Funktion in Faust erregt Aufmerksamkeit – getippte Daten zur Übertragung auf das Thema.

Was werden wir machen?

Deshalb zeige ich Ihnen in einer kurzen Artikelserie, wie Sie mit Faust Daten aus Hintergrundaufgaben sammeln. Die Quelle für unser Beispielprojekt wird, wie der Name schon sagt, sein: alphavantage.co. Ich werde zeigen, wie man Agenten schreibt (Senke, Themen, Partitionen), wie man eine reguläre (Cron-)Ausführung durchführt, die bequemsten Faust-CLI-Befehle (ein Wrapper über einem Klick) und einfaches Clustering, und am Ende werden wir einen Datadog anhängen ( Arbeiten außerhalb der Box) und versuchen, etwas zu sehen. Um die gesammelten Daten zu speichern, verwenden wir Mongodb und Motor für die Verbindung.

PS: Gemessen an der Sicherheit, mit der der Punkt zur Überwachung geschrieben wurde, denke ich, dass der Leser am Ende des letzten Artikels immer noch ungefähr so ​​aussehen wird:

Hintergrundaufgaben zu Faust, Teil I: Einleitung

Projektanforderungen

Aufgrund der Tatsache, dass ich es bereits versprochen habe, erstellen wir eine kleine Liste dessen, was der Dienst können sollte:

  1. Laden Sie Wertpapiere und eine Übersicht darüber (einschließlich Gewinne und Verluste, Bilanz, Cashflow – für das letzte Jahr) regelmäßig hoch
  2. Laden Sie regelmäßig historische Daten hoch (finden Sie für jedes Handelsjahr Extremwerte des Handelsschlusskurses).
  3. Laden Sie regelmäßig die neuesten Handelsdaten hoch
  4. Laden Sie regelmäßig eine individuelle Liste mit Indikatoren für jedes Wertpapier hoch

Wie erwartet wählen wir von Grund auf einen Namen für das Projekt: Horton

Wir bereiten die Infrastruktur vor

Der Titel ist sicherlich stark, aber Sie müssen lediglich eine kleine Konfiguration für Docker-Compose mit Kafka (und Zookeeper – in einem Container), Kafdrop (wenn wir Nachrichten in Themen betrachten möchten) und Mongodb schreiben. Wir bekommen [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) der folgenden Form:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Hier gibt es überhaupt nichts Kompliziertes. Für Kafka wurden zwei Listener deklariert: einer (intern) für die Verwendung innerhalb des Verbundnetzwerks und der zweite (extern) für Anfragen von außen, sodass sie ihn nach außen weiterleiteten. 2181 – Zookeeper-Port. Der Rest ist meiner Meinung nach klar.

Vorbereitung des Grundgerüsts des Projekts

In der Grundversion sollte der Aufbau unseres Projekts so aussehen:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Alles was ich notiert habe Wir rühren es noch nicht an, wir erstellen lediglich leere Dateien.**

Wir haben eine Struktur geschaffen. Fügen wir nun die erforderlichen Abhängigkeiten hinzu, schreiben die Konfiguration und stellen eine Verbindung zu Mongodb her. Ich werde nicht den vollständigen Text der Dateien im Artikel bereitstellen, um ihn nicht zu verzögern, aber ich werde Links zu den erforderlichen Versionen bereitstellen.

Beginnen wir mit Abhängigkeiten und Metadaten zum Projekt – pyproject.toml

Als nächstes beginnen wir mit der Installation von Abhängigkeiten und der Erstellung einer virtuellen Umgebung (oder Sie können den Ordner „venv“ selbst erstellen und die Umgebung aktivieren):

pip3 install poetry (если ещё не установлено)
poetry install

Jetzt lasst uns erstellen config.yml - Anmeldeinformationen und wo man anklopfen kann. Dort können Sie sofort Daten für alphavantage hinterlegen. Nun, lasst uns weitermachen config.py – Extrahieren Sie Daten für die Anwendung aus unserer Konfiguration. Ja, ich gestehe, ich habe meine Freiheit genutzt – Sitri.

Bei der Verbindung mit Mongo ist alles ganz einfach. angekündigt Client-Klasse verbinden und Basisklasse für Cruds, um das Abfragen von Sammlungen zu erleichtern.

Was wird als nächstes passieren?

Der Artikel ist nicht sehr lang, da ich hier nur über Motivation und Vorbereitung spreche, also machen Sie mir keine Vorwürfe – ich verspreche, dass der nächste Teil Action und Grafiken enthalten wird.

Im nächsten Teil gehen wir also wie folgt vor:

  1. Schreiben wir einen kleinen Client für Alphavantage auf aiohttp mit Anfragen für die Endpunkte, die wir benötigen.
  2. Lassen Sie uns einen Agenten erstellen, der Daten zu Wertpapieren und historischen Preisen für diese sammelt.

Projektnummer

Code für diesen Teil

Source: habr.com

Kommentar hinzufügen