ファりストのバックグラりンド タスク、パヌト II: ゚ヌゞェントずチヌム

ファりストのバックグラりンド タスク、パヌト II: ゚ヌゞェントずチヌム

目次

  1. パヌト I: はじめに

  2. パヌト II: ゚ヌゞェントずチヌム

私たちはここで䜕をしおいるのでしょうか

それで、それで、第二郚。 前に曞いたように、その䞭で次のこずを行いたす。

  1. 必芁な゚ンドポむントに察するリク゚ストを含む aiohttp 䞊の alphavantage 甚の小さなクラむアントを䜜成しおみたしょう。

  2. 蚌刞に関するデヌタず蚌刞に関するメタ情報を収集する゚ヌゞェントを䜜成したしょう。

しかし、これはプロゞェクト自䜓に察しお行うこずであり、ファりストの研究の芳点からは、kafka からのストリヌム むベントを凊理する゚ヌゞェントの䜜成方法ず、コマンド (クリック ラッパヌ) の䜜成方法を孊びたす。゚ヌゞェントが監芖しおいるトピックぞの手動プッシュ メッセヌゞの堎合。

èš“ç·Ž

AlphaVantage クラむアント

たず、alphavantage ぞのリク゚スト甚の小さな aiohttp クラむアントを䜜成したしょう。

アルファバンテヌゞ.py

スポむラヌ

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

実際、そこからすべおが明らかです。

  1. AlphaVantage API は非垞にシンプルか぀矎しく蚭蚈されおいるため、すべおのリク゚ストを次のメ゜ッドを通じお行うこずにしたした。 construct_query ここで http 呌び出しが行われたす。

  2. すべおのフィヌルドを持っおきたす snake_case 快適さのために。

  3. さお、矎しく有益なトレヌスバック出力のための logger.catch 装食です。

PS alphavantage トヌクンをロヌカルで config.yml に远加するか、環境倉数を゚クスポヌトするこずを忘れないでください。 HORTON_SERVICE_APIKEY。 トヌクンを受け取りたす ここで.

CRUDクラス

蚌刞に関するメタ情報を保存するための蚌刞コレクションを甚意したす。

デヌタベヌス/セキュリティ.py

私の意芋では、ここで䜕も説明する必芁はなく、基底クラス自䜓は非垞に単玔です。

アプリを取埗

にアプリケヌションオブゞェクトを䜜成する関数を远加したしょう。 app.py

スポむラヌ

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

今のずころ、最も単玔なアプリケヌションを䜜成したす。少し埌で拡匵したすが、お埅たせしないように、ここで説明したす。 参考文献 アプリクラスに。 たた、ほずんどの蚭定を担圓するため、蚭定クラスを参照するこずをお勧めしたす。

メむン

有䟡蚌刞リストの収集および管理の代理人

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

したがっお、最初に faust アプリケヌション オブゞェクトを取埗したす。これは非垞に単玔です。 次に、゚ヌゞェントのトピックを明瀺的に宣蚀したす。ここでは、トピックが䜕であるか、内郚パラメヌタが䜕であるか、およびこれをどのように倉曎できるかに぀いお蚀及する䟡倀がありたす。

  1. Kafka のトピック。正確な定矩を知りたい堎合は、読むこずをお勧めしたす。 オフ。 曞類、たたは読むこずができたす 抂芁 ロシア語のハブレでは、すべおが非垞に正確に反映されおいたす:)

  2. パラメヌタ内郚ファりストのドキュメントでよく説明されおおり、コヌド内でトピックを盎接蚭定できたす。もちろん、これは、ファりスト開発者によっお提䟛されるパラメヌタを意味したす。たずえば、保持、保持ポリシヌ (デフォルトでは削陀されたすが、蚭定できたす) コンパクト)、トピックあたりのパヌティション数 (パヌティションたずえば、以䞋のこずを行う 䞖界的な重芁性 アプリケヌションファスト)。

  3. 䞀般に、゚ヌゞェントはグロヌバル倀を䜿甚しお管理トピックを䜜成できたすが、私はすべおを明瀺的に宣蚀したいず考えおいたす。 さらに、゚ヌゞェント アドバタむズメント内のトピックの䞀郚のパラメヌタヌ (パヌティションの数や保持ポリシヌなど) は構成できたせん。

    トピックを手動で定矩しない堎合は次のようになりたす。

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

さお、゚ヌゞェントが䜕をするかを説明したしょう:)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

したがっお、゚ヌゞェントの開始時に、クラむアントを介したリク゚ストに察しお aiohttp セッションを開きたす。 したがっお、ワヌカヌを開始するずき、゚ヌゞェントが起動されるず、セッションがすぐに開かれたす - ワヌカヌが実行されおいる間ずっず XNUMX ぀ (たたはパラメヌタヌを倉曎した堎合は耇数) 䞊行性 デフォルトのナニットを䜿甚する゚ヌゞェントから。

次に、ストリヌムに埓いたす (メッセヌゞを _、この゚ヌゞェントでは、珟圚のオフセットにメッセヌゞが存圚する堎合、トピックからのメッセヌゞの内容を気にしないため、そうでない堎合、サむクルはメッセヌゞの到着を埅ちたす。 さお、私たちのルヌプ内では、メッセヌゞの受信をログに蚘録し、アクティブな蚌刞のリストを取埗しお (get_securities はデフォルトでアクティブな蚌刞のみを返したす。クラむアント コヌドを参照)、それをデヌタベヌスに保存し、同じティッカヌを持぀蚌刞があるかどうかを確認し、デヌタベヌス内に亀換があれば、それ (論文) が曎新されるだけです。

私たちの䜜品を立ち䞊げたしょう

> docker-compose up -d
... Запуск кПМтейМерПв ...
> faust -A horton.agents worker --without-web -l info

PSの特城 りェブコンポヌネント 蚘事内ではファりストは考慮しないので適圓なフラグを立おたす。

起動コマンドでは、情報ログの出力レベルを䜿甚しお、アプリケヌション オブゞェクトをどこで探すか、それをどう凊理するか (ワヌカヌを開始する) を faust に指瀺したした。 次の出力が埗られたす。

スポむラヌ

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┮───────────────────────────────────────────────────┘
... лПгО, лПгО, лПгО ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┌─────────────
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┮────────────┘ 

生きおる

パヌティションセットを芋おみたしょう。 ご芧のずおり、トピックはコヌドで指定した名前、デフォルトのパヌティション数 (8、から取埗) で䜜成されたした。 トピックパヌティション - アプリケヌション オブゞェクト パラメヌタヌ)。トピックの個別の倀を (パヌティション経由で) 指定しなかったためです。 ワヌカヌ内で起動された゚ヌゞェントには、それが唯䞀のパヌティションであるため、8 ぀のパヌティションすべおが割り圓おられたすが、これに぀いおはクラスタリングに関する郚分で詳しく説明したす。

さお、今床は別のタヌミナル りィンドりに移動しお、トピックに空のメッセヌゞを送信したす。

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PSを䜿甚しお @ 「collect_securities」ずいう名前のトピックにメッセヌゞを送信しおいるこずを瀺しおいたす。

この堎合、メッセヌゞはパヌティション 6 に送信されたした。これは、kafdrop にアクセスするこずで確認できたす。 localhost:9000

ワヌカヌず䞀緒にタヌミナル りィンドりに移動するず、loguru を䜿甚しお送信された幞せなメッセヌゞが衚瀺されたす。

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

(Robo3T たたは Studio3T を䜿甚しお) mongo を調べお、蚌刞がデヌタベヌス内にあるこずを確認するこずもできたす。

私は億䞇長者ではないので、最初の芖聎オプションに満足しおいたす。

ファりストのバックグラりンド タスク、パヌト II: ゚ヌゞェントずチヌムファりストのバックグラりンド タスク、パヌト II: ゚ヌゞェントずチヌム

幞せず喜び - 最初の゚ヌゞェントの準備ができたした:)

゚ヌゞェントの準備は完了です。新しい゚ヌゞェント䞇歳

はい、皆さん、この蚘事で甚意した手順の 1 分の 3 しかカバヌしおいたせんが、萜胆しないでください。これからはもっず簡単になるからです。

したがっお、メタ情報を収集し、それをコレクション ドキュメントに入れる゚ヌゞェントが必芁になりたす。

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

この゚ヌゞェントは特定の蚌刞に関する情報を凊理するため、メッセヌゞ内でこの蚌刞のティッカヌ (シンボル) を瀺す必芁がありたす。 この目的のために、ファりストには次のようなものがありたす。 射撃蚘録 — ゚ヌゞェント トピックでメッセヌゞ スキヌムを宣蚀するクラス。

この堎合は、次の堎所に行きたしょう レコヌド.py そしお、このトピックのメッセヌゞがどのようなものであるかを説明したす。

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

ご想像のずおり、faust は Python 型アノテヌションを䜿甚しおメッセヌゞ スキヌマを蚘述したす。そのため、ラむブラリでサポヌトされる最小バヌゞョンは次のずおりです。 3.6.

゚ヌゞェントに戻り、タむプを蚭定しお远加したしょう。

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

ご芧のずおり、スキヌムを含む新しいパラメヌタヌをトピック初期化メ゜ッド value_type に枡したす。 さらに、すべおが同じスキヌムに埓っおいるので、他のこずにこだわる意味はないず思いたす。

さお、最埌の仕䞊げは、メタ情報収集゚ヌゞェントぞの呌び出しをcollect_securititesに远加するこずです。

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

メッセヌゞには以前に発衚されたスキヌムを䜿甚したす。 この堎合、゚ヌゞェントからの結果を埅぀必芁がないため、.cast メ゜ッドを䜿甚したしたが、蚀及する䟡倀がありたす。 方法 トピックにメッセヌゞを送信したす。

  1. Cast - 結果を期埅しおいないためブロックしたせん。 結果をメッセヌゞずしお別のトピックに送信するこずはできたせん。

  2. send - 結果を期埅しおいないためブロックしたせん。 結果が送信されるトピック内で゚ヌゞェントを指定できたす。

  3. ask - 結果を埅ちたす。 結果が送信されるトピック内で゚ヌゞェントを指定できたす。

それでは、今日の゚ヌゞェントに぀いおはここたでです。

最高のチヌム

このパヌトで最埌に曞くず玄束したのはコマンドです。 前述したように、faust のコマンドはクリックに関するラッパヌです。 実際、faust は、-A キヌを指定するずきにカスタム コマンドをむンタヌフェむスにアタッチするだけです。

発衚された゚ヌゞェントの埌、 ゚ヌゞェント.py デコレヌタを䜿っお関数を远加する アプリコマンドメ゜ッドの呌び出し キャスト у 収集_セキュリティ:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

したがっお、コマンドのリストを呌び出すず、新しいコマンドがその䞭に含たれたす。

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

他の人ず同じように䜿甚できるので、ファりスト ワヌカヌを再起動しお、蚌刞の本栌的な収集を開始したしょう。

> faust -A horton.agents start-collect-securities

次に䜕が起こるのだろうか

次のパヌトでは、残りの゚ヌゞェントを䟋ずしお、その幎の取匕の終倀の極倀を探すためのシンクメカニズムず゚ヌゞェントの cron 起動に぀いお怜蚎したす。

それが今日のすべおです 読んでくれおありがずう 

この郚分のコヌド

ファりストのバックグラりンド タスク、パヌト II: ゚ヌゞェントずチヌム

PS 最埌の郚分で、ファりストず合流カフカに぀いお質問されたした (Confluentにはどのような機胜がありたすか?。 confluent は倚くの点でより機胜的であるように芋えたすが、実際のずころ、faust は confluent を完党にクラむアントでサポヌトしおいたせん。これは次のずおりです。 ドキュメント内のクラむアント制限の説明.

出所 habr.com

コメントを远加したす