Pagrindinės užduotys apie Faustą, I dalis: Įvadas

Pagrindinės užduotys apie Faustą, I dalis: Įvadas

Kaip aš pradėjau taip gyventi?

Neseniai turėjau dirbti su labai apkrauto projekto fonu, kuriame reikėjo organizuoti reguliarų daugybės foninių užduočių vykdymą su sudėtingais skaičiavimais ir trečiųjų šalių paslaugų užklausomis. Projektas yra asinchroninis ir prieš man atvykstant turėjo paprastą cron paleidimo užduočių mechanizmą: kilpą, tikrinančią esamą laiką ir paleidžiant korutinių grupes per surinkimą – toks metodas pasirodė priimtinas, kol tokių korutinų nebuvo dešimtys ir šimtai. , tačiau kai jų skaičius viršijo du tūkstančius, teko galvoti apie normalios užduočių eilės organizavimą su brokeriu, keliais darbininkais ir pan.

Pirmiausia nusprendžiau išbandyti salierą, kurį naudojau anksčiau. Dėl projekto asinchroniškumo pasineriau į klausimą ir pamačiau straipsnisTaip pat projektas, kurį sukūrė straipsnio autorius.

Pasakysiu taip, projektas yra labai įdomus ir gana sėkmingai veikia kitose mūsų komandos programose, o pats autorius sako, kad jį pavyko išvynioti į gamybą naudojant asinchroninį baseiną. Bet, deja, tai man nelabai tiko, kaip paaiškėjo problema su grupiniu užduočių paleidimu (žr. grupė). Rašymo metu emisija jau uždaryta, tačiau darbai vyksta jau mėnesį. Bet kokiu atveju sėkmės autoriui ir viso ko geriausio, nes jau yra darbiniai reikalai ant lib... apskritai esmė manyje ir priemonė man pasirodė drėgna. Be to, kai kurios užduotys turėjo 2-3 http užklausas skirtingoms tarnyboms, todėl net optimizuodami užduotis sukuriame 4 tūkstančius TCP jungčių, maždaug kas 2 valandas - nelabai gerai... Norėčiau sukurti sesiją vieno tipo užduotis pradedant darbuotojus. Šiek tiek daugiau apie didelį užklausų skaičių per aiohttp čia.

Šiuo atžvilgiu aš pradėjau ieškoti alternatyvų ir rado! Salierų kūrėjai, konkrečiai, kaip suprantu Paklausk Solemo, buvo sukurtas "Faustas", iš pradžių skirtas projektui robinhood. Faustas yra įkvėptas Kafka Streams ir dirba su Kafka kaip brokeris, rocksdb taip pat naudojamas agentų darbo rezultatams saugoti, o svarbiausia, kad biblioteka būtų asinchroninė.

Be to, galite pažiūrėti greitas palyginimas salieras ir faustas iš pastarųjų kūrėjų: jų skirtumai, brokerių skirtumai, elementarios užduoties įgyvendinimas. Viskas gana paprasta, tačiau dėmesį patraukia graži fausto savybė – įvesti duomenys, skirti perduoti temai.

Ką mes darome?

Taigi, trumpoje straipsnių serijoje parodysiu, kaip rinkti duomenis iš foninių užduočių naudojant Faust. Mūsų pavyzdinio projekto šaltinis bus, kaip rodo pavadinimas, alphavantage.co. Pademonstruosiu kaip rašyti agentus (sink, temos, skaidiniai), kaip atlikti įprastą (cron) vykdymą, patogiausias faust cli komandas (a wrapper over click), paprastą klasterizavimą, o pabaigoje pridėsime datadog ( dirbti iš dėžutės) ir pabandyti ką nors pamatyti. Norėdami išsaugoti surinktus duomenis, prisijungimui naudosime mongodb ir variklį.

PS Sprendžiant iš pasitikėjimo, su kuriuo buvo parašytas taškas apie stebėjimą, manau, kad skaitytojas paskutinio straipsnio pabaigoje vis tiek atrodys maždaug taip:

Pagrindinės užduotys apie Faustą, I dalis: Įvadas

Projekto reikalavimai

Dėl to, kad jau pažadėjau, sudarykime nedidelį sąrašą, ką paslauga turėtų turėti:

  1. Įkelti vertybinius popierius ir jų apžvalgą (įskaitant pelną ir nuostolius, balansą, pinigų srautus – už praėjusius metus) – reguliariai
  2. Įkelkite istorinius duomenis (kiekvieniems prekybos metams, suraskite ekstremalias prekybos uždarymo kainos vertes) - reguliariai
  3. Įkelkite naujausius prekybos duomenis – reguliariai
  4. Reguliariai įkelkite individualų kiekvienos apsaugos rodiklių sąrašą

Kaip ir tikėtasi, mes pasirenkame projekto pavadinimą nuo nulio: hortonas

Ruošiame infrastruktūrą

Pavadinimas tikrai stiprus, tačiau tereikia parašyti nedidelę docker-compose konfigūraciją su kafka (ir zookeeper – viename konteineryje), kafdrop (jei norime žinutes žiūrėti temose), mongodb. Mes gauname [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) tokios formos:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Čia apskritai nėra nieko sudėtingo. Kafka buvo deklaruoti du klausytojai: vienas (vidinis) skirtas naudoti sudėtinio tinklo viduje, o antrasis (išorinis) užklausoms iš išorės, todėl jie persiuntė jį į išorę. 2181 — zoologijos sodo prižiūrėtojo uostas. Likusi dalis, manau, aišku.

Projekto skeleto ruošimas

Pagrindinėje versijoje mūsų projekto struktūra turėtų atrodyti taip:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Viskas, ką pažymėjau Kol kas jo neliečiame, tik kuriame tuščius failus.**

Sukūrėme struktūrą. Dabar pridėkime reikiamas priklausomybes, parašykite konfigūraciją ir prisijunkite prie mongodb. Straipsnyje nepateiksiu viso failų teksto, kad nevilkinčiau, bet pateiksiu nuorodas į reikalingas versijas.

Pradėkime nuo priklausomybių ir meta apie projektą - pyproject.toml

Tada pradedame diegti priklausomybes ir kurti virtualenv (arba galite patys sukurti aplanką venv ir suaktyvinti aplinką):

pip3 install poetry (если ещё не установлено)
poetry install

Dabar kurkime config.yml - Kredencialai ir kur pasibelsti. Čia galite iš karto patalpinti alfavantage duomenis. Na, pereikime prie config.py - iš mūsų konfigūracijos ištraukite programos duomenis. Taip, prisipažįstu, naudojau savo lib - sitri.

Prisijungiant prie Mongo viskas gana paprasta. paskelbė klientų klasė prisijungti ir bazinė klasė cruds, kad būtų lengviau pateikti užklausas apie kolekcijas.

Kas bus toliau?

Straipsnis nėra labai ilgas, nes čia kalbu tik apie motyvaciją ir pasiruošimą, tad nekaltinkite manęs – pažadu, kad kitoje dalyje bus veiksmas ir grafika.

Taigi, kitoje dalyje mes:

  1. Parašykime nedidelį alfavantage klientą aiohttp su užklausomis dėl mums reikalingų galinių taškų.
  2. Sukurkime agentą, kuris rinks duomenis apie vertybinius popierius ir jų istorines kainas.

Projekto kodas

Šios dalies kodas

Šaltinis: www.habr.com

Добавить комментарий