Hawlaha asalka ah ee Faust, Qaybta II: Wakiilada iyo Kooxaha

Hawlaha asalka ah ee Faust, Qaybta II: Wakiilada iyo Kooxaha

Tusmada

  1. Часть I: Введение

  2. Qaybta II: Wakiilada iyo Kooxaha

Maxaan ka sameyneynaa halkan?

Haddaba, qaybta labaad. Sidii hore loogu qoray, waxaan ku samayn doonaa kuwan soo socda:

  1. Aynu ku qorno macmiil yar oo alfavantage ah aiohttp oo ay ku jiraan codsiyada dhamaadka-dhamaadka ee aan u baahanahay.

  2. Сделаем агента, который будет собирать данные о ценных бумагах и мета информацию по ним.

Но, это то, что мы сделаем для самого проекта, а в плане исследования faust мы узнаем, как писать агентов, обрабатывающих стрим событий из kafka, а так же как написать команды (обёртка на click), в нашем случаи — для ручного пуша сообщения в топик, за которым следит агент.

Tababarka

Macmiilka AlphaVantage

Marka hore, aynu u qorno macmiil yar oo aiohttp ah codsiyada alfavantage.

alphavantage.py

Baabbi'iyihii

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Dhab ahaantii, wax walba way iska cad yihiin:

  1. AlphaVantage API si fudud oo qurux badan ayaa loo qaabeeyey, marka waxaan go'aansaday inaan dhammaan codsiyada ku sameeyo habka construct_query где в свою очередь идёт http вызов.

  2. Waxaan keenayaa beeraha oo dhan snake_case для удобства.

  3. Ну и декорация logger.catch для красивого и информативного вывода трейсбека.

P.S. Незабываем локально добавить токен alphavantage в config.yml, либо экспортировать переменную среды HORTON_SERVICE_APIKEY. Получаем токен halkan.

fasalka CRUD

У нас будет коллекция securities для хранения мета информации о ценных бумагах.

database/security.py

Тут по-моему ничего пояснять не нужно, а базовый класс сам по себе достаточно прост.

get_app()

Добавим функцию создания объекта приложения в barnaamijka.py

Baabbi'iyihii

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Hadda waxaan heli doonaa abuurista codsiga ugu fudud, wax yar ka dib waan ballaarin doonaa, si kastaba ha ahaatee, si aan kuu sugin, halkan референсы App-class. Waxaan sidoo kale kugula talinayaa inaad eegto fasalka dejinta, maadaama ay mas'uul ka tahay inta badan goobaha.

Qaybta ugu muhiimsan

Агент сбора и сохранения списка ценных бумаг

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Marka, marka hore waxaan helnaa shayga codsiga degdega ah - waa wax fudud. Marka xigta, waxaan si cad ugu dhawaaqeynaa mawduuc loogu talagalay wakiilkayaga ... Halkan waxaa habboon in la sheego waxa ay tahay, waa maxay qiyaasta gudaha iyo sida tan loo habeyn karo si ka duwan.

  1. Топики в kafka, если мы хотим узнать точное определение, то лучше прочитать off dukumeenti, либо можно прочитать compendium Habré oo Ruush ah, halkaas oo wax waliba si sax ah uga muuqdaan :)

  2. Parameter gudaha, oo si fiican loogu sharraxay dokumentiga faust, wuxuu noo ogolaanayaa inaan mawduuca si toos ah u habeyno koodhka, dabcan, tani waxay ka dhigan tahay cabbirrada ay bixiyaan soo-saareyaasha xun, tusaale ahaan: sii-haynta, siyaasadda sii-haynta (sida caadiga ah tirtir, laakiin waad dejin kartaa isafgaradka), tirada qaybaha mowduuc kasta (qaybood, чтобы сделать, например, меньшее чем глобальное значение приложения faust).

  3. Guud ahaan, wakiilku wuxuu abuuri karaa mawduuc la maareeyay oo leh qiyam caalami ah, si kastaba ha ahaatee, waxaan jeclahay inaan wax walba si cad u sheego. Intaa waxaa dheer, qaar ka mid ah xuduudaha (tusaale, tirada qaybaha ama siyaasadda sii haynta) ee mawduuca ku jira xayeysiiska wakiilka lama habeyn karo.

    Waa kuwan sida ay u ekaan karto iyada oo aan gacanta lagu qeexin mawduuca:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Hagaag, hadda aan sharaxno waxa wakiilkeenu samayn doono :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Итак, в начале агента мы открываем aiohttp сессию для запросов через наш клиент. Таким образом, при запуске воркера, когда будет запущен наш агент, сразу же будет открыта сессия — одна, на всё время работы воркера (или несколько, если изменить параметр isku-duwidda у агента с дефолтной единички).

Далее, мы идём по стриму (сообщение мы помещаем в _, maadaama anaga, wakiilkan, aan dan ka lahayn nuxurka) ee fariimaha mawduuceena, haddii ay ku jiraan xajinta hadda, haddii kale wareeggayagu wuxuu sugi doonaa imaatinkooda. Hagaag, gudaha loop-keena, waxaanu galnaa risiidhka fariinta, waxaanu helnaa liis firfircoon (get_securities soo celinta oo kaliya firfircoonida caadiga ah, eeg code-ka macmiilka) oo kaydiya kaydinta xogta, anagoo hubinayna haddii uu jiro ammaan leh calaamad isku mid ah iyo ku beddelashada kaydka xogta , haddii ay jirto, markaa (warqad) si fudud ayaa loo cusboonaysiin doonaa.

Aan bilowno abuurkeena!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Tilmaamaha PS qaybta webka Ma tixgelin doono khaladka maqaallada, markaa waxaan dejineynaa calanka ku habboon.

В нашей команде запуска мы указали faust’у, где искать объект приложения и что делать с ним (запустить воркер) с уровнем вывода логов info. Получаем следующий вывод:

Baabbi'iyihii

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Оно живое!!!

Aynu eegno qaybta qaybinta. Sida aan arki karno, mowduuc ayaa la sameeyay magaca aan ku magacownay koodhka, tirada qaybta caadiga ah (8, laga soo qaatay mawduuc_qodob — параметра объекта приложения), так как у нашего топика мы индивидуальное значение (через partitions) не указывали. Запущенному агенту в воркере отведены все 8 партициций, так как он единственный, но об этом будет подробнее в части про кластеринг.

Что же, теперь можем зайти в другое окно терминала и отправить пустое сообщение в наш топик:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. с помощью @ waxaan tuseynaa inaan fariin u direyno mowduuc la yiraahdo "collect_securities".

Xaaladdan oo kale, fariintu waxay tagtay qaybta 6 - waxaad ku hubin kartaa tan adoo aadaya kafdrop on localhost:9000

Markaan shaqaalahayada la aadno daaqadda terminalka, waxaan arki doonaa fariin farxad leh oo loo diray loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Так же, можем заглянуть в mongo (с помощью Robo3T или Studio3T) и увидеть, что ценные бумаги в базе:

Anigu ma ihi bilyaneer, sidaas darteed waxaan ku qanacsanahay ikhtiyaarka daawashada koowaad.

Hawlaha asalka ah ee Faust, Qaybta II: Wakiilada iyo KooxahaHawlaha asalka ah ee Faust, Qaybta II: Wakiilada iyo Kooxaha

Счастье и радость — первый агент готов 🙂

Агент готов, да здравствует новый агент!

Да, господа, нами пройдена только 1/3 пути, уготованного этой статьёй, но не унывайте, так как сейчас будет уже легче.

Итак, теперь нам нужен агент, который собирает мета информацию и складывает её в документ коллекции:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Так как этот агент будет обрабатывать информацию о конкретной security, нам нужно в сообщении указать тикер (symbol) этой бумаги. Для этого в faust существуют Records - fasallada ku dhawaaqaya nidaamka fariinta mawduuca wakiilka.

Xaaladdan oo kale, aan aado diiwaanada.py и опишем, как должно выглядеть сообщение у этого топика:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Как вы уже могли догадаться, faust для описания схемы сообщения использует аннотацию типов в python, поэтому и минимальная версия, поддерживаемая библиотекой — 3.6.

Aan ku soo laabano wakiilka, dejino noocyada oo aan ku darno:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Как видите, мы передаём в метод инициализации топика новый параметр со схемой — value_type. Далее, всё по той же самой схеме, поэтому останавливаться на чём то ещё — смысла не вижу.

Hagaag, taabashada kama dambaysta ahi waa in lagu daro wicitaan wakiilka xog ururinta meta si ay u ururiyaan_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Waxaan fariinta u isticmaalnaa nidaamka hore loo sheegay. Xaaladdan oo kale, waxaan isticmaalay habka .cast tan iyo markii aan u baahnayn inaan sugno natiijada wakiilka, laakiin waxaa habboon in la sheego taas siyaabo fariin u dir mawduuca:

  1. kabka - ma xannibo sababtoo ah ma filayo natiijo. Uma diri kartid natiijada mawduuc kale fariin ahaan.

  2. dir - ma xannibo sababtoo ah ma filayo natiijo. Waxaad ku qeexi kartaa wakiilka mawduuca ay natiijadu aadi doonto.

  3. weydii - waxay sugaysaa natiijo. Waxaad ku qeexi kartaa wakiilka mawduuca ay natiijadu aadi doonto.

Итак, на этом с агентами на сегодня всё!

Kooxda riyada

Последнее, что я обещал написать в этой части — команды. Как уже говорилось ранее, команды в faust — это обёртка над click. Фактически faust просто присоединяет нашу кастомную команду к своему интерфейсу при указании ключа -A

Ka dib markii wakiilada lagu dhawaaqay in wakiilada.py ku dar shaqo leh qurxin app.command, вызывающую метод tuuro у ururin_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Таким образом, если мы вызовем список команд, в нём будет и наша новая команда:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Waxaan u isticmaali karnaa sida qof kasta oo kale, markaa aan dib u bilowno shaqaalaha khaldan oo aan bilowno ururinta dammaanadda oo dhammaystiran:

> faust -A horton.agents start-collect-securities

Maxaa xigi doona?

Qaybta soo socota, annagoo adeegsanayna wakiillada soo hadhay tusaale ahaan, waxaynu tixgelin doonaa habka saxanka ee lagu raadinayo meelaha ugu daran ee qiimaha xidhitaanka ganacsiga ee sanadka iyo furitaanka wakiilada.

На сегодня всё! Спасибо за прочтение 🙂

Koodhka qaybtan

Hawlaha asalka ah ee Faust, Qaybta II: Wakiilada iyo Kooxaha

P.S. Под прошлой частью меня спросили про faust и confluent kafka (waa maxay sifooyinka uu leeyahay isku-darka?). Кажется, что confluent во многом функциональнее, но дело в том, что faust не имеет полноценной поддержки клиента для confluent — это следует из sharaxaada xayiraadaha macmiilka ee dukumeentiga.

Source: www.habr.com

Add a comment