Základní úlohy o Faustovi, Část I: Úvod

Základní úlohy o Faustovi, Část I: Úvod

Jak jsem přišel takhle žít?

Nedávno jsem musel pracovat na backendu vysoce nabitého projektu, ve kterém bylo nutné organizovat pravidelné provádění velkého množství úloh na pozadí se složitými výpočty a požadavky na služby třetích stran. Projekt je asynchronní a než jsem přišel, měl jednoduchý mechanismus pro spouštění úloh: smyčka kontrolující aktuální čas a spouštění skupin korutin pomocí sběru – tento přístup se ukázal jako přijatelný, dokud takových korutin nebyly desítky a stovky. Když však jejich počet přesáhl dva tisíce, musel jsem přemýšlet o uspořádání normální fronty úkolů s makléřem, několika pracovníky a tak dále.

Nejprve jsem se rozhodla vyzkoušet Celer, který jsem dříve používala. Vzhledem k asynchronnímu charakteru projektu jsem se ponořil do otázky a viděl ČlánekStejně jako projekt, kterou vytvořil autor článku.

Řeknu to tak, že projekt je velmi zajímavý a docela úspěšně funguje i v jiných aplikacích našeho týmu a sám autor říká, že ho dokázal rozjet do výroby pomocí asynchronního fondu. Ale bohužel mi to moc nevyhovovalo, jak se ukázalo problém se skupinovým spouštěním úloh (viz. skupina). V době psaní otázka je již uzavřen, nicméně práce probíhají již měsíc. V kazdem pripade autorovi zdar a vse nejlepsi, kdyz uz na libu funguji veci... obecne je pointa ve me a nastroj se mi ukazal jako vlhky. Navíc některé úlohy měly 2-3 http požadavky na různé služby, takže i při optimalizaci úloh vytváříme 4 tisíce TCP spojení, přibližně každé 2 hodiny - ne moc dobré... Chtěl bych vytvořit relaci pro jeden typ úkol při nástupu pracovníků. Trochu více o velkém počtu požadavků přes aiohttp zde.

V tomto ohledu jsem začal hledat alternativy a našel to! Tvůrci celeru, konkrétně, jak to chápu já Zeptejte se Solema, byl vytvořen Faust, původně pro projekt robinhood. Faust se inspiruje Kafka Streams a spolupracuje s Kafkou jako broker, rocksdb slouží také k ukládání výsledků z práce agentů a nejdůležitější je, že knihovna je asynchronní.

Také se můžete podívat rychlé srovnání celer a faust od tvůrců posledně jmenovaných: jejich rozdíly, rozdíly mezi makléři, realizace elementárního úkolu. Vše je celkem jednoduché, nicméně pozornost přitahuje příjemná funkce ve faustu - typizovaná data pro přenos k tématu.

Co děláme?

V krátké sérii článků vám tedy ukážu, jak sbírat data z úloh na pozadí pomocí Fausta. Zdrojem pro náš vzorový projekt bude, jak název napovídá, alphavantage.co. Předvedu, jak psát agenty (sink, témata, oddíly), jak dělat pravidelné (cron) spouštění, nejpohodlnější faust cli příkazy (wrapper over click), jednoduché shlukování a na závěr připojíme datadog ( práce po vybalení z krabice) a pokuste se něco vidět. Pro uložení nasbíraných dat použijeme mongodb a motor pro připojení.

PS Soudě podle sebevědomí, se kterým byl bod o monitorování napsán, si myslím, že čtenář na konci minulého článku bude stále vypadat nějak takto:

Základní úlohy o Faustovi, Část I: Úvod

Požadavky projektu

Vzhledem k tomu, že jsem již slíbil, udělejme si malý výčet toho, co by služba měla umět:

  1. Nahrávejte cenné papíry a jejich přehled (včetně zisků a ztrát, rozvahy, cash flow - za poslední rok) - pravidelně
  2. Nahrávejte historická data (pro každý obchodní rok najděte extrémní hodnoty zavírací ceny obchodování) - pravidelně
  3. Nahrávejte nejnovější obchodní data – pravidelně
  4. Nahrajte přizpůsobený seznam indikátorů pro každé zabezpečení – pravidelně

Podle očekávání jsme pro projekt zvolili název od začátku: Horton

Připravujeme infrastrukturu

Název je jistě silný, nicméně stačí napsat malou konfiguraci pro docker-compose s kafka (a zookeeper - v jednom kontejneru), kafdrop (pokud se chceme podívat na zprávy v tématech), mongodb. Dostaneme [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) v následujícím formuláři:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Není zde vůbec nic složitého. Pro kafka byli deklarováni dva posluchači: jeden (interní) pro použití uvnitř kompozitní sítě a druhý (externí) pro požadavky zvenčí, takže to přeposlali ven. 2181 — přístav chovatelů zoo. Zbytek je myslím jasný.

Příprava kostry projektu

V základní verzi by struktura našeho projektu měla vypadat takto:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Vše, co jsem zaznamenal Zatím na to nesaháme, jen vytváříme prázdné soubory.**

Vytvořili jsme strukturu. Nyní přidáme potřebné závislosti, napíšeme konfiguraci a připojíme se k mongodb. Úplné znění souborů v článku neuvedu, abych to nezdržoval, ale uvedu odkazy na potřebné verze.

Začněme závislostmi a meta o projektu - pyproject.toml

Dále začneme instalovat závislosti a vytvářet virtualenv (nebo si můžete sami vytvořit složku venv a aktivovat prostředí):

pip3 install poetry (если ещё не установлено)
poetry install

Nyní pojďme tvořit config.yml - Pověření a kde zaklepat. Můžete tam okamžitě umístit data pro alphavantage. No, pojďme dál config.py — extrahujte data pro aplikaci z naší konfigurace. Ano, přiznávám, použil jsem svůj lib - sitri.

Při připojení k Mongo je vše docela jednoduché. oznámil třída klientů připojit a základní třída pro crud, aby bylo snazší provádět dotazy na sbírky.

Co se stane příště?

Článek není moc dlouhý, jelikož zde mluvím pouze o motivaci a přípravě, takže mi to nevyčítejte - slibuji, že další díl bude mít akci a grafiku.

Takže v této další části:

  1. Pojďme napsat malého klienta pro alphavantage na aiohttp s požadavky na koncové body, které potřebujeme.
  2. Vytvořme si agenta, který za ně bude sbírat data o cenných papírech a historických cenách.

Kód projektu

Kód pro tuto část

Zdroj: www.habr.com

Přidat komentář