《浮士德》的背景任務,第一部分:簡介

《浮士德》的背景任務,第一部分:簡介

我怎麼過這樣的生活呢?

不久前,我要從事一個高負載專案的後端工作,其中需要組織大量後台任務的定期執行,這些任務具有複雜的計算和對第三方服務的請求。 該專案是異步的,在我來之前,它有一個用於cron 啟動任務的簡單機制:一個循環檢查當前時間並通過收集啟動協程組- 這種方法被證明是可以接受的,直到有數十個和數百個這樣的協程然而,當他們的數量超過兩千時,我不得不考慮組織一個正常的任務隊列,有一個broker,幾個worker等等。

首先,我決定嘗試我以前用過的 Celery。 由於該專案的非同步性質,我深入研究了這個問題並看到 一篇文章以及 項目,由文章作者創建。

我想說的是,這個專案非常有趣,並且在我們團隊的其他應用程式中運行得相當成功,作者本人表示,他能夠透過使用非同步池將其投入生產。 但不幸的是,事實證明它不適合我 一個問題 透過小組啟動任務(參見。 )。 在撰寫本文時 問題 已經關閉了,但是工作已經進行了一個月。 無論如何,祝作者好運,一切順利,因為庫上已經有了可用的東西……總的來說,重點在於我,而且這個工具對我來說很潮濕。 此外,某些任務對不同的服務有2-3 個http 請求,因此即使在優化任務時,我們也會創建4 個TCP 連接,大約每2 小時一次- 不太好......我想為一種類型的任務建立一個會話啟動工人時的任務。 更多關於 aiohttp 的大量請求的信息 這裡.

對此,我開始尋找 備擇方案 找到了! 芹菜的創造者,具體來說,據我所知 詢問索勒姆,被創建 “浮士德”,最初用於該項目 羅賓漢。 Faust 受到 Kafka Streams 的啟發,使用 Kafka 作為代理,rocksdb 也用於儲存代理工作的結果,最重要的是該程式庫是異步的。

另外,你還可以看看 快速比較 芹菜和浮士德來自後者的創造者:它們的差異,經紀人之間的差異,基本任務的實施。 一切都很簡單,然而,faust 中的一個很好的功能引起了人們的注意——用於傳輸到主題的類型化資料。

我們做什麼?

因此,在一系列簡短的文章中,我將向您展示如何使用 Faust 從後台任務收集資料。 顧名思義,我們範例專案的原始碼是: 阿爾法優勢公司。 我將示範如何編寫代理程式(接收器、主題、分區)、如何定期 (cron) 執行、最方便的 faust cli 命令(單擊的包裝器)、簡單的集群,最後我們將附加一個 datadog (開箱即用)並嘗試一些東西來看看。 為了儲存收集到的數據,我們將使用 mongodb 和 motor 進行連接。

PS 從關於監控的觀點寫的信心來看,我認為上一篇文章結尾處的讀者仍然會是這樣的:

《浮士德》的背景任務,第一部分:簡介

項目要求

由於我已經承諾過,讓我們列出一個服務應該能夠執行的操作的小清單:

  1. 定期上傳證券及其概覽(包括去年的損益表、資產負債表、現金流量)
  2. 上傳歷史資料(針對每個交易年度,找出交易收盤價的極值)-定期
  3. 定期上傳最新交易數據
  4. 定期上傳每個證券的客製化指標列表

正如預期的那樣,我們從頭開始為專案選擇一個名稱: 霍頓

我們正在準備基礎設施

這個標題確實很強大,但是,您需要做的就是為 docker-compose 編寫一​​個小配置,其中包含 kafka(和 Zookeeper - 在一個容器中)、kafdrop(如果我們想查看主題中的消息)、mongodb。 我們得到 [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml)的形式如下:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

這裡根本沒有什麼複雜的。 為 kafka 聲明了兩個偵聽器:一個(內部)用於複合網路內部,第二個(外部)用於來自外部的請求,因此它們將其轉送到外部。 2181 — 動物園管理員端口。 我想,剩下的事情就很清楚了。

準備專案的框架

在基本版本中,我們專案的結構應該如下所示:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*我注意到的一切 我們還沒有觸及它,我們只是創建空文件。**

我們創建了一個結構。 現在讓我們新增必要的依賴項,編寫配置並連接到 mongodb。 我不會提供文章中文件的全文,以免耽誤時間,但我會提供必要版本的連結。

讓我們從專案的依賴項和元資料開始 - pyproject.toml

接下來,我們開始安裝依賴項並建立 virtualenv (或者您可以自己建立 venv 資料夾並啟動環境):

pip3 install poetry (если ещё не установлено)
poetry install

現在讓我們創建 配置文件 - 憑證和敲門的地方。 您可以立即將 alphavantage 的資料放在那裡。 好吧,讓我們繼續 設定檔 — 從我們的配置中提取應用程式的資料。 是的,我承認,我使用了我的庫 - 西特里.

連接到 Mongo 時,一切都非常簡單。 宣告 客戶類別 連接和 基底類 對於cruds,可以更輕鬆地對集合進行查詢。

接下來會發生什麼?

文章不是很長,因為這裡我只講動機和準備,所以不要怪我——我保證下一部分會有動作和圖形。

因此,在下一部分中我們:

  1. 讓我們在 aiohttp 上為 alphavantage 編寫一​​個小型客戶端,並請求我們需要的端點。
  2. 讓我們建立一個代理人來收集證券及其歷史價格的資料。

專案程式碼

這部分的程式碼

來源: www.habr.com

添加評論