Bakgrunnsoppgaver om Faust, del I: Introduksjon

Bakgrunnsoppgaver om Faust, del I: Introduksjon

Hvordan kom jeg til å leve slik?

For ikke lenge siden måtte jeg jobbe på baksiden av et svært belastet prosjekt, der det var nødvendig å organisere regelmessig utførelse av et stort antall bakgrunnsoppgaver med komplekse beregninger og forespørsler om tredjepartstjenester. Prosjektet er asynkront og før jeg kom, hadde det en enkel mekanisme for cron-lanseringsoppgaver: en sløyfe som sjekker gjeldende tid og lanserer grupper med koroutiner via gather - denne tilnærmingen viste seg å være akseptabel inntil det var dusinvis og hundrevis av slike koroutiner , men da antallet oversteg to tusen, måtte jeg tenke på å organisere en normal oppgavekø med en megler, flere arbeidere, og så videre.

Først bestemte jeg meg for å prøve ut Selleri, som jeg hadde brukt før. På grunn av prosjektets asynkrone natur, dykket jeg ned i spørsmålet og så artikkelså vel som prosjekt, laget av forfatteren av artikkelen.

Jeg vil si dette, prosjektet er veldig interessant og fungerer ganske vellykket i andre applikasjoner av teamet vårt, og forfatteren selv sier at han var i stand til å rulle det ut i produksjon ved å bruke et asynkront basseng. Men dessverre passet det ikke meg, som det viste seg problem med gruppelansering av oppgaver (se. gruppe). I skrivende stund utstedelse er allerede stengt, men arbeidet har pågått i en måned. Uansett, lykke til forfatteren og alt godt, siden det allerede er ting som fungerer på lib... generelt sett ligger poenget i meg og verktøyet viste seg å være fuktig for meg. I tillegg hadde noen oppgaver 2-3 http-forespørsler til forskjellige tjenester, så selv når vi optimaliserer oppgaver oppretter vi 4 TCP-forbindelser, omtrent hver 2. time - ikke veldig bra... Jeg vil gjerne lage en økt for én type oppgave ved oppstart av arbeidere. Litt mer om det store antallet forespørsler via aiohttp her.

I denne forbindelse begynte jeg å lete alternativer og fant den! Skaperne av selleri, nærmere bestemt, slik jeg forstår det Spør Solem, ble laget Faust, opprinnelig for prosjektet Robin Hood. Faust er inspirert av Kafka Streams og jobber med Kafka som megler, rocksdb brukes også til å lagre resultater fra agenters arbeid, og det viktigste er at biblioteket er asynkront.

Du kan også se rask sammenligning selleri og faust fra skaperne av sistnevnte: deres forskjeller, forskjeller mellom meglere, gjennomføring av en elementær oppgave. Alt er ganske enkelt, men en fin funksjon i faust tiltrekker seg oppmerksomhet - maskinskrevne data for overføring til emnet.

Hva skal vi gjøre?

Så, i en kort serie med artikler, skal jeg vise deg hvordan du samler inn data fra bakgrunnsoppgaver ved å bruke Faust. Kilden til vårt eksempelprosjekt vil være, som navnet antyder, alphavantage.co. Jeg vil demonstrere hvordan man skriver agenter (sink, emner, partisjoner), hvordan man gjør vanlig (cron) kjøring, de mest praktiske faust cli-kommandoene (en wrapper over click), enkel clustering, og til slutt vil vi legge ved en datadog ( jobber ut av boksen) og prøv å finne noe å se. For å lagre de innsamlede dataene vil vi bruke mongodb og motor for tilkobling.

PS Å dømme etter tilliten som punktet om overvåking ble skrevet med, tror jeg at leseren på slutten av den siste artikkelen fortsatt vil se omtrent slik ut:

Bakgrunnsoppgaver om Faust, del I: Introduksjon

Prosjektkrav

På grunn av det faktum at jeg allerede har lovet, la oss lage en liten liste over hva tjenesten skal kunne gjøre:

  1. Last opp verdipapirer og en oversikt over dem (inkludert fortjeneste og tap, balanse, kontantstrøm - for det siste året) - regelmessig
  2. Last opp historiske data (for hvert handelsår, finn ekstreme verdier av sluttkursen for handel) - regelmessig
  3. Last opp siste handelsdata - regelmessig
  4. Last opp en tilpasset liste over indikatorer for hvert verdipapir – regelmessig

Som forventet velger vi et navn på prosjektet fra bunnen av: Horton

Vi forbereder infrastrukturen

Tittelen er absolutt sterk, men alt du trenger å gjøre er å skrive en liten konfigurasjon for docker-compose med kafka (og zookeeper - i en container), kafdrop (hvis vi vil se på meldinger i emner), mongodb. Vi får [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) av følgende form:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Det er ikke noe komplisert her i det hele tatt. To lyttere ble erklært for kafka: en (intern) for bruk i det sammensatte nettverket, og den andre (ekstern) for forespørsler utenfra, så de videresendte den utenfor. 2181 - dyrepasserhavn. Resten tror jeg er klart.

Forbereder skjelettet til prosjektet

I grunnversjonen skal strukturen til prosjektet vårt se slik ut:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Alt jeg noterte Vi rører det ikke ennå, vi lager bare tomme filer.**

Vi har laget en struktur. La oss nå legge til de nødvendige avhengighetene, skrive konfigurasjonen og koble til mongodb. Jeg vil ikke gi hele teksten til filene i artikkelen, for ikke å forsinke det, men jeg vil gi lenker til de nødvendige versjonene.

La oss starte med avhengigheter og meta om prosjektet - pyproject.toml

Deretter begynner vi å installere avhengigheter og lage en virtualenv (eller du kan lage venv-mappen selv og aktivere miljøet):

pip3 install poetry (если ещё не установлено)
poetry install

La oss nå lage config.yml - Legitimasjon og hvor du skal banke. Du kan umiddelbart plassere data for alphavantage der. Vel, la oss gå videre til config.py - trekk ut data for applikasjonen fra vår konfigurasjon. Ja, jeg innrømmer, jeg brukte min lib - sitri.

Når du kobler til Mongo, er alt ganske enkelt. annonsert klientklasse å koble til og grunnklasse for cruds, for å gjøre det lettere å foreta søk på samlinger.

Hva vil skje videre?

Artikkelen er ikke veldig lang, siden jeg her bare snakker om motivasjon og forberedelse, så ikke klandre meg - jeg lover at neste del vil ha action og grafikk.

Så i denne neste delen:

  1. La oss skrive en liten klient for alphavantage på aiohttp med forespørsler om endepunktene vi trenger.
  2. La oss opprette en agent som vil samle inn data om verdipapirer og historiske priser for dem.

Prosjektkode

Kode for denne delen

Kilde: www.habr.com

Legg til en kommentar