どうして私はこのように生きるようになったのでしょうか?
少し前まで、私は負荷の高いプロジェクトのバックエンドで作業する必要がありました。その中で、複雑な計算やサードパーティ サービスのリクエストを伴う多数のバックグラウンド タスクの定期的な実行を整理する必要がありました。 このプロジェクトは非同期であり、私が来る前は、cron 起動タスク用の単純なメカニズムがありました。つまり、ループで現在時刻をチェックし、収集を介してコルーチンのグループを起動します。このアプローチは、そのようなコルーチンが数十、数百になるまで受け入れられることが判明しました。ただし、その数が XNUMX を超えると、ブローカーや数人のワーカーなどを使用して通常のタスク キューを編成することを考えなければなりませんでした。
まずは以前使っていたセロリを試してみることにしました。 プロジェクトの非同期的な性質のため、質問を詳しく調べて見ました。
これだけは言っておきますが、このプロジェクトは非常に興味深いもので、私たちのチームの他のアプリケーションでも非常にうまく機能しており、作者自身も、非同期プールを使用することでこのプロジェクトを本番環境に展開することができたと述べています。 しかし、残念ながら、結局のところ、それは私にはあまり合わなかった
この点に関して、私は探し始めました 代替案 そして見つけました! 私の理解では、特にセロリの生産者
また、見ることもできます
私たちは何をしますか?
そこで、短い一連の記事で、Faust を使用してバックグラウンド タスクからデータを収集する方法を説明します。 サンプルプロジェクトのソースは、名前が示すように、次のようになります。
PS 監視に関する要点が自信を持って書かれたことから判断すると、前回の記事の最後を読んだ読者はまだ次のように見えると思います。
プロジェクトの要件
すでに約束したので、このサービスでできることの小さなリストを作成しましょう。
- 有価証券とその概要 (過去 XNUMX 年間の損益、貸借対照表、キャッシュ フローを含む) を定期的にアップロードします。
- 履歴データを定期的にアップロードします (取引年ごとに、取引の終値の極値を見つけます)。
- 最新の取引データを定期的にアップロードします
- 各証券のカスタマイズされた指標リストを定期的にアップロードします
予想どおり、プロジェクトの名前を最初から選択します。 ホートン
インフラを準備中です
タイトルは確かに強力ですが、必要なのは、kafka (および XNUMX つのコンテナー内の Zookeeper)、kafdrop (トピック内のメッセージを確認したい場合)、mongodb を使用して docker-compose 用の小さな構成を作成することだけです。 我々が得る [docker-compose.yml]](
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
ここでは複雑なことは何もありません。 kafka に対して 2181 つのリスナーが宣言されました。XNUMX つは複合ネットワーク内で使用するためのリスナー (内部)、もう XNUMX つは外部からのリクエスト用 (外部) であるため、リスナーは外部に転送されました。 XNUMX — 飼育員のポート。 残りは明らかだと思います。
プロジェクトの骨子を準備する
基本バージョンでは、プロジェクトの構造は次のようになります。
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*私がメモしたものすべて まだ触っていません。空のファイルを作成するだけです。**
私たちは構造を作りました。 次に、必要な依存関係を追加し、構成を記述して、mongodb に接続しましょう。 記事を遅らせないように、この記事ではファイルの全文は提供しませんが、必要なバージョンへのリンクを提供します。
プロジェクトの依存関係とメタから始めましょう -
次に、依存関係のインストールと virtualenv の作成を開始します (または、自分で venv フォルダーを作成して環境をアクティブ化することもできます)。
pip3 install poetry (если ещё не установлено)
poetry install
さあ、作成しましょう
Mongo に接続するときは、すべてが非常に簡単です。 発表された
次に何が起こるのだろうか?
この記事はそれほど長くありません。ここではモチベーションと準備についてのみ話しているので、私を責めないでください。次の部分にはアクションとグラフィックスがあることをお約束します。
したがって、この次のパートでは次のことを行います。
- 必要なエンドポイントに対するリクエストを含む aiohttp 上の alphavantage 用の小さなクライアントを作成してみましょう。
- 有価証券とその過去の価格に関するデータを収集するエージェントを作成しましょう。
出所: habr.com