Фонові завдання на Faust, Частина II: Агенти та Команди

Фонові завдання на Faust, Частина II: Агенти та Команди

Зміст

  1. Частина I: Вступ

  2. Частина II: Агенти та Команди

Що ми тут робимо?

Отже, друга частина. Як і писалося раніше, у ній ми зробимо таке:

  1. Напишемо невеликий клієнтик для alphavantage на aiohttp із запитами на потрібні нам ендпоінти.

  2. Зробимо агента, який збиратиме дані про цінні папери та мета інформацію щодо них.

Але це те, що ми зробимо для самого проекту, а в плані дослідження faust ми дізнаємося, як писати агентів, які обробляють стрим подій з kafka, а так само як написати команди (обгортка на click), у нашому випадку — для ручного пуша повідомлення у топік, за яким стежить агент.

Підготовка

Клієнт AlphaVantage

Для початку напишемо невеликий aiohttp клієнтик для запитів на alphavantage.

alphavantage.py

Спойлер

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Власне по ньому все ясно:

  1. API AlphaVantage досить просто та красиво спроектована, тому всі запити я вирішив проводити через метод construct_query де своє чергу йде http виклик.

  2. Усі поля я приводжу до snake_case для зручності.

  3. Ну і декорація logger.catch для гарного та інформативного виведення трейсбеку.

PS Незабутнє локально додати токен alphavantage в config.yml, або експортувати змінне середовище HORTON_SERVICE_APIKEY. Отримуємо токен тут.

CRUD-клас

У нас буде колекція securities для зберігання мета інформації про цінні папери.

database/security.py

Тут нічого пояснювати не потрібно, а базовий клас сам по собі досить простий.

get_app()

Додамо функцію створення об'єкта програми в app.py

Спойлер

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Поки в нас буде найпростіше створення програми, трохи пізніше ми його розширимо, однак, щоб не змушувати вас чекати, ось референси на App-клас. На клас settings теж раджу подивитись, тому що саме він відповідає за більшу частину налаштувань.

Основна частина

Агент збору та збереження списку цінних паперів

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Так, спочатку отримуємо об'єкт faust-додатку – це досить просто. Далі, ми явно оголошуємо топік для нашого агента ... Тут варто згадати, що це таке, що за параметр internal і як це можна влаштувати по-іншому.

  1. Топики в kafka, якщо ми хочемо дізнатися точне визначення, краще прочитати оф. доку, або можна прочитати конспект на хабре російською, де так само все досить точно відображено 🙂

  2. Параметр internal, Досить добре описаний в доці faust, дозволяє нам налаштовувати топік прямо в коді, природно, маються на увазі параметри, передбачені розробниками faust, наприклад: retention, retention policy (за замовчуванням delete, але можна встановити і компактний), кількість партицій на топік (безліч, щоб зробити, наприклад, менше ніж глобальне значення програми faust).

  3. Взагалі, агент може створювати сам керований топік з глобальними значеннями, проте, я люблю оголошувати все явно. До того ж, деякі параметри (наприклад, кількість партицій або retention policy) топіка в оголошенні агента налаштувати не можна.

    Ось як це могло виглядати без ручного визначення топіка:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ну а тепер опишемо, що робитиме наш агент 🙂

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Отже, спочатку агента ми відкриваємо aiohttp сесію для запитів через наш клієнт. Таким чином, при запуску воркера, коли буде запущено наш агент, відразу ж буде відкрито сесію — одну, на весь час роботи воркера (або кілька, якщо змінити параметр одночасність у агента з дефолтної одиниці).

Далі, ми йдемо по стриму (повідомлення ми поміщаємо в _, тому що нам, в даному агенті, байдуже зміст) повідомлень з нашого топіка, якщо вони є при поточному зрушенні (offset), інакше, наш цикл чекатиме їх надходження. Ну а всередині нашого циклу, ми логуємо надходження повідомлення, отримуємо список активних цінних паперів і зберігаємо його в базу, перевіряючи при цьому, чи є папір з таким тикером і біржею в БД. якщо є, то вона (папір) просто оновиться.

Запустимо наше творіння!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Можливості веб-компонента faust я розглядати у статтях не буду, тож виставляємо відповідний прапор.

У нашій команді запуску ми вказали faust'у, де шукати об'єкт програми та що робити з ним (запустити воркер) з рівнем виведення логів info. Отримуємо наступний висновок:

Спойлер

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Воно живе!

Подивимося на partition set. Як ми бачимо, був створений топік з ім'ям, яке ми позначили в коді, кількість партицій дефолтна (8, взята з topic_partitions - Параметра об'єкта докладання), так як у нашого топіка ми індивідуальне значення (через partitions) не вказували. Запущеному агенту у воркері відведено всі 8 партицицій, оскільки він єдиний, але про це буде детальніше щодо кластерингу.

Що ж, тепер можемо зайти в інше вікно терміналу і відправити порожнє повідомлення до нашого топіка:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS за допомогою @ ми показуємо, що надсилаємо повідомлення в топік з ім'ям «collect_securities».

В даному випадку, повідомлення пішло в 6 партицію - це можна перевірити, зайшовши в kafdrop localhost:9000

Перейшовши у вікно терміналу з нашим воркером, ми побачимо радісне повідомлення надіслане за допомогою loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Також, можемо заглянути в mongo (за допомогою Robo3T або Studio3T) і побачити, що цінні папери в базі:

Я не мільярдер, а тому задовольняємося першим варіантом перегляду.

Фонові завдання на Faust, Частина II: Агенти та КомандиФонові завдання на Faust, Частина II: Агенти та Команди

Щастя та радість - перший агент готовий 🙂

Агент готовий, нехай живе новий агент!

Так, панове, нами пройдено лише 1/3 шляху, приготованого цією статтею, але не сумуйте, тому що зараз буде вже легше.

Отже, тепер нам потрібен агент, який збирає інформацію і складає її в документ колекції:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Так як цей агент оброблятиме інформацію про конкретну security, нам потрібно в повідомленні вказати тикер цього паперу. Для цього у faust існують Records - Класи, що декларують схему повідомлення в топіці агента.

У такому разі перейдемо в records.py і опишемо, як має виглядати повідомлення у цього топіка:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Як ви вже могли здогадатися, faust для опису схеми повідомлення використовує анотацію типів у python, тому й мінімальна версія, яку підтримує бібліотека. 3.6.

Повернемося до агента, встановимо типи та допишемо його:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Як бачите, ми передаємо метод ініціалізації топіка новий параметр зі схемою — value_type. Далі, все за тією ж схемою, тому зупинятися на чомусь ще — сенсу не бачу.

Ну що ж, останній штрих — додамо в collect_securitites виклик агента збору інформації:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Використовуємо оголошення раніше схему для повідомлення. В даному випадку, я використовував метод .cast, тому що нам не потрібно очікувати результату від агента, але варто згадати, що способів надіслати повідомлення в топік:

  1. cast — не блокує, оскільки не очікує на результат. Не можна надіслати результат в інший топік повідомленням.

  2. send - не блокує, тому що не очікує результату. Можна вказати агента в топік якого піде результат.

  3. ask - очікує на результат. Можна вказати агента в топік якого піде результат.

Отже, на цьому з агентами сьогодні все!

Команда мрії

Останнє, що я обіцяв написати в цій частині команди. Як уже говорилося раніше, команди faust - це обгортка над click. Фактично faust просто приєднує нашу кастомну команду до свого інтерфейсу за вказівкою ключа -A

Після оголошених агентів у agents.py додамо функцію з декоратором app.command, що викликає метод кинути у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Таким чином, якщо ми викличемо список команд, у ньому буде і наша нова команда:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Нею ми можемо скористатися як будь-який інший, тому перезапустимо faust воркер і почнемо повноцінний збір цінних паперів:

> faust -A horton.agents start-collect-securities

Що буде далі?

У наступній частині ми, на прикладі агентів, що залишилися, розглянемо, механізм sink для пошуку екстремум у цінах закриття торгів за рік і cron-запуск агентів.

На сьогодні все! Дякую за прочитання 🙂

Код цієї частини

Фонові завдання на Faust, Частина II: Агенти та Команди

PS Під минулою частиною мене запитали про faust та confluent kafka (які є у confluent фічі). Здається, що confluent багато в чому функціональніший, але річ у тому, що faust не має повноцінної підтримки клієнта для confluent — це випливає з описи обмежень клієнтів у доку.

Джерело: habr.com

Додати коментар або відгук