Háttérfeladatok a Faustról, I. rész: Bevezetés

Háttérfeladatok a Faustról, I. rész: Bevezetés

Hogyan éltem így?

Nemrég egy nagy terhelésű projekt hátterén kellett dolgoznom, amelyben nagyszámú háttérfeladat rendszeres végrehajtását kellett megszervezni bonyolult számításokkal és külső szolgáltatások igénylésével. A projekt aszinkron, és mielőtt eljöttem, volt egy egyszerű mechanizmusa a cron-indító feladatokhoz: egy hurok, amely ellenőrzi az aktuális időt, és korutincsoportokat indít el az összegyűjtésen keresztül - ez a megközelítés elfogadhatónak bizonyult, amíg több tucat és száz ilyen korutin nem volt. , azonban amikor a számuk meghaladta a kétezret, el kellett gondolkodnom azon, hogy rendes feladatsort szervezek egy brókerrel, több munkással stb.

Először úgy döntöttem, hogy kipróbálom a Zellert, amit korábban is használtam. A projekt aszinkron jellege miatt belemerültem a kérdésbe és láttam статьюis terv, amelyet a cikk szerzője készített.

Elmondom, a projekt nagyon érdekes, és elég sikeresen működik csapatunk más alkalmazásaiban is, és maga a szerző is elmondja, hogy egy aszinkron pool használatával tudta gyártásba állítani. De sajnos nem igazán jött be, mint kiderült probléma a feladatok csoportos indításával (lásd. csoport). Az írás idején kérdés már zárva van, azonban a munka egy hónapja folyik. Mindenesetre sok sikert a szerzőnek és minden jót, hiszen a lib-en már működnek a dolgok... általánosságban a lényeg bennem van, és nekem nyirkosnak bizonyult az eszköz. Ráadásul egyes feladatoknál 2-3 http kérés volt a különböző szolgáltatások felé, így a feladatok optimalizálásakor is 4 ezer TCP kapcsolatot hozunk létre, kb 2 óránként - nem túl jó... Egy típusú munkamenethez szeretnék munkamenetet létrehozni. feladat a munkások indításakor. Egy kicsit többet az aiohttp-n keresztül érkező kérések nagy számáról itt.

Ezzel kapcsolatban elkezdtem keresni alternatívák és megtalálta! Konkrétan a zeller alkotói, ahogy én értem Kérdezd meg Solemet, elkészült Faust, eredetileg a projekthez robinhood. A Faust a Kafka Streams ihlette, és Kafkával brókerként dolgozik, a rocksdb-t az ügynökök munkájából származó eredmények tárolására is használják, és a legfontosabb, hogy a könyvtár aszinkron.

Emellett meg is nézheti gyors összehasonlítás zeller és faust az utóbbi alkotóitól: különbségeik, brókerek közötti különbségek, egy elemi feladat megvalósítása. Minden nagyon egyszerű, azonban a faust egy szép tulajdonsága felkelti a figyelmet - gépelt adatok a témához való továbbításhoz.

Mit csináljunk?

Tehát egy rövid cikksorozatban megmutatom, hogyan gyűjthetsz adatokat háttérfeladatokból a Faust segítségével. Példaprojektünk forrása, ahogy a neve is sugallja, alphavantage.co. Bemutatom az ágensek írását (sink, topicok, partíciók), a rendszeres (cron) végrehajtást, a legkényelmesebb faust cli parancsokat (a wrapper over click), az egyszerű klaszterezést, és a végére csatolunk egy datadogot ( kidolgozva), és próbáljon meg valamit látni. Az összegyűjtött adatok tárolására mongodb-t és motort használunk a csatlakozáshoz.

PS Abból a magabiztosságból ítélve, amellyel a monitorozásról szóló pontot megírták, úgy gondolom, hogy az utolsó cikk végén az olvasó továbbra is valahogy így fog kinézni:

Háttérfeladatok a Faustról, I. rész: Bevezetés

A projekt követelményei

Tekintettel arra, hogy már megígértem, készítsünk egy kis listát arról, hogy mire kell képes a szolgáltatás:

  1. Töltsön fel értékpapírokat és azok áttekintését (beleértve a nyereséget és veszteséget, mérleget, cash flow-t - az elmúlt évre vonatkozóan) - rendszeresen
  2. Töltsön fel történelmi adatokat (minden kereskedési évre, keresse meg a kereskedési záróár szélső értékeit) - rendszeresen
  3. Töltsd fel a legfrissebb kereskedési adatokat – rendszeresen
  4. Töltsön fel egy testreszabott indikátorlistát minden egyes értékpapírhoz – rendszeresen

Ahogy az várható volt, a semmiből választunk nevet a projektnek: Horton

Előkészítjük az infrastruktúrát

A cím minden bizonnyal erős, de nem kell mást tenned, mint írnod ​​egy kis konfigot a docker-compose-hoz a kafka (és a zookeeper - egy konténerben), a kafdrop (ha az üzeneteket topikban akarjuk nézni), mongodb. Kapunk [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) a következő formában:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Itt egyáltalán nincs semmi bonyolult. A kafkához két hallgatót deklaráltak: az egyiket (belső) az összetett hálózaton belüli használatra, a másodikat (külsőt) a külső kérésekre, így továbbították azt kívülről. 2181 — állatkerti kikötő. A többi szerintem egyértelmű.

A projekt vázának elkészítése

Az alapváltozatban a projektünk felépítése így néz ki:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Minden, amit megjegyeztem Még nem nyúlunk hozzá, csak üres fájlokat hozunk létre.**

Létrehoztunk egy szerkezetet. Most adjuk hozzá a szükséges függőségeket, írjuk meg a konfigurációt és csatlakozzunk a mongodb-hez. A cikkben nem adom meg a fájlok teljes szövegét, hogy ne késlekedjen, de a szükséges verziókhoz hivatkozásokat adok.

Kezdjük a függőségekkel és a projekt metainformációival - pyproject.toml

Ezután megkezdjük a függőségek telepítését és a virtualenv létrehozását (vagy létrehozhatja saját maga a venv mappát, és aktiválhatja a környezetet):

pip3 install poetry (если ещё не установлено)
poetry install

Most pedig alkossunk config.yml - Hitelesítési adatok és a kopogtatás helye. Ott azonnal elhelyezheti az alfaelőnyhöz szükséges adatokat. Nos, menjünk tovább config.py — az alkalmazás adatainak kinyerése a konfigurációnkból. Igen, bevallom, használtam a lib - sitri.

A Mongohoz való csatlakozáskor minden nagyon egyszerű. bejelentett ügyfél osztály csatlakozni és alaposztály a cruds számára, hogy megkönnyítse a gyűjtemények lekérdezését.

Mi fog ezután történni?

A cikk nem túl hosszú, mivel itt csak a motivációról és a felkészülésről beszélek, szóval ne engem hibáztass - ígérem, a következő rész akció és grafika lesz.

Tehát ebben a következő részben mi:

  1. Írjunk egy kis klienst az alphavantage számára az aiohttp-n a szükséges végpontok kérésével.
  2. Hozzon létre egy ügynököt, aki adatokat gyűjt az értékpapírokról és a korábbi árakról.

Projekt kód

Ennek a résznek a kódja

Forrás: will.com

Hozzászólás