Фонові завдання Faust, Частина I: Введення

Фонові завдання Faust, Частина I: Введення

Як я дійшов до такого життя?

Нещодавно мені довелося працювати над бекендом високо навантаженого проекту, в якому потрібно було організувати регулярне виконання великої кількості фонових завдань зі складними обчисленнями та запитами на сторонні сервіси. Проект асинхронний і до того, як я прийшов, у ньому був простий механізм крон-запуску завдань: цикл з перевіркою поточного часу і запуск груп корутин через gather - такий підхід виявився прийнятним до моменту, поки таких корутин були десятки і сотні, проте коли їхня кількість перевалила через дві тисячі, довелося думати про організацію нормальної черги завдань з брокером, кількома воркерами та іншим.

Спочатку я вирішив випробувати Celery, яким я користувався раніше. У зв'язку з асинхронністю проекту, я поринув у запитання та побачив статтю, А так само проект, створений автором статті

Скажу так, проект дуже цікавий і успішно працює в інших додатках нашої команди, та й сам автор говорить про те, що зміг викотити в прод, заюзавши асинхронний пул. Але, на жаль, мені це не дуже підійшло, оскільки виявилася проблема із груповим запуском завдань (див. група). На момент написання статті питання вже закрито, проте робота велася протягом місяця. У будь-якому разі, автору удачі і всіх благ, тому що робочі штуки на лібі вже є ... загалом, справа в мені і для мене виявився інструмент сируватий. До того ж, у деяких завданнях було по 2-3 http-запити до різних сервісів, таким чином навіть при оптимізації завдань ми створюємо 4 тисячі tcp з'єднань, приблизно кожні 2 години – не дуже… Хотілося б створювати сесію на один тип завдань під час запуску воркерів. Трохи докладніше про велику кількість запитів через aiohttp тут.

У зв'язку з цим я почав шукати альтернативи і знайшов! Творцями celery, а саме, як я зрозумів Ask Solem, була створена Фауст, спочатку для проекту Робін Гуд. Faust написана під враженням від Kafka Streams і працює з Kafka як брокер, також для зберігання результатів від роботи агентів використовується rocksdb, а найголовніше - це те, що бібліотека асинхронна.

Також, можете глянути коротке порівняння celery та faust від творців останньої: їх відмінності, відмінності брокерів, реалізацію елементарного завдання. Все дуже просто, однак, у Faust привертає увагу приємна особливість - типізовані дані для передачі в топік.

Що будемо робити?

Отже, у невеликій серії статей я покажу, як збирати дані у фонових задачах за допомогою Faust. Джерелом для нашого приклад-проекту буде, як випливає з назви, alphavantage.co. Я продемонструю, як писати агентів (sink, топіки, партиції), як робити регулярне (cron) виконання, найзручніші cli-команди faust (обгортка над click), простий кластеринг, а в кінці прикрутимо datadog (працюючий з коробки) і спробуємо, що -небудь побачити. Для зберігання зібраних даних будемо використовувати mongodb та motor для підключення.

PS Судячи з впевненості, з якою написано пункт про моніторинг, думаю, що читач наприкінці останньої статті все-таки виглядатиме так:

Фонові завдання Faust, Частина I: Введення

Вимоги до проекту

У зв'язку з тим, що я вже встиг наобіцяти, складемо невеликий списочок того, що маю вміти сервіс:

  1. Вивантажувати цінні папери та overview за ними (в т.ч. прибутки та збитки, баланс, cash flow – за останній рік) – регулярно
  2. Вивантажувати історичні дані (для кожного торгового року знаходити екстремуми ціни закриття торгів) регулярно
  3. Вивантажувати останні торговельні дані – регулярно
  4. Вивантажувати налаштований список індикаторів для кожного цінного паперу регулярно

Як належить, вибираємо ім'я проекту зі стелі: Хортон

Готуємо інфраструктуру

Заголовок звичайно сильний, однак, все, що потрібно зробити - це написати невеликий конфіг для docker-compose з kafka (і zookeeper - в одному контейнері), kafdrop (якщо нам захочеться подивитися повідомлення в топіках), mongodb. Отримуємо [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) наступного виду:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Тут загалом нічого складного. Для kafka оголосили два listener'а: одного (internal) для використання всередині композної мережі, а другого (external) для запитів ззовні, тому прокинули його назовні. 2181 - порт zookeeper'а. По іншому, я думаю, ясно.

Готуємо скелет проекту

У базовому варіанті структура нашого проекту має виглядати так:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Все, що я відзначив ми поки що не чіпаємо, а просто створюємо порожні файли.**

Створили структуру. Тепер додамо необхідні залежності, напишемо конфіг та підключення до mongodb. Повний текст файлів наводити у статті не буду, щоб не затягувати, а зроблю посилання на потрібні версії.

Почнемо з залежностей і мета про проект pyproject.toml

Далі, запускаємо установку залежностей та створення virtualenv (або, можете самі створити папку venv та активувати оточення):

pip3 install poetry (если ещё не установлено)
poetry install

Тепер створимо config.yml — креди і куди стукати. Відразу туди можна розмістити і дані для Alphavantage. Ну і переходимо до config.py - Витягуємо дані для програми з нашого конфігу. Так, каюся, заюзав свою лібу. sitri.

За підключенням з монго - все просто. Оголосили клас клієнта для підключення та базовий клас для крудів, щоб простіше було робити запити з колекцій.

Що буде далі?

Стаття вийшла не дуже великою, бо тут я говорю лише про мотивацію та підготовку, тому не зневажте — обіцяю, що в наступній частині буде екшн та графіка.

Отже, а в цій самій наступній частині ми:

  1. Напишемо невеликий клієнтик для alphavantage на aiohttp із запитами на потрібні нам ендпоінти.
  2. Зробимо агента, який збиратиме дані про цінні папери та історичні ціни за ними.

Код проекту

Код цієї частини

Джерело: habr.com

Купити надійний хостинг для сайтів із захистом від DDoS, VPS VDS сервери 🔥 Купити надійний хостинг для сайтів із захистом від DDoS, VPS VDS сервери | ProHoster