
Як я дійшов до такого життя?
Нещодавно мені довелося працювати над бекендом високо навантаженого проекту, в якому потрібно було організувати регулярне виконання великої кількості фонових завдань зі складними обчисленнями та запитами на сторонні сервіси. Проект асинхронний і до того, як я прийшов, у ньому був простий механізм крон-запуску завдань: цикл з перевіркою поточного часу і запуск груп корутин через gather - такий підхід виявився прийнятним до моменту, поки таких корутин були десятки і сотні, проте коли їхня кількість перевалила через дві тисячі, довелося думати про організацію нормальної черги завдань з брокером, кількома воркерами та іншим.
Спочатку я вирішив випробувати Celery, яким я користувався раніше. У зв'язку з асинхронністю проекту, я поринув у запитання та побачив , А так само , створений автором статті
Скажу так, проект дуже цікавий і успішно працює в інших додатках нашої команди, та й сам автор говорить про те, що зміг викотити в прод, заюзавши асинхронний пул. Але, на жаль, мені це не дуже підійшло, оскільки виявилася із груповим запуском завдань (див. ). На момент написання статті вже закрито, проте робота велася протягом місяця. У будь-якому разі, автору удачі і всіх благ, тому що робочі штуки на лібі вже є ... загалом, справа в мені і для мене виявився інструмент сируватий. До того ж, у деяких завданнях було по 2-3 http-запити до різних сервісів, таким чином навіть при оптимізації завдань ми створюємо 4 тисячі tcp з'єднань, приблизно кожні 2 години – не дуже… Хотілося б створювати сесію на один тип завдань під час запуску воркерів. Трохи докладніше про велику кількість запитів через aiohttp .
У зв'язку з цим я почав шукати альтернативи і знайшов! Творцями celery, а саме, як я зрозумів , була створена , спочатку для проекту . Faust написана під враженням від Kafka Streams і працює з Kafka як брокер, також для зберігання результатів від роботи агентів використовується rocksdb, а найголовніше - це те, що бібліотека асинхронна.
Також, можете глянути celery та faust від творців останньої: їх відмінності, відмінності брокерів, реалізацію елементарного завдання. Все дуже просто, однак, у Faust привертає увагу приємна особливість - типізовані дані для передачі в топік.
Що будемо робити?
Отже, у невеликій серії статей я покажу, як збирати дані у фонових задачах за допомогою Faust. Джерелом для нашого приклад-проекту буде, як випливає з назви, . Я продемонструю, як писати агентів (sink, топіки, партиції), як робити регулярне (cron) виконання, найзручніші cli-команди faust (обгортка над click), простий кластеринг, а в кінці прикрутимо datadog (працюючий з коробки) і спробуємо, що -небудь побачити. Для зберігання зібраних даних будемо використовувати mongodb та motor для підключення.
PS Судячи з впевненості, з якою написано пункт про моніторинг, думаю, що читач наприкінці останньої статті все-таки виглядатиме так:

Вимоги до проекту
У зв'язку з тим, що я вже встиг наобіцяти, складемо невеликий списочок того, що маю вміти сервіс:
- Вивантажувати цінні папери та overview за ними (в т.ч. прибутки та збитки, баланс, cash flow – за останній рік) – регулярно
- Вивантажувати історичні дані (для кожного торгового року знаходити екстремуми ціни закриття торгів) регулярно
- Вивантажувати останні торговельні дані – регулярно
- Вивантажувати налаштований список індикаторів для кожного цінного паперу регулярно
Як належить, вибираємо ім'я проекту зі стелі: Хортон
Готуємо інфраструктуру
Заголовок звичайно сильний, однак, все, що потрібно зробити - це написати невеликий конфіг для docker-compose з kafka (і zookeeper - в одному контейнері), kafdrop (якщо нам захочеться подивитися повідомлення в топіках), mongodb. Отримуємо [docker-compose.yml]() наступного виду:
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-serviceТут загалом нічого складного. Для kafka оголосили два listener'а: одного (internal) для використання всередині композної мережі, а другого (external) для запитів ззовні, тому прокинули його назовні. 2181 - порт zookeeper'а. По іншому, я думаю, ясно.
Готуємо скелет проекту
У базовому варіанті структура нашого проекту має виглядати так:
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py **Все, що я відзначив ми поки що не чіпаємо, а просто створюємо порожні файли.**
Створили структуру. Тепер додамо необхідні залежності, напишемо конфіг та підключення до mongodb. Повний текст файлів наводити у статті не буду, щоб не затягувати, а зроблю посилання на потрібні версії.
Почнемо з залежностей і мета про проект
Далі, запускаємо установку залежностей та створення virtualenv (або, можете самі створити папку venv та активувати оточення):
pip3 install poetry (если ещё не установлено)
poetry installТепер створимо — креди і куди стукати. Відразу туди можна розмістити і дані для Alphavantage. Ну і переходимо до - Витягуємо дані для програми з нашого конфігу. Так, каюся, заюзав свою лібу. .
За підключенням з монго - все просто. Оголосили для підключення та для крудів, щоб простіше було робити запити з колекцій.
Що буде далі?
Стаття вийшла не дуже великою, бо тут я говорю лише про мотивацію та підготовку, тому не зневажте — обіцяю, що в наступній частині буде екшн та графіка.
Отже, а в цій самій наступній частині ми:
- Напишемо невеликий клієнтик для alphavantage на aiohttp із запитами на потрібні нам ендпоінти.
- Зробимо агента, який збиратиме дані про цінні папери та історичні ціни за ними.
Джерело: habr.com
