《浮士德》的背景任務,第二部分:代理和團隊

《浮士德》的背景任務,第二部分:代理和團隊

目錄

  1. 第一部分:簡介

  2. 第二部分:代理商和團隊

我們在這裡做什麼?

所以,所以,第二部分。 如前所述,我們將在其中執行以下操作:

  1. 讓我們在 aiohttp 上為 alphavantage 編寫一​​個小型客戶端,並請求我們需要的端點。

  2. 讓我們建立一個代理來收集證券資料及其元資訊。

但是,這就是我們將為專案本身所做的事情,就faust 研究而言,我們將學習如何編寫處理來自kafka 的串流事件的代理,以及如何編寫命令(單擊包裝器),在我們的例子中-用於將訊息手動推送到代理正在監控的主題。

訓練

AlphaVantage 用戶端

首先,讓我們為 alphavantage 的請求編寫一個小型 aiohttp 用戶端。

阿爾法優勢.py

擾流板

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

其實,一切都已經很清楚了:

  1. AlphaVantage API 非常簡單且設計精美,因此我決定透過該方法發出所有請求 construct_query 其中又存在一個 http 呼叫。

  2. 我把所有的領域帶到 snake_case 為了舒適。

  3. 嗯,logger.catch 裝飾用於漂亮且資訊豐富的回溯輸出。

PS 不要忘記將 alphavantage 令牌本地添加到 config.yml,或匯出環境變量 HORTON_SERVICE_APIKEY。 我們收到一個令牌 這裡.

增刪改查類

我們將有一個證券集合來儲存有關證券的元資訊。

資料庫/安全性.py

在我看來,這裡沒有必要解釋任何東西,基類本身非常簡單。

獲取應用程式()

讓我們新增一個用於創建應用程式物件的函數 應用程序

擾流板

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

現在我們將創建最簡單的應用程序,稍後我們將擴展它,但是,為了不讓您等待,這裡 參考 到App級。 我還建議您查看設定類,因為它負責大部分設定。

主要部分

收集和維護證券清單的代理

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

所以,首先我們得到 faust 應用程式物件 - 這非常簡單。 接下來,我們明確地為我們的代理聲明一個主題......這裡值得一提的是它是什麼,內部參數是什麼以及如何以不同的方式安排它。

  1. kafka中的主題,如果我們想知道確切的定義,最好閱讀 離開。 文件,或者你可以閱讀 抽象的 關於俄語的 Habré,一切都反映得相當準確:)

  2. 參數內部在faust doc中描述得很好,允許我們直接在程式碼中配置主題,當然,這意味著faust開發人員提供的參數,例如:保留,保留策略(預設刪除,但你可以設定 Compact single),每個主題的分區數(分數例如,做少於 全球意義 應用程式浮士德)。

  3. 一般來說,代理可以創建具有全域值的託管主題,但是,我喜歡明確聲明所有內容。 另外,代理通告中主題的某些參數(例如分區數量或保留策略)無法配置。

    如果不手動定義主題,它可能如下所示:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

好吧,現在讓我們來描述一下我們的代理將做什麼:)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

因此,在代理開始時,我們透過客戶端開啟一個 aiohttp 會話以接收請求。 因此,當啟動工作程序時,當我們的代理啟動時,將立即打開一個會話 - 一個,在工作程序運行的整個時間內(或多個,如果您更改參數 並發 來自具有預設單位的代理)。

接下來,我們跟隨流(我們將訊息放入 _,因為我們在這個代理中不關心來自我們主題的訊息的內容,如果它們存在於當前偏移處,否則我們的週期將等待它們的到達。 好吧,在我們的循環中,我們記錄訊息的接收,獲取活躍證券清單(get_securities 僅在預設情況下返回活躍證券,請參閱客戶端代碼)並將其保存到資料庫,檢查是否存在具有相同股票代碼的證券以及資料庫中的交換,如果有,那麼它(論文)將被簡單地更新。

讓我們開始我們的創作吧!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS特點 網路元件 我不會在文章中考慮浮士德,因此我們設置了適當的標誌。

在我們的啟動命令中,我們透過資訊日誌輸出等級告訴 faust 在哪裡尋找應用程式物件以及如何處理它(啟動工作程序)。 我們得到以下輸出:

擾流板

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

它還活著!!!

我們來看看分區集。 正如我們所看到的,使用我們在程式碼中指定的名稱創建了一個主題,預設分區數(8,取自 主題分區 - 應用程式物件參數),因為我們沒有為我們的主題指定單獨的值(透過分區)。 Worker 中啟動的代理程式被指派了所有 8 個分割區,因為它是唯一的一個,但這將在有關叢集的部分中更詳細地討論。

好吧,現在我們可以轉到另一個終端機視窗並向我們的主題發送空訊息:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS使用 @ 我們表明我們正在向名為「collect_securities」的主題發送訊息。

在本例中,訊息會傳送到分割區 6 - 您可以透過前往 kafdrop 來檢查這一點 localhost:9000

與我們的工作人員一起進入終端窗口,我們將看到使用 loguru 發送的一條快樂訊息:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

我們也可以查看 mongo(使用 Robo3T 或 Studio3T)並查看資料庫中的證券:

我不是億萬富翁,因此我們對第一個觀看選項感到滿意。

《浮士德》的背景任務,第二部分:代理和團隊《浮士德》的背景任務,第二部分:代理和團隊

幸福與快樂-第一位特務已經準備好了:)

代理準備就緒,新代理萬歲!

是的,先生們,我們只走了本文準備的道路的 1/3,但不要灰心,因為現在會更容易。

所以現在我們需要一個代理來收集元資訊並將其放入集合文件中:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

由於該代理將處理有關特定證券的信息,因此我們需要在訊息中指示該證券的股票代碼(符號)。 為此目的,在 fast 中有 記錄 — 在代理主題中聲明訊息方案的類別。

在這種情況下,我們去 記錄.py 並描述該主題的訊息應該是什麼樣子:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

正如您可能已經猜到的,faust 使用 python 類型註釋來描述訊息模式,這就是為什麼該庫支援的最低版本是 3.6.

讓我們返回到代理,設定類型並添加它:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

正如您所看到的,我們將一個帶有方案的新參數傳遞給主題初始化方法 - value_type。 此外,一切都遵循相同的方案,所以我認為沒有必要糾纏於其他任何事情。

好吧,最後一步是將元資訊收集代理的呼叫加入到collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

我們使用先前宣布的訊息方案。 在本例中,我使用了 .cast 方法,因為我們不需要等待代理的結果,但值得一提的是 方式 向主題發送訊息:

  1. cast - 不會阻塞,因為它不期望結果。 您不能將結果作為訊息發送到另一個主題。

  2. send - 不會阻塞,因為它不期望結果。 您可以在結果將轉到的主題中指定代理程式。

  3. 詢問 - 等待結果。 您可以在結果將轉到的主題中指定代理程式。

那麼,今天的代理商就到此為止了!

夢之隊

我承諾在這部分中寫的最後一件事是命令。 如前所述,faust 中的命令是點擊的包裝。 事實上,faust 只是在指定 -A 鍵時將我們的自訂命令附加到其介面

公佈代理後 代理.py 添加帶有裝飾器的函數 應用程式命令呼叫方法 у 收集證券:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

因此,如果我們調用命令列表,我們的新命令將在其中:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

我們可以像其他人一樣使用它,所以讓我們重新啟動 faust 工作程序並開始全面的證券收集:

> faust -A horton.agents start-collect-securities

接下來會發生什麼?

在下一部分中,我們以剩餘的代理為例,考慮尋找當年交易收盤價極值的下沉機制以及代理的 cron 啟動。

這就是今天的全部內容! 謝謝閱讀 :)

這部分的程式碼

《浮士德》的背景任務,第二部分:代理和團隊

PS 在最後一部分中,我被問到有關 faust 和 confluence kafka 的問題(confluence有什麼特色?)。 看起來 confluence 在很多方面都更實用,但事實是 faust 並沒有對 confluence 提供完整的客戶端支援 - 這是從 文件中對客戶端限制的描述.

來源: www.habr.com

添加評論