Background Tasks on Faust, Part I: An Introduction

How did I end up here?

Not so long ago, I had to work on the backend of a heavily loaded project that needed to run a large number of background tasks on a regular schedule, with complex calculations and requests to third-party services. The project is asynchronous, and before I joined it had a simple mechanism for cron-style task launching: a loop that checked the current time and launched groups of coroutines via gather. This approach was acceptable while there were only tens or hundreds of such coroutines, but once their number exceeded two thousand, I had to think about a proper task queue with a broker, several workers, and so on.

First, I decided to try Celery, which I had used before. Given that the project is asynchronous, I dug into the question and found an article, as well as a project created by the author of that article.

I will say this: the project is very interesting and works quite successfully in other applications of our team, and the author himself says that he was able to roll it out to production by using an asynchronous pool. But, unfortunately, it did not suit me very well, since a problem turned up with the group launch of tasks (see group). At the time of writing, the issue has already been closed, but the work on it took about a month. In any case, good luck to the author and all the best, since there are already things working on top of the lib... in short, the tool turned out to be too raw for me. On top of that, some tasks made 2-3 http requests to different services, so even with the tasks optimized we would be opening about 4 thousand tcp connections roughly every 2 hours - not great... I would like to create one session per task type when the workers start. A little more about making a large number of requests via aiohttp here.

As a result, I started looking for alternatives and found one! The creators of celery, specifically, as I understand it, Ask Solem, created Faust, originally for the robinhood project. Faust was inspired by Kafka Streams and works with Kafka as a broker; rocksdb is also used to store the results of the agents' work, and, most importantly, the library is asynchronous.

You can also look at a brief comparison of celery and faust from the creators of the latter: the differences between them, the differences between the brokers, and the implementation of an elementary task. Everything is quite simple; however, a nice feature of faust draws attention - typed data for passing into a topic.
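
To give an idea of what that looks like, here is roughly the hello-world from the faust documentation: a record class describes the message schema, and the topic is declared with that class as its value_type (the names are illustrative).

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost:9092')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    # faust deserializes every message into a Greeting instance for us
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')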

What are we going to do?

So, in this small series of articles, I'll show how to collect data in background tasks using Faust. The source for our sample project will be, as the name suggests, alphavantage.co. I will demonstrate how to write agents (sink, topics, partitions), how to do regular (cron) execution, the convenient faust cli commands (a wrapper over click), and simple clustering, and at the end we will bolt on datadog (which works out of the box) and try to see something there. We will use mongodb, with motor for the connection, to store the collected data.
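
As a small teaser of the cron part: faust has a crontab decorator for periodic actions. A minimal sketch, where the app name, broker address, and schedule are made up purely for illustration:

import faust

app = faust.App('cron-example', broker='kafka://localhost:9092')

@app.crontab('0 */2 * * *')  # every two hours, standard cron syntax
async def collect_regularly():
    # in the real project this would kick off the data collection agents
    print('time to collect data')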

P.S. Judging by the confidence with which the paragraph about monitoring is written, I suspect that at the end of the last article the reader will still look something like the picture from the original post.

Project Requirements

Since I have already managed to make some promises, let's put together a small list of what the service should be able to do:

  1. Fetch securities and an overview of them (including profit and loss statements, balance sheet, cash flow - for the last year) - regularly
  2. Fetch historical data (for each trading year, find the extrema of the closing price) - regularly
  3. Fetch the latest trading data - regularly
  4. Fetch a customized list of indicators for each security - regularly

As expected, we pick the project name out of thin air: horton

Preparing the infrastructure

The heading sounds grand, but all that actually needs to be done is to write a small docker-compose config with kafka (and zookeeper - in the same container), kafdrop (if we want to look at messages in topics), and mongodb. The resulting [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) looks like this:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

There is nothing complicated here at all. Two listeners are declared for kafka: one (internal) for use inside the compose network, and the second (external) for requests from the outside, which is why it is forwarded to the host. 2181 is zookeeper's port. The rest, I think, is clear.
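
In practice this means that a faust worker started on the host machine connects through the external listener, while anything living inside the compose network (kafdrop, for instance) uses the internal one. A tiny sketch, with only the broker addresses taken from the config above:

import faust

# From the host machine we go through the external listener;
# a service inside the compose network would use 'kafka://kafka-service:29092' instead.
app = faust.App('horton', broker='kafka://localhost:9092')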

Preparing the skeleton of the project

In the basic version, the structure of our project should look like this:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Everything marked with an asterisk we don't touch yet - we just create empty files.*

The structure is created. Now let's add the necessary dependencies, write the config, and connect to mongodb. I will not include the full text of the files in the article, so as not to drag things out, but I will link to the necessary versions.

Let's start with the dependencies and the project metadata - pyproject.toml

Next, we install the dependencies and create a virtualenv (or you can create the venv folder yourself and activate the environment):

pip3 install poetry  # if not already installed
poetry install

Now let's create config.yml - the credentials and the addresses to knock on. You can put the alphavantage data there right away. Then let's move on to config.py - it extracts the data for the application from our config. Yes, I confess, I used my own lib - sitri.
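
To make the idea concrete, here is a minimal sketch of what the extraction in config.py could look like. Note that the real project uses sitri, while this stand-in just reads the yaml with pyyaml, and all the key names below are hypothetical:

import os

import yaml  # pyyaml, used here only as a stand-in for sitri

# hypothetical keys, just to show the idea of config.yml contents
with open(os.environ.get("CONFIG_PATH", "config.yml")) as f:
    _raw = yaml.safe_load(f)

ALPHAVANTAGE_API_KEY = _raw["alphavantage"]["api_key"]
MONGO_DSN = _raw["mongodb"]["dsn"]
KAFKA_BROKER = _raw["kafka"]["broker"]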

Connecting to mongo is quite simple: we declare a client class for the connection and a base class for the cruds, to make it easier to query collections.
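
For illustration, a connection client and a base crud on top of motor might look roughly like this. Only the credentials and port mirror the docker-compose.yml above; the class and method names are illustrative, not the project's actual code:

from motor.motor_asyncio import AsyncIOMotorClient

class MongoConnection:
    # Wraps the async mongo client; credentials/port match docker-compose.yml above.
    def __init__(self, host="localhost", port=20017,
                 username="admin", password="admin_password", database="horton"):
        self._client = AsyncIOMotorClient(f"mongodb://{username}:{password}@{host}:{port}")
        self._database = self._client[database]

    def get_collection(self, name):
        return self._database[name]

class MongoCRUDBase:
    # Concrete cruds (e.g. for securities) inherit this and set collection_name.
    collection_name = ""

    def __init__(self, connection: MongoConnection):
        self.collection = connection.get_collection(self.collection_name)

    async def insert_one(self, document: dict):
        result = await self.collection.insert_one(document)
        return result.inserted_id

    async def find_one(self, query: dict):
        return await self.collection.find_one(query)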

What will happen next?

The article turned out to be not very long, since here I am only talking about motivation and preparation, so do not blame me - I promise that in the next part there will be action and graphs.

So, in the next part we will:

  1. Write a small client for alphavantage on aiohttp, with requests to the endpoints we need.
  2. Make an agent that will collect data on securities and historical prices for them.

Project Code

Code for this part

Source: habr.com
