Baggrundsopgaver om Faust, del I: Introduktion

Baggrundsopgaver om Faust, del I: Introduktion

Hvordan kom jeg til at leve sådan her?

For ikke længe siden skulle jeg arbejde på backend af et meget belastet projekt, hvor det var nødvendigt at organisere den regelmæssige udførelse af en lang række baggrundsopgaver med komplekse beregninger og anmodninger om tredjepartstjenester. Projektet er asynkront, og før jeg kom, havde det en simpel mekanisme til cron-lancering af opgaver: en løkke, der tjekkede det aktuelle tidspunkt og lancerede grupper af coroutiner via gather - denne tilgang viste sig at være acceptabel, indtil der var snesevis og hundredvis af sådanne coroutiner , men da deres antal oversteg to tusinde, måtte jeg tænke på at organisere en normal opgavekø med en mægler, flere arbejdere og så videre.

Først besluttede jeg at prøve selleri, som jeg havde brugt før. På grund af projektets asynkrone karakter dykkede jeg ned i spørgsmålet og så en artikelsåvel som projekt, oprettet af artiklens forfatter.

Jeg vil sige dette, projektet er meget interessant og fungerer ganske vellykket i andre applikationer af vores team, og forfatteren siger selv, at han var i stand til at rulle det ud i produktion ved at bruge en asynkron pool. Men desværre passede det ikke rigtigt til mig, som det viste sig et problem med gruppelancering af opgaver (se. gruppe). I skrivende stund spørgsmål er allerede lukket, men arbejdet har stået på i en måned. Under alle omstændigheder held og lykke til forfatteren og alt det bedste, da der allerede er arbejdende ting på lib... generelt ligger pointen i mig, og værktøjet viste sig at være fugtigt for mig. Derudover havde nogle opgaver 2-3 http-forespørgsler til forskellige tjenester, så selv når vi optimerer opgaver, opretter vi 4 tusinde TCP-forbindelser, cirka hver 2. time - ikke særlig godt... Jeg vil gerne oprette en session for én type af opgave ved opstart af arbejdere. Lidt mere om det store antal forespørgsler via aiohttp her.

I den forbindelse begyndte jeg at lede alternativer og fandt det! Skaberne af selleri, specifikt, som jeg forstår det Spørg Solem, var lavet Faust, oprindeligt til projektet Robin Hood. Faust er inspireret af Kafka Streams og arbejder med Kafka som mægler, rocksdb bruges også til at gemme resultater fra agenters arbejde, og det vigtigste er, at biblioteket er asynkront.

Du kan også kigge hurtig sammenligning selleri og faust fra skaberne af sidstnævnte: deres forskelle, forskelle mellem mæglere, gennemførelse af en elementær opgave. Alt er ret simpelt, men en fin funktion i faust tiltrækker opmærksomhed - indtastede data til overførsel til emnet.

Hvad gør vi?

Så i en kort serie af artikler vil jeg vise dig, hvordan du indsamler data fra baggrundsopgaver ved hjælp af Faust. Kilden til vores eksempelprojekt vil være, som navnet antyder, alphavantage.co. Jeg vil demonstrere, hvordan man skriver agenter (sink, emner, partitioner), hvordan man laver regelmæssig (cron) eksekvering, de mest bekvemme faust cli-kommandoer (en indpakning over klik), simpel clustering, og til sidst vil vi vedhæfte en datadog ( arbejder ud af boksen) og prøv at se noget. For at gemme de indsamlede data vil vi bruge mongodb og motor til tilslutning.

PS At dømme efter den tillid, som punktet om overvågning blev skrevet med, tror jeg, at læseren i slutningen af ​​den sidste artikel stadig vil se sådan ud:

Baggrundsopgaver om Faust, del I: Introduktion

Projektkrav

På grund af det faktum, at jeg allerede har lovet, lad os lave en lille liste over, hvad tjenesten skal kunne:

  1. Upload værdipapirer og en oversigt over dem (inklusive overskud og tab, balance, pengestrøm - for det sidste år) - regelmæssigt
  2. Upload historiske data (for hvert handelsår, find ekstreme værdier af slutkursen for handel) - regelmæssigt
  3. Upload seneste handelsdata - regelmæssigt
  4. Upload en tilpasset liste over indikatorer for hvert værdipapir - regelmæssigt

Som forventet vælger vi et navn til projektet fra bunden: Horton

Vi forbereder infrastrukturen

Titlen er bestemt stærk, men alt du skal gøre er at skrive en lille konfiguration til docker-compose med kafka (og zookeeper - i én container), kafdrop (hvis vi vil se på beskeder i emner), mongodb. Vi får [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) af følgende form:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Der er ikke noget kompliceret her overhovedet. To lyttere blev erklæret for kafka: en (intern) til brug i det sammensatte netværk, og den anden (ekstern) til anmodninger udefra, så de videresendte den udenfor. 2181 - dyrepasserhavn. Resten synes jeg er klart.

Udarbejdelse af skelettet til projektet

I grundversionen skulle strukturen af ​​vores projekt se sådan ud:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Alt hvad jeg noterede Vi rører det ikke endnu, vi opretter bare tomme filer.**

Vi har skabt en struktur. Lad os nu tilføje de nødvendige afhængigheder, skrive konfigurationen og oprette forbindelse til mongodb. Jeg vil ikke give den fulde tekst af filerne i artiklen, for ikke at forsinke det, men jeg vil give links til de nødvendige versioner.

Lad os starte med afhængigheder og meta om projektet - pyproject.toml

Dernæst begynder vi at installere afhængigheder og oprette en virtualenv (eller du kan selv oprette venv-mappen og aktivere miljøet):

pip3 install poetry (если ещё не установлено)
poetry install

Lad os nu skabe config.yml - Legitimationsoplysninger og hvor man kan banke på. Du kan straks placere data til alphavantage der. Nå, lad os gå videre til config.py - udtræk data til applikationen fra vores konfiguration. Ja, jeg indrømmer, jeg brugte min lib - sitri.

Når du opretter forbindelse til Mongo, er alt ganske enkelt. annonceret klientklasse at forbinde og basisklasse for cruds, for at gøre det nemmere at lave forespørgsler på samlinger.

Hvad sker der nu?

Artiklen er ikke særlig lang, da jeg her kun taler om motivation og forberedelse, så bebrejde mig ikke - jeg lover, at den næste del vil have action og grafik.

Så i denne meget næste del:

  1. Lad os skrive en lille klient til alphavantage på aiohttp med anmodninger om de endepunkter, vi har brug for.
  2. Lad os oprette en agent, der vil indsamle data om værdipapirer og historiske priser for dem.

Projektkode

Kode for denne del

Kilde: www.habr.com

Tilføj en kommentar