How did I get to this life?
Not so long ago I had to work on the backend of a high-load project, which needed regular execution of a large number of background tasks with complex calculations and requests to third-party services. The project is asynchronous, and before I joined it had a simple mechanism for cron-launching tasks: a loop that checked the current time and launched groups of coroutines via gather. This approach was acceptable while there were tens or hundreds of such coroutines, but when their number exceeded two thousand, I had to think about organizing a proper task queue with a broker, several workers, and so on.
First, I decided to try Celery, which I had used before. Given that the project is asynchronous, I dug into the question and saw that asyncio support in Celery was still an open discussion, with only third-party workarounds available.
I will say this: the project is very interesting and works quite successfully in other applications of our team, and its author says he was able to roll it out to production using an asynchronous pool. But, unfortunately, it did not suit me very well, as it turned out.
As a result, I started looking for alternatives and found one! The creators of Celery, specifically (as I understand it) Ask Solem, created Faust, originally for Robinhood. The official Faust documentation is also worth a look.
What do we do?
So, in a small series of articles, I'll show you how to collect data in background tasks using Faust. The data source for our sample project will be, as the name suggests, alphavantage.co.
P.S. Judging by the confidence with which the paragraph about monitoring was written, I suspect the reader at the end of the last article will still look something like this:
Project Requirements
Since I have already made some promises, let's draw up a small list of what the service should be able to do:
- Fetch securities and an overview of them (including profit and loss, balance sheet, cash flow for the last year) - regularly
- Fetch historical data (for each trading year, find the extrema of the closing price) - regularly
- Fetch the latest trading data - regularly
- Fetch a customized list of indicators for each security - regularly
As expected, we pick the project name out of thin air: horton
Preparing the infrastructure
The title certainly sounds grand, but all that needs to be done is to write a small docker-compose config with kafka (and zookeeper in the same container), kafdrop (if we want to see the messages in topics), and mongodb. We get the following docker-compose.yml:
```yaml
version: '3'
services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service
```
There is nothing complicated here at all. Two listeners are declared for kafka: one (internal) for use inside the compose network, and the second (external) for requests from outside, which is why the latter is forwarded to the host. 2181 is zookeeper's port. The rest, I think, is clear.
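To make the internal/external split more tangible, here is a tiny helper (purely illustrative, not part of the project) that parses an advertised-listeners string like the one above into a mapping of listener name to address:

```python
# Hypothetical helper: split a Kafka advertised-listeners string
# (e.g. "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092")
# into {listener_name: (host, port)}.
def parse_listeners(advertised: str) -> dict:
    result = {}
    for entry in advertised.split(","):
        name, address = entry.split("://", 1)
        host, port = address.rsplit(":", 1)
        result[name] = (host, int(port))
    return result


if __name__ == "__main__":
    listeners = parse_listeners(
        "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
    )
    # containers on the compose network use kafka-service:29092,
    # while the host machine uses localhost:9092
    print(listeners["INTERNAL"])  # → ('kafka-service', 29092)
    print(listeners["EXTERNAL"])  # → ('localhost', 9092)
```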
Preparing the skeleton of the project
In the basic version, the structure of our project should look like this:
```
horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *
```
**We don't touch the files marked with `*` yet - we just create them empty.**
The structure is created. Now let's add the necessary dependencies, write the config, and connect to mongodb. I will not give the full text of the files in the article, so as not to drag it out, but I will link to the necessary versions.
Let's start with the dependencies and project metadata, which live in pyproject.toml.
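As a sketch of what that file could contain (the exact version constraints here are my assumptions, not taken from the article; motor is one possible async MongoDB driver):

```toml
[tool.poetry]
name = "horton"
version = "0.1.0"
description = "Background data collection with Faust"
authors = ["..."]

[tool.poetry.dependencies]
python = "^3.8"
faust = "^1.10"    # stream processing / background tasks
aiohttp = "^3.6"   # HTTP client for alphavantage
motor = "^2.1"     # async MongoDB driver (an assumption)

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
```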
Next, we install the dependencies and let poetry create the virtualenv (or you can create the venv folder yourself and activate the environment):

```shell
pip3 install poetry  # if not already installed
poetry install
```
Now let's create config.py.
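A minimal sketch of what config.py might hold; the variable names are my guesses, grounded only in the services defined in docker-compose.yml above:

```python
# config.py -- hypothetical sketch: read service addresses from the
# environment, falling back to the values used in docker-compose.yml.
import os

KAFKA_BROKER_URL = os.getenv("KAFKA_BROKER_URL", "kafka://localhost:9092")

MONGO_HOST = os.getenv("MONGO_HOST", "localhost")
MONGO_PORT = int(os.getenv("MONGO_PORT", "20017"))
MONGO_DB = os.getenv("MONGO_DB", "horton")
MONGO_USERNAME = os.getenv("MONGO_USERNAME", "admin")
MONGO_PASSWORD = os.getenv("MONGO_PASSWORD", "admin_password")
```

Keeping everything overridable via environment variables makes it easy to point the same code at local and production services.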
Connecting to mongo is quite simple. We declare the connection in database/connect.py.
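For illustration, here is a hypothetical database/connect.py. The DSN builder is plain Python; the driver call is shown only in a comment, since the article doesn't show the file and the choice of motor is my assumption:

```python
# database/connect.py -- hypothetical sketch of the connection module.
def build_mongo_dsn(host: str, port: int, username: str,
                    password: str, db: str) -> str:
    """Assemble a MongoDB connection string from config values."""
    return f"mongodb://{username}:{password}@{host}:{port}/{db}?authSource=admin"


# Real code would then create the async client, e.g. with motor:
#   from motor.motor_asyncio import AsyncIOMotorClient
#   client = AsyncIOMotorClient(build_mongo_dsn(...))
#   db = client["horton"]

if __name__ == "__main__":
    print(build_mongo_dsn("localhost", 20017, "admin", "admin_password", "horton"))
```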
What will happen next?
The article turned out not very long, since here I only talk about the motivation and preparation, so don't blame me - I promise that the next part will have action and charts.
So, in this very next part we:
- Let's write a small client for alphavantage on aiohttp with requests for the endpoints we need.
- Let's make an agent that will collect data on securities and historical prices for them.
Source: habr.com