Faust 的后台任务,第二部分:代理和团队

Faust 的后台任务,第二部分:代理和团队

目录

  1. 第一部分:简介

  2. 第二部分:代理和团队

我们在这里做什么?

所以,所以,第二部分。 如前所述,我们将在其中执行以下操作:

  1. 让我们在 aiohttp 上为 alphavantage 编写一个小型客户端,并请求我们需要的端点。

  2. 让我们创建一个代理来收集证券数据及其元信息。

但是,这就是我们将为项目本身所做的事情,就 faust 研究而言,我们将学习如何编写处理来自 kafka 的流事件的代理,以及如何编写命令(单击包装器),在我们的例子中 -用于将消息手动推送到代理正在监控的主题。

训练

AlphaVantage 客户端

首先,让我们为 alphavantage 的请求编写一个小型 aiohttp 客户端。

阿尔法优势.py

扰流板

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

其实,一切都已经很清楚了:

  1. AlphaVantage API 非常简单且设计精美,因此我决定通过该方法发出所有请求 construct_query 其中又存在一个 http 调用。

  2. 我把所有的领域都带到 snake_case 为了方便。

  3. 嗯,logger.catch 装饰用于漂亮且信息丰富的回溯输出。

PS 不要忘记将 alphavantage 令牌本地添加到 config.yml,或导出环境变量 HORTON_SERVICE_APIKEY。 我们收到一个令牌 这里.

增删改查类

我们将有一个证券集合来存储有关证券的元信息。

数据库/安全.py

在我看来,这里没有必要解释任何东西,基类本身非常简单。

获取应用程序()

让我们添加一个用于创建应用程序对象的函数 应用程序

扰流板

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

现在我们将创建最简单的应用程序,稍后我们将扩展它,但是,为了不让您等待,这里 参考 到App级。 我还建议您查看设置类,因为它负责大部分设置。

主要部分

收集和维护证券清单的代理

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

所以,首先我们得到 faust 应用程序对象 - 这非常简单。 接下来,我们显式地为我们的代理声明一个主题......这里值得一提的是它是什么,内部参数是什么以及如何以不同的方式安排它。

  1. kafka中的主题,如果我们想知道确切的定义,最好阅读 离开。 文档,或者你可以阅读 概要 关于俄语的 Habré,一切都反映得相当准确:)

  2. 参数内部在faust doc中描述得很好,允许我们直接在代码中配置主题,当然,这意味着faust开发人员提供的参数,例如:保留,保留策略(默认删除,但你可以设置 紧凑),每个主题的分区数(分数例如,做少于 全球意义 应用程序浮士德)。

  3. 一般来说,代理可以创建具有全局值的托管主题,但是,我喜欢显式声明所有内容。 另外,代理通告中主题的某些参数(例如分区数量或保留策略)无法配置。

    如果不手动定义主题,它可能如下所示:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

好吧,现在让我们描述一下我们的代理将做什么:)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

因此,在代理开始时,我们通过客户端打开一个 aiohttp 会话以接收请求。 因此,当启动一个工作程序时,当我们的代理启动时,将立即打开一个会话 - 一个,在工作程序运行的整个时间内(或多个,如果您更改参数 并发 来自具有默认单位的代理)。

接下来,我们跟随流(我们将消息放入 _,因为我们在这个代理中不关心来自我们主题的消息的内容,如果它们存在于当前偏移处,否则我们的周期将等待它们的到达。 好吧,在我们的循环中,我们记录消息的接收,获取活跃证券列表(get_securities 默认情况下仅返回活跃证券,请参阅客户端代码)并将其保存到数据库,检查是否存在具有相同股票代码的证券以及数据库中的交换,如果有,那么它(论文)将被简单地更新。

让我们开始我们的创作吧!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS特点 网络组件 我不会在文章中考虑浮士德,因此我们设置了适当的标志。

在我们的启动命令中,我们通过信息日志输出级别告诉 faust 在哪里查找应用程序对象以及如何处理它(启动工作程序)。 我们得到以下输出:

扰流板

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

它还活着!!!

我们来看看分区集。 正如我们所看到的,使用我们在代码中指定的名称创建了一个主题,默认分区数(8,取自 主题分区 - 应用程序对象参数),因为我们没有为我们的主题指定单独的值(通过分区)。 Worker 中启动的代理被分配了所有 8 个分区,因为它是唯一的一个,但这将在有关集群的部分中更详细地讨论。

好吧,现在我们可以转到另一个终端窗口并向我们的主题发送一条空消息:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS使用 @ 我们表明我们正在向名为“collect_securities”的主题发送消息。

在本例中,消息发送到分区 6 - 您可以通过转到 kafdrop 来检查这一点 localhost:9000

与我们的工作人员一起进入终端窗口,我们将看到使用 loguru 发送的一条快乐消息:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

我们还可以查看 mongo(使用 Robo3T 或 Studio3T)并查看数据库中的证券:

我不是亿万富翁,因此我们对第一个观看选项感到满意。

Faust 的后台任务,第二部分:代理和团队Faust 的后台任务,第二部分:代理和团队

幸福和快乐——第一个特工已经准备好了:)

代理准备就绪,新代理万岁!

是的,先生们,我们只走了本文准备的道路的 1/3,但不要灰心,因为现在会更容易。

所以现在我们需要一个代理来收集元信息并将其放入集合文档中:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

由于该代理将处理有关特定证券的信息,因此我们需要在消息中指示该证券的股票代码(符号)。 为此目的,在 fast 中有 记录 — 在代理主题中声明消息方案的类。

在这种情况下,我们去 记录.py 并描述该主题的消息应该是什么样子:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

正如您可能已经猜到的,faust 使用 python 类型注释来描述消息模式,这就是为什么该库支持的最低版本是 3.6.

让我们返回到代理,设置类型并添加它:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

正如您所看到的,我们将一个带有方案的新参数传递给主题初始化方法 - value_type。 此外,一切都遵循相同的方案,所以我认为没有必要纠缠于其他任何事情。

好吧,最后一步是将元信息收集代理的调用添加到collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

我们使用之前宣布的消息方案。 在本例中,我使用了 .cast 方法,因为我们不需要等待代理的结果,但值得一提的是 方式 向主题发送消息:

  1. cast - 不会阻塞,因为它不期望结果。 您不能将结果作为消息发送到另一个主题。

  2. send - 不会阻塞,因为它不期望结果。 您可以在结果将转到的主题中指定代理。

  3. 询问 - 等待结果。 您可以在结果将转到的主题中指定代理。

那么,今天的代理就到此为止了!

梦之队

我承诺在这部分中写的最后一件事是命令。 如前所述,faust 中的命令是点击的包装。 事实上,faust 只是在指定 -A 键时将我们的自定义命令附加到其界面

公布代理后 代理.py 添加带有装饰器的函数 应用程序命令调用方法 у 收集证券:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

因此,如果我们调用命令列表,我们的新命令将在其中:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

我们可以像其他人一样使用它,所以让我们重新启动 faust 工作程序并开始全面的证券收集:

> faust -A horton.agents start-collect-securities

接下来会发生什么?

在下一部分中,我们将以剩余的代理为例,考虑寻找当年交易收盘价极值的下沉机制以及代理的 cron 启动。

这就是今天的全部内容! 谢谢阅读 :)

这部分的代码

Faust 的后台任务,第二部分:代理和团队

PS 在最后一部分中,我被问及有关 faust 和 confluence kafka 的问题(confluence有什么特点?)。 看起来 confluence 在很多方面都更实用,但事实是 faust 并没有对 confluence 提供完整的客户端支持 - 这是从 文档中客户端限制的描述.

来源: habr.com

添加评论