ファウストに関するバックグラウンド タスク、パート I: はじめに

ファウストに関するバックグラウンド タスク、パート I: はじめに

どうして私はこのように生きるようになったのでしょうか?

少し前まで、私は負荷の高いプロジェクトのバックエンドで作業する必要がありました。その中で、複雑な計算やサードパーティ サービスのリクエストを伴う多数のバックグラウンド タスクの定期的な実行を整理する必要がありました。 このプロジェクトは非同期であり、私が来る前は、cron 起動タスク用の単純なメカニズムがありました。つまり、ループで現在時刻をチェックし、収集を介してコルーチンのグループを起動します。このアプローチは、そのようなコルーチンが数十、数百になるまで受け入れられることが判明しました。ただし、その数が XNUMX を超えると、ブローカーや数人のワーカーなどを使用して通常のタスク キューを編成することを考えなければなりませんでした。

まずは以前使っていたセロリを試してみることにしました。 プロジェクトの非同期的な性質のため、質問を詳しく調べて見ました。 статью及び プロジェクト、記事の著者によって作成されました。

これだけは言っておきますが、このプロジェクトは非常に興味深いもので、私たちのチームの他のアプリケーションでも非常にうまく機能しており、作者自身も、非同期プールを使用することでこのプロジェクトを本番環境に展開することができたと述べています。 しかし、残念ながら、結局のところ、それは私にはあまり合わなかった 問題 タスクのグループ起動を使用します (「. グループヘッド)。 執筆時点では 問題 すでに閉店していますが、作業は2か月続いています。 いずれにせよ、作者の幸運と幸運を祈ります。ライブラリにはすでに動作するものがあるので...一般的に、ポイントは私にあり、ツールは私にとって湿ったものであることが判明しました。 さらに、一部のタスクには異なるサービスへの 3 ~ 4 の http リクエストが含まれていたため、タスクを最適化する場合でも、約 2 時間ごとに XNUMX の TCP 接続が作成されます。これはあまり良いことではありません... XNUMX 種類のセッションを作成したいと考えています。ワーカーを開始するときのタスク。 aiohttp 経由の大量のリクエストについてもう少し詳しく ここで.

この点に関して、私は探し始めました 代替案 そして見つけました! 私の理解では、特にセロリの生産者 ソーレムに聞く、 作成されました Faust、元々はプロジェクト用でした ロビンフッド。 Faust は Kafka Streams からインスピレーションを受けており、Kafka をブローカーとして使用します。rocksdb はエージェントの作業結果を保存するためにも使用されます。最も重要なことは、ライブラリが非同期であることです。

また、見ることもできます 簡単な比較 セロリとファウストの作成者によるもの: それらの違い、ブローカー間の違い、基本的なタスクの実装。 すべては非常に単純ですが、faust の優れた機能が注目を集めています。それは、トピックに送信するための型付きデータです。

私たちは何をしますか?

そこで、短い一連の記事で、Faust を使用してバックグラウンド タスクからデータを収集する方法を説明します。 サンプルプロジェクトのソースは、名前が示すように、次のようになります。 アルファバンテージ.co。 エージェント (シンク、トピック、パーティション) の作成方法、通常の (cron) 実行の方法、最も便利な faust cli コマンド (クリック上のラッパー)、単純なクラスタリングを示し、最後に datadog (箱から出してすぐに使える)、何かを見てみましょう。 収集したデータを保存するには、接続に mongodb と motor を使用します。

PS 監視に関する要点が自信を持って書かれたことから判断すると、前回の記事の最後を読んだ読者はまだ次のように見えると思います。

ファウストに関するバックグラウンド タスク、パート I: はじめに

プロジェクトの要件

すでに約束したので、このサービスでできることの小さなリストを作成しましょう。

  1. 有価証券とその概要 (過去 XNUMX 年間の損益、貸借対照表、キャッシュ フローを含む) を定期的にアップロードします。
  2. 履歴データを定期的にアップロードします (取引年ごとに、取引の終値の極値を見つけます)。
  3. 最新の取引データを定期的にアップロードします
  4. 各証券のカスタマイズされた指標リストを定期的にアップロードします

予想どおり、プロジェクトの名前を最初から選択します。 ホートン

インフラを準備中です

タイトルは確かに強力ですが、必要なのは、kafka (および XNUMX つのコンテナー内の Zookeeper)、kafdrop (トピック内のメッセージを確認したい場合)、mongodb を使用して docker-compose 用の小さな構成を作成することだけです。 我々が得る [docker-compose.yml]](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) 次の形式:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

ここでは複雑なことは何もありません。 kafka に対して 2181 つのリスナーが宣言されました。XNUMX つは複合ネットワーク内で使用するためのリスナー (内部)、もう XNUMX つは外部からのリクエスト用 (外部) であるため、リスナーは外部に転送されました。 XNUMX — 飼育員のポート。 残りは明らかだと思います。

プロジェクトの骨子を準備する

基本バージョンでは、プロジェクトの構造は次のようになります。

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*私がメモしたものすべて まだ触っていません。空のファイルを作成するだけです。**

私たちは構造を作りました。 次に、必要な依存関係を追加し、構成を記述して、mongodb に接続しましょう。 記事を遅らせないように、この記事ではファイルの全文は提供しませんが、必要なバージョンへのリンクを提供します。

プロジェクトの依存関係とメタから始めましょう - pyproject.toml

次に、依存関係のインストールと virtualenv の作成を開始します (または、自分で venv フォルダーを作成して環境をアクティブ化することもできます)。

pip3 install poetry (если ещё не установлено)
poetry install

さあ、作成しましょう config.yml - 資格情報とノックする場所。 そこに alphavantage 用のデータをすぐに配置できます。 さて、次に進みましょう config.py — 構成からアプリケーションのデータを抽出します。 はい、告白します、私は自分のライブラリを使いました - シトリ.

Mongo に接続するときは、すべてが非常に簡単です。 発表された クライアントクラス 接続して、 基本クラス クラッドの場合、コレクションに対するクエリを簡単に作成できるようにします。

次に何が起こるのだろうか?

この記事はそれほど長くありません。ここではモチベーションと準備についてのみ話しているので、私を責めないでください。次の部分にはアクションとグラフィックスがあることをお約束します。

したがって、この次のパートでは次のことを行います。

  1. 必要なエンドポイントに対するリクエストを含む aiohttp 上の alphavantage 用の小さなクライアントを作成してみましょう。
  2. 有価証券とその過去の価格に関するデータを収集するエージェントを作成しましょう。

プロジェクトコード

この部分のコード

出所: habr.com

コメントを追加します