Фонавыя задачы на ​​Faust, Частка II: Агенты і Каманды

Фонавыя задачы на ​​Faust, Частка II: Агенты і Каманды

Змест

  1. Частка I: Увядзенне

  2. Частка II: Агенты і Каманды

Што мы тут робім?

Дык вось, другая частка. Як і пісалася раней, у ёй мы зробім наступнае:

  1. Напішам невялікі кліентык для alphavantage на aiohttp з запытамі на патрэбныя нам эндпаінты.

  2. Зробім агента, які будзе збіраць дадзеныя аб каштоўных паперах і мэта інфармацыю па іх.

Але, гэта тое, што мы зробім для самога праекту, а ў плане даследавання faust мы даведаемся, як пісаць агентаў, якія апрацоўваюць стрым падзей з kafka, а гэтак жа як напісаць каманды (абгортка на click), у нашым выпадкі – для ручнога пуша паведамлення у топік, за якім сочыць агент.

Падрыхтоўка

Кліент AlphaVantage

Для пачатку, напішам невялікі aiohttp кліентык для запытаў на alphavantage.

alphavantage.py

Спойлер

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Уласна па ім усё зразумела:

  1. API AlphaVantage дастаткова проста і прыгожа спраектавана, таму ўсе запыты я вырашыў праводзіць праз метад construct_query дзе ў сваю чаргу ідзе http выклік.

  2. Усе палі я прыводжу да snake_case для зручнасці.

  3. Ну і дэкарацыя logger.catch для прыгожай і інфарматыўнай высновы трэйсбека.

PS Незабыўны лакальна дадаць токен alphavantage у config.yml, альбо экспартаваць зменную асяроддзі HORTON_SERVICE_APIKEY. Атрымліваем токен тут.

CRUD-клас

У нас будзе калекцыя securities для захоўвання мета інфармацыі аб каштоўных паперах.

database/security.py

Тут па-мойму нічога тлумачыць не трэба, а базавы клас сам па сабе дастаткова просты.

get_app()

Дадамо функцыю стварэння аб'екта прыкладання ў app.py

Спойлер

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Пакуль у нас будзе самае простае стварэнне прыкладання, крыху пазней мы яго пашырым, аднак, каб не прымушаць вас чакаць, вось рэферэнсы на App-клас. На клас settings таксама раю зірнуць, бо менавіта ён адказвае за большую частку налад.

асноўная частка

Агент збору і захаванні спісу каштоўных папер

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Так, спачатку атрымліваем аб'ект faust-прыкладанні - гэта досыць проста. Далей, мы відавочна аб'яўляем топік для нашага агента… Тут варта згадаць, што гэта такое, што за параметр internal і як гэта можна задаволіць па-іншаму.

  1. Топікі ў kafka, калі мы хочам даведацца дакладнае вызначэнне, то лепш прачытаць оф. доку, альбо можна прачытаць кампендыум на хабры на рускай, дзе гэтак жа ўсё дастаткова дакладна адлюстравана 🙂

  2. Параметр internal, досыць добра апісаны ў доку faust, дазваляе нам наладжваць топік прама ў кодзе, натуральна, маюцца на ўвазе параметры, прадугледжаныя распрацоўшчыкамі faust, напрыклад: retention, retention policy (па-змаўчанні delete, але можна ўсталяваць і кампактны), кол-ць партыцый на топік (мноства, каб зрабіць, напрыклад, меншае чым глабальнае значэнне прыкладанні faust).

  3. Наогул, агент можа ствараць сам кіраваны топік з глабальнымі значэннямі, аднак, я кахаю аб'яўляць усё відавочна. Да таго ж, некаторыя параметры (напрыклад, кол-ць партыцый ці retention policy) топіка ў аб'яве агента наладзіць нельга.

    Вось як гэта магло было выглядаць без ручнога азначэння топіка:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ну а зараз, апішам, што будзе рабіць наш агент 🙂

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Такім чынам, у пачатку агента мы адчыняем aiohttp сесію для запытаў праз наш кліент. Такім чынам, пры запуску воркера, калі будзе запушчаны наш агент, адразу ж будзе адкрыта сесія - адна, на ўвесь час працы воркера (ці некалькі, калі змяніць параметр concurrency у агента з дэфолтнай адзінкі).

Далей, мы ідзем па стрыму (паведамленне мы змяшчаем у _, так як нам, у дадзеным агенце, абыякава змест) паведамленняў з нашага топіка, калі яны ёсць пры бягучым зруху (offset), інакш, наш цыкл будзе чакаць іх паступлення. Ну а ўсярэдзіне нашага цыклу, мы лагуем паступленне паведамлення, атрымліваем спіс актыўных (get_securities вяртае па-змаўчанні толькі active, гл. код кліента) каштоўных папер і захоўваем яго ў базу, правяраючы пры гэтым, ці ёсць папера з такім Біржавы сімвал і біржай у БД , калі ёсць, то яна (папера) проста абновіцца.

Запусцім наша тварэнне!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Магчымасці вэб-кампанента faust я разглядаць у артыкулах не буду, таму выстаўляемы адпаведны сцяг.

У нашай камандзе запуску мы паказалі faust'у, дзе шукаць аб'ект прыкладання і што рабіць з ім (запусціць воркер) з узроўнем вываду логаў info. Атрымліваем наступную выснову:

Спойлер

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Яно жывое!!!

Паглядзім на partition set. Як мы бачым, быў створаны топік з імем, якое мы пазначылі ў кодзе, у партый дэфолтнае (8, узятае з topic_partitions - Параметра аб'екта прыкладання), так як у нашага топіка мы індывідуальнае значэнне (праз partitions) не паказвалі. Запушчанаму агенту ў воркеры адведзены ўсе 8 партыцыцый, бо ён адзіны, але пра гэта будзе падрабязней у частцы пра кластэрынг.

Што ж, зараз можам зайсці ў іншае акно тэрмінала і адправіць пустое паведамленне ў наш топік:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS з дапамогай @ мы паказваем, што дасылаем паведамленне ў топік з імем «collect_securities».

У дадзеным выпадку, паведамленне сышло ў 6 партыцыю - гэта можна праверыць, зайшоўшы ў kafdrop на localhost:9000

Пяройдучы ў акно тэрмінала з нашым воркерам, мы ўбачым радаснае паведамленне, дасланае з дапамогай loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Гэтак жа, можам зазірнуць у mongo (з дапамогай Robo3T ці Studio3T) і ўбачыць, што каштоўныя паперы ў базе:

Я не мільярдэр, а таму, здавольваемся першым варыянтам прагляду.

Фонавыя задачы на ​​Faust, Частка II: Агенты і КамандыФонавыя задачы на ​​Faust, Частка II: Агенты і Каманды

Шчасце і радасць - першы агент гатовы 🙂

Агент гатовы, ды жыве новы агент!

Так, спадары, намі пройдзена толькі 1/3 шляху, прыгатаванага гэтым артыкулам, але не падайце духам, бо цяпер будзе ўжо лягчэй.

Такім чынам, зараз нам патрэбен агент, які збірае мета інфармацыю і складае яе ў дакумент калекцыі:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Так як гэты агент будзе апрацоўваць інфармацыю аб канкрэтнай security, нам трэба ў паведамленні ўказаць Біржавы сімвал (symbol) гэтай паперы. Для гэтага ў faust існуюць Ўлік - Класы, якія дэкларуюць схему паведамлення ў топіцы агента.

У такім выпадку пяройдзем у records.py і апішам, як павінна выглядаць паведамленне ў гэтага топіка:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Як вы ўжо маглі здагадацца, faust для апісання схемы паведамлення выкарыстоўвае анатацыю тыпаў у python, таму і мінімальная версія, якая падтрымліваецца бібліятэкай. 3.6.

Вернемся да агента, усталюем тыпы і дапішам яго:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Як бачыце, мы перадаем у метад ініцыялізацыі топіка новы параметр са схемай - value_type. Далей, усё па той жа самай схеме, таму спыняцца на чым тое яшчэ - сэнсу не бачу.

Ну што ж, апошні штрых - дадамо ў collect_securitites выклік агента збору мета інфармацыі:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Выкарыстоўваны раней аб'яве схему для паведамлення. У дадзеным выпадку, я выкарыстаў метад .cast, бо нам не трэба чакаць вынік ад агента, але варта згадаць, што спосабаў паслаць паведамленне ў топік:

  1. cast - не блакуе, так як не чакае выніку. Нельга даслаць вынік у іншы топік паведамленнем.

  2. send - не блакуе, так як не чакае выніку. Можна паказаць агента ў топік якога сыдзе вынік.

  3. ask - чакае выніку. Можна паказаць агента ў топік якога сыдзе вынік.

Дык вось, на гэтым з агентамі на сёння ўсё!

Каманда мары

Апошняе, што я абяцаў напісаць у гэтай частцы - каманды. Як ужо гаварылася раней, каманды ў faust - гэта абгортка над click. Фактычна faust проста далучае нашу кастамную каманду да свайго інтэрфейсу пры ўказанні ключа -A

Пасля абвешчаных агентаў у agents.py дадамо функцыю з дэкаратарам app.command, якая выклікае метад кінуць у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Такім чынам, калі мы выклічам спіс каманд, у ім будзе і наша новая каманда:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Ёю мы можам скарыстацца, як любы іншы, таму перазапусцім faust воркер і пачнем паўнавартасны збор каштоўных папер:

> faust -A horton.agents start-collect-securities

Што будзе далей?

У наступнай частцы мы, на прыкладзе пакінутых агентаў, разгледзім, механізм sink для пошуку экстрэмуму ў коштах зачынення таргоў за год і cron-запуск агентаў.

На сёння ўсё! Дзякуй за чытанне 🙂

Код гэтай часткі

Фонавыя задачы на ​​Faust, Частка II: Агенты і Каманды

PS Пад мінулай часткай мяне спыталі пра faust і confluent kafka (якія ёсць у confluent фічы). Здаецца, што confluent шмат у чым функцыянальней, але справа ў тым, што faust не мае паўнавартаснай падтрымкі кліента для confluent – ​​гэта вынікае з апісання абмежаванняў кліентаў у доку.

Крыніца: habr.com

Дадаць каментар