Основни задачи върху Фауст, част I: Въведение

Основни задачи върху Фауст, част I: Въведение

Как успях да живея така?

Неотдавна трябваше да работя върху бекенда на силно натоварен проект, в който беше необходимо да организирам редовното изпълнение на голям брой фонови задачи със сложни изчисления и заявки за услуги на трети страни. Проектът е асинхронен и преди да дойда, имаше прост механизъм за задачи за стартиране на cron: цикъл, проверяващ текущото време и стартиращ групи от съпрограмми чрез gather - този подход се оказа приемлив, докато не се появиха десетки и стотици такива съпрограми , обаче, когато броят им надхвърли две хиляди, трябваше да помисля за организиране на нормална опашка със задачи с брокер, няколко работници и т.н.

Първо реших да пробвам Целина, която използвах преди. Поради асинхронния характер на проекта се потопих във въпроса и видях СтатияКакто и проект, създаден от автора на статията.

Ще кажа това, проектът е много интересен и работи доста успешно в други приложения на нашия екип, а самият автор казва, че е успял да го внедри в производство с помощта на асинхронен пул. Но, за съжаление, не ми подхождаше, както се оказа проблем с групово стартиране на задачи (вж. група). Към момента на писане издаване вече е затворен, но се работи от месец. Във всеки случай успех на автора и всичко най-добро, тъй като вече има работещи неща на lib ... общо взето, въпросът е в мен и инструментът се оказа влажен за мен. Освен това някои задачи имаха 2-3 http заявки към различни услуги, така че дори когато оптимизираме задачи, създаваме 4 хиляди TCP връзки, приблизително на всеки 2 часа - не е много добре... Бих искал да създам сесия за един тип задача при стартиране на работници. Малко повече за големия брой заявки през aiohttp тук.

В тази връзка започнах да търся алтернативи и го намери! Създателите на целина, по-специално, както разбирам Питай Солем, беше създаден Фауст, първоначално за проекта robinhood. Faust е вдъхновен от Kafka Streams и работи с Kafka като брокер, rocksdb се използва и за съхраняване на резултати от работата на агентите и най-важното е, че библиотеката е асинхронна.

Освен това можете да погледнете бързо сравнение целина и фауст от създателите на последния: техните различия, различия между брокери, изпълнение на елементарна задача. Всичко е доста просто, но хубава функция във faust привлича вниманието - въведени данни за предаване към темата.

И какво ще правим?

И така, в кратка поредица от статии ще ви покажа как да събирате данни от фонови задачи с помощта на Faust. Източникът за нашия примерен проект ще бъде, както подсказва името, alphavantage.co. Ще демонстрирам как се пишат агенти (sink, topics, partitions), как се прави редовно (cron) изпълнение, най-удобните faust cli команди (обвивка върху щракване), просто клъстериране и накрая ще прикачим datadog ( работа извън кутията) и се опитайте да видите нещо. За съхраняване на събраните данни ще използваме mongodb и motor за връзка.

PS Съдейки по увереността, с която е написана точката за мониторинга, мисля, че читателят в края на последната статия ще изглежда така:

Основни задачи върху Фауст, част I: Въведение

Изисквания към проекта

Поради факта, че вече обещах, нека направим малък списък на това, което услугата трябва да може да прави:

  1. Качване на ценни книжа и преглед на тях (включително печалби и загуби, баланс, паричен поток - за последната година) - редовно
  2. Качете исторически данни (за всяка търговска година, намерете екстремни стойности на цената на затваряне на търговията) - редовно
  3. Качвайте най-новите данни за търговия - редовно
  4. Качвайте персонализиран списък с индикатори за всяка ценна книга - редовно

Както се очакваше, ние избираме име за проекта от нулата: Хортън

Подготвяме инфраструктурата

Заглавието определено е силно, но всичко, което трябва да направите, е да напишете малка конфигурация за docker-compose с kafka (и zookeeper - в един контейнер), kafdrop (ако искаме да гледаме съобщенията в темите), mongodb. Получаваме [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) от следната форма:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Тук изобщо няма нищо сложно. Два слушателя бяха декларирани за kafka: един (вътрешен) за използване вътре в композитната мрежа, а вторият (външен) за заявки отвън, така че те го препратиха навън. 2181 — пристанище за пазачи в зоологическата градина. Останалото, мисля, е ясно.

Изготвяне на скелета на проекта

В основната версия структурата на нашия проект трябва да изглежда така:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Всичко, което отбелязах Все още не го докосваме, просто създаваме празни файлове.**

Създадохме структура. Сега нека добавим необходимите зависимости, да напишем конфигурацията и да се свържем с mongodb. Няма да предоставя пълния текст на файловете в статията, за да не я забавя, но ще дам връзки към необходимите версии.

Нека започнем със зависимости и мета за проекта - pyproject.toml

След това започваме да инсталираме зависимости и да създаваме virtualenv (или можете сами да създадете папката venv и да активирате средата):

pip3 install poetry (если ещё не установлено)
poetry install

Сега да творим config.yml - Пълномощия и къде да чукам. Можете веднага да поставите данни за alphavantage там. Е, нека да преминем към config.py — извличане на данни за приложението от нашата конфигурация. Да, признавам си, използвах моя либ - sitri.

Когато се свързвате с Mongo, всичко е доста просто. обяви клиентски клас за свързване и базов клас за cruds, за да улесните правенето на заявки за колекции.

Какво ще стане след това?

Статията не е много дълга, тъй като тук говоря само за мотивация и подготовка, така че не ме винете - обещавам, че следващата част ще има екшън и графики.

И така, в тази съвсем следваща част ние:

  1. Нека напишем малък клиент за alphavantage на aiohttp със заявки за крайните точки, от които се нуждаем.
  2. Нека създадем агент, който ще събира данни за ценни книжа и исторически цени за тях.

Код на проекта

Код за тази част

Източник: www.habr.com

Добавяне на нов коментар