Фонавыя задачы на ​​Faust, Частка I: Увядзенне

Фонавыя задачы на ​​Faust, Частка I: Увядзенне

Як я дайшоў да жыцця такі?

Не так даўно мне прыйшлося працаваць над бэкэндам высока нагружанага праекту, у якім трэба было арганізаваць рэгулярнае выкананне вялікай колькасці фонавых задач са складанымі вылічэннямі і запытамі на іншыя сэрвісы. Праект асінхронны і да таго, як я прыйшоў, у ім быў просты механізм крон-запуску задач: цыкл з праверкай бягучага часу і запуск груп каруцін праз gather - такі падыход апынуўся прымальным да моманту, пакуль такіх каруцін былі дзясяткі і сотні, аднак, калі іх колькасць пераваліла праз дзве тысячы, прыйшлося думаць аб арганізацыі нармальнай чаргі задач з брокерам, некалькімі воркерамі і іншым.

Спачатку я вырашыў апрабаваць Celery, якім карыстаўся раней. У сувязі з асінхроннасцю праекту, я пагрузіўся ў пытанне і ўбачыў артыкул, А так жа праект, створаны аўтарам артыкула.

Скажу так, праект вельмі цікавы і суцэль паспяхова працуе ў іншых прыкладаннях нашай каманды, ды і сам аўтар кажа аб тым, што змог выкаціць у прод, заюзаўшы асінхронны пул. Але, нажаль, мне гэта не вельмі падышло, бо выявілася праблема з групавым запускам задач (гл. група). На момант напісання артыкула пытанне ужо зачынена, аднак, праца вялася на працягу месяца. У любым выпадку, аўтару поспеху і ўсіх выгод, бо працоўныя штукі на лібе ўжо ёсць… увогуле, справа ўва мне і для мяне апынулася прылада сыраватая. У дадатак, у некаторых задачах было па 2-3 http-запыты да розных сэрвісаў, такім чынам нават пры аптымізацыі задач мы ствараем 4 тысячы tcp злучэнняў, прыкладна кожныя 2 гадзіны – не вельмі… Хацелася б ствараць сесію на адзін тып задач пры запуску воркераў. Крыху падрабязней пра вялікую колькасць запытаў праз aiohttp тут.

У сувязі з гэтым, я пачаў шукаць альтэрнатывы і знайшоў! Стваральнікамі celery, а менавіта, як я зразумеў Ask Solem, была створана Фаўст, першапачаткова для праекту Робін Гуд. Faust напісана пад уражаннем ад Kafka Streams і працуе з Kafka у якасці брокера, таксама для захоўвання вынікаў ад працы агентаў выкарыстоўваецца rocksdb, а самае галоўнае - гэта тое, што бібліятэка асінхронная.

Таксама, можаце зірнуць кароткае параўнанне celery і faust ад стваральнікаў апошняй: іх адрозненні, адрозненні брокераў, рэалізацыю элементарнай задачы. Усё вельмі проста, аднак, у faust прыцягвае ўвагу прыемная асаблівасць – тыпізаваныя дадзеныя для перадачы ў топік.

Што будзем рабіць?

Такім чынам, у невялікай серыі артыкулаў я пакажу, як збіраць дадзеныя ў фонавых задачах з дапамогай Faust. Крыніцай для нашага прыклад-праекта будзе, як вынікае з назвы, alphavantage.co. Я прадэманструю, як пісаць агентаў (sink, топікі, партіціі), як рабіць рэгулярнае (cron) выкананне, найзручныя cli-каманды faust (абгортка над click), просты кластэрынг, а ў канцы прыкруцім datadog (які працуе са скрынкі) і паспрабуем, што -небудзь убачыць. Для захоўвання сабраных дадзеных будзем выкарыстоўваць mongodb і motor для падключэння.

PS Судзячы па ўпэўненасці, з якой напісаны пункт пра маніторынг, думаю, што чытач у канцы апошняга артыкула ўсёткі будзе выглядаць, неяк так:

Фонавыя задачы на ​​Faust, Частка I: Увядзенне

Патрабаванні да праекту

У сувязі з тым, што я ўжо паспеў наабяцаць, складзем невялікі спісачак таго, што павінен умець сэрвіс:

  1. Выгружаць каштоўныя паперы і overview па іх (у тым ліку прыбыткі і страты, баланс, cash flow - за апошні год) - рэгулярна
  2. Выгружаць гістарычныя дадзеныя (для кожнага гандлёвага года знаходзіць экстрэмумы кошту закрыцця таргоў) - рэгулярна
  3. Выгружаць апошнія гандлёвыя дадзеныя - рэгулярна
  4. Выгружаць наладжаны спіс індыкатараў для кожнай каштоўнай паперы - рэгулярна

Як належыць, выбіраемы імя праекту са столі: Хортон

Рыхтуем інфраструктуру

Загаловак вядома моцны, аднак, усё што трэба зрабіць - гэта напісаць невялікі канфіг для docker-compose з kafka (і zookeeper - у адным кантэйнеры), kafdrop (калі нам захочацца паглядзець паведамленні ў топікі), mongodb. Атрымліваем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) наступнага выгляду:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Тут увогуле нічога складанага. Для kafka абвясцілі два listener'а: аднаго (internal) для выкарыстання ўсярэдзіне кампознай сеткі, а другога (external) для запытаў з-за, таму пракінулі яго вонкі. 2181 - порт zookeeper'а. Па астатнім, я думаю, зразумела.

Рыхтуем шкілет праекту

У базавым варыянце структура нашага праекту павінна выглядаць так:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Усё, што я адзначыў мы пакуль не чапаем, а проста ствараем пустыя файлы.

Стварылі структуру. Зараз дадамо неабходныя залежнасці, напішам канфіг і падлучэнне да mongodb. Поўны тэкст файлаў прыводзіць у артыкуле не буду, каб не зацягваць, а зраблю спасылкі на патрэбныя версіі.

Пачнем з залежнасцяў і мэта аб праекце - pyproject.toml

Далей, запускаем усталёўку залежнасцяў і стварэнне virtualenv (альбо, можаце самі стварыць тэчку venv і актываваць асяроддзе):

pip3 install poetry (если ещё не установлено)
poetry install

Цяпер створым config.yml - крэды і куды стукацца. Адразу туды можна размясціць і дадзеныя для alphavantage. Ну і пераходзім да config.py - здабываем дадзеныя для прыкладання з нашага канфіга. Так, каюся, заюзаў сваю лібу - сітры.

Па падлучэнні з монга - зусім усё проста. Аб'явілі клас кліента для падлучэння і базавы клас для крудов, каб прасцей было рабіць запыты па калекцыях.

Што будзе далей?

Артыкул атрымаўся не вельмі вялікі, бо тут я кажу толькі пра матывацыю і падрыхтоўку, таму не крыўдуйце — абяцаю, што ў наступнай частцы будзе экшн і графіка.

Такім чынам, а ў гэтай самай наступнай частцы мы:

  1. Напішам невялікі кліентык для alphavantage на aiohttp з запытамі на патрэбныя нам эндпаінты.
  2. Зробім агента, які будзе збіраць дадзеныя аб каштоўных паперах і гістарычныя кошты па іх.

Код праекту

Код гэтай часткі

Крыніца: habr.com

Дадаць каментар