Sarcini de bază despre Faust, Partea I: Introducere

Sarcini de bază despre Faust, Partea I: Introducere

Cum am ajuns să trăiesc așa?

Nu cu mult timp în urmă a trebuit să lucrez la backend-ul unui proiect foarte încărcat, în care a fost necesar să organizez execuția regulată a unui număr mare de sarcini de fundal cu calcule complexe și solicitări pentru servicii terțe. Proiectul este asincron și înainte de a veni eu, avea un mecanism simplu pentru sarcinile de lansare cron: o buclă care verifică ora curentă și lansează grupuri de coroutine prin gather - această abordare s-a dovedit a fi acceptabilă până când au existat zeci și sute de astfel de corutine , însă, când numărul lor a depășit două mii, a trebuit să mă gândesc să organizez o coadă normală de sarcini cu un broker, mai mulți muncitori și așa mai departe.

Mai întâi am decis să încerc țelina, pe care o mai folosisem. Datorită naturii asincrone a proiectului, m-am scufundat în întrebare și am văzut статьюPrecum și proiect, creat de autorul articolului.

Voi spune asta, proiectul este foarte interesant și funcționează cu succes în alte aplicații ale echipei noastre, iar autorul însuși spune că a reușit să-l lanseze în producție folosind un pool asincron. Dar, din păcate, nu mi s-a potrivit cu adevărat, după cum sa dovedit problemă cu lansarea de grup a sarcinilor (vezi. grup). La momentul scrierii problema este deja închisă, însă, se lucrează de o lună. În orice caz, mult succes autorului și toate cele bune, din moment ce deja lucrează lucruri pe lib... în general, punctul este în mine și instrumentul s-a dovedit a fi umed pentru mine. În plus, unele task-uri aveau 2-3 solicitări http către diferite servicii, așa că, chiar și la optimizarea sarcinilor, creăm 4 mii de conexiuni TCP, aproximativ la fiecare 2 ore - nu foarte bine... Aș dori să creez o sesiune pentru un tip de sarcină la începerea lucrătorilor. Mai multe despre numărul mare de solicitări prin aiohttp aici.

În acest sens, am început să caut alternative și l-a găsit! Creatorii de țelină, mai exact, așa cum am înțeles-o Întreabă-l pe Solem, a fost creat Faust, inițial pentru proiect robinhood. Faust este inspirat de Kafka Streams și lucrează cu Kafka ca broker, rocksdb este folosit și pentru a stoca rezultate din munca agenților, iar cel mai important lucru este că biblioteca este asincronă.

De asemenea, poți să te uiți comparatie rapida țelină și faust de la creatorii acestuia din urmă: diferențele lor, diferențele dintre brokeri, implementarea unei sarcini elementare. Totul este destul de simplu, cu toate acestea, o caracteristică plăcută în faust atrage atenția - date tastate pentru a fi transmise la subiect.

Ce facem?

Deci, într-o serie scurtă de articole, vă voi arăta cum să colectați date din sarcinile de fundal folosind Faust. Sursa pentru proiectul nostru exemplu va fi, după cum sugerează și numele, alphaavantage.co. Voi demonstra cum să scrieți agenți (chiuvetă, subiecte, partiții), cum să faceți execuția obișnuită (cron), cele mai convenabile comenzi faust cli (un wrapper peste clic), grupare simplă, iar la sfârșit vom atașa un datadog ( lucrând din cutie) și încearcă să vezi ceva. Pentru a stoca datele colectate vom folosi mongodb și motor pentru conectare.

PS Judecând după încrederea cu care a fost scris punctul despre monitorizare, cred că cititorul de la sfârșitul ultimului articol va arăta în continuare cam așa:

Sarcini de bază despre Faust, Partea I: Introducere

Cerințele proiectului

Datorită faptului că am promis deja, să facem o mică listă cu ceea ce ar trebui să poată face serviciul:

  1. Încărcați titluri de valoare și o prezentare generală a acestora (inclusiv profituri și pierderi, bilanţ, flux de numerar - pentru ultimul an) - în mod regulat
  2. Încărcați date istorice (pentru fiecare an de tranzacționare, găsiți valori extreme ale prețului de închidere al tranzacționării) - în mod regulat
  3. Încărcați cele mai recente date de tranzacționare - în mod regulat
  4. Încărcați o listă personalizată de indicatori pentru fiecare securitate - în mod regulat

După cum era de așteptat, alegem un nume pentru proiect de la zero: Horton

Pregătim infrastructura

Titlul este cu siguranță puternic, totuși, tot ce trebuie să faceți este să scrieți o mică configurație pentru docker-compose cu kafka (și zookeeper - într-un singur container), kafdrop (dacă vrem să ne uităm la mesaje în subiecte), mongodb. Primim [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) de următoarea formă:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Nu este nimic complicat aici. Doi ascultători au fost declarați pentru kafka: unul (intern) pentru utilizare în interiorul rețelei compozite, iar al doilea (extern) pentru solicitări din exterior, așa că l-au transmis în exterior. 2181 — port grădina zoologică. Restul, cred, este clar.

Pregătirea scheletului proiectului

În versiunea de bază, structura proiectului nostru ar trebui să arate astfel:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Tot ce am notat Încă nu îl atingem, doar creăm fișiere goale.**

Am creat o structură. Acum să adăugăm dependențele necesare, să scriem configurația și să ne conectăm la mongodb. Nu voi furniza textul integral al fișierelor din articol, pentru a nu-l întârzia, dar voi oferi link-uri către versiunile necesare.

Să începem cu dependențe și meta despre proiect - pyproject.toml

În continuare, începem să instalăm dependențe și să creăm un virtualenv (sau puteți crea singur folderul venv și activați mediul):

pip3 install poetry (если ещё не установлено)
poetry install

Acum să creăm config.yml - Acreditări și unde să bat. Puteți plasa imediat date pentru alphaavantage acolo. Ei bine, să trecem la config.py — extrageți datele pentru aplicație din configurația noastră. Da, mărturisesc, mi-am folosit lib-ul... sitri.

Când vă conectați la Mongo, totul este destul de simplu. a anunţat clasa de client să se conecteze și clasa de bază pentru cruds, pentru a facilita efectuarea de interogări asupra colecțiilor.

Ce se va întâmpla în continuare?

Articolul nu este foarte lung, deoarece aici vorbesc doar despre motivație și pregătire, așa că nu mă învinovăți - promit că următoarea parte va avea acțiune și grafică.

Deci, în această parte următoare:

  1. Să scriem un client mic pentru alphaavantage pe aiohttp cu solicitări pentru punctele finale de care avem nevoie.
  2. Să creăm un agent care va colecta date despre valorile mobiliare și prețurile istorice pentru acestea.

Cod proiect

Cod pentru această parte

Sursa: www.habr.com

Adauga un comentariu