我怎麼過這樣的生活呢?
不久前,我要從事一個高負載專案的後端工作,其中需要組織大量後台任務的定期執行,這些任務具有複雜的計算和對第三方服務的請求。 該專案是異步的,在我來之前,它有一個用於cron 啟動任務的簡單機制:一個循環檢查當前時間並通過收集啟動協程組- 這種方法被證明是可以接受的,直到有數十個和數百個這樣的協程然而,當他們的數量超過兩千時,我不得不考慮組織一個正常的任務隊列,有一個broker,幾個worker等等。
首先,我決定嘗試我以前用過的 Celery。 由於該專案的非同步性質,我深入研究了這個問題並看到
我想說的是,這個專案非常有趣,並且在我們團隊的其他應用程式中運行得相當成功,作者本人表示,他能夠透過使用非同步池將其投入生產。 但不幸的是,事實證明它不適合我
對此,我開始尋找 備擇方案 找到了! 芹菜的創造者,具體來說,據我所知
另外,你還可以看看
我們做什麼?
因此,在一系列簡短的文章中,我將向您展示如何使用 Faust 從後台任務收集資料。 顧名思義,我們範例專案的原始碼是:
PS 從關於監控的觀點寫的信心來看,我認為上一篇文章結尾處的讀者仍然會是這樣的:
項目要求
由於我已經承諾過,讓我們列出一個服務應該能夠執行的操作的小清單:
- 定期上傳證券及其概覽(包括去年的損益表、資產負債表、現金流量)
- 上傳歷史資料(針對每個交易年度,找出交易收盤價的極值)-定期
- 定期上傳最新交易數據
- 定期上傳每個證券的客製化指標列表
正如預期的那樣,我們從頭開始為專案選擇一個名稱: 霍頓
我們正在準備基礎設施
這個標題確實很強大,但是,您需要做的就是為 docker-compose 編寫一個小配置,其中包含 kafka(和 Zookeeper - 在一個容器中)、kafdrop(如果我們想查看主題中的消息)、mongodb。 我們得到 [docker-compose.yml](
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
這裡根本沒有什麼複雜的。 為 kafka 聲明了兩個偵聽器:一個(內部)用於複合網路內部,第二個(外部)用於來自外部的請求,因此它們將其轉送到外部。 2181 — 動物園管理員端口。 我想,剩下的事情就很清楚了。
準備專案的框架
在基本版本中,我們專案的結構應該如下所示:
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*我注意到的一切 我們還沒有觸及它,我們只是創建空文件。**
我們創建了一個結構。 現在讓我們新增必要的依賴項,編寫配置並連接到 mongodb。 我不會提供文章中文件的全文,以免耽誤時間,但我會提供必要版本的連結。
讓我們從專案的依賴項和元資料開始 -
接下來,我們開始安裝依賴項並建立 virtualenv (或者您可以自己建立 venv 資料夾並啟動環境):
pip3 install poetry (если ещё не установлено)
poetry install
現在讓我們創建
連接到 Mongo 時,一切都非常簡單。 宣告
接下來會發生什麼?
文章不是很長,因為這裡我只講動機和準備,所以不要怪我——我保證下一部分會有動作和圖形。
因此,在下一部分中我們:
- 讓我們在 aiohttp 上為 alphavantage 編寫一個小型客戶端,並請求我們需要的端點。
- 讓我們建立一個代理人來收集證券及其歷史價格的資料。
來源: www.habr.com