Основни задачи за Faust, част II: Агенти и екипи

Основни задачи за Faust, част II: Агенти и екипи

Таблица на съдържанието

  1. Част I: Въведение

  2. Част II: Агенти и екипи

какво правим тук

Така, така, втората част. Както беше написано по-рано, в него ще направим следното:

  1. Нека напишем малък клиент за alphavantage на aiohttp със заявки за крайните точки, от които се нуждаем.

  2. Нека създадем агент, който ще събира данни за ценни книжа и мета информация за тях.

Но това е, което ще направим за самия проект и по отношение на изследването на faust, ще се научим как да пишем агенти, които обработват събития от kafka, както и как да пишем команди (click wrapper), в нашия случай - за ръчно насочени съобщения към темата, която агентът наблюдава.

Обучение

AlphaVantage клиент

Първо, нека напишем малък aiohttp клиент за заявки към alphavantage.

alphavantage.py

Спойлер

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Всъщност всичко е ясно от него:

  1. AlphaVantage API е доста прост и красиво проектиран, така че реших да направя всички заявки чрез метода construct_query където на свой ред има http повикване.

  2. Нося всички полета snake_case за комфорт.

  3. Е, декорацията logger.catch за красив и информативен изход за проследяване.

PS Не забравяйте да добавите маркера alphavantage локално към config.yml или да експортирате променливата на средата HORTON_SERVICE_APIKEY. Получаваме жетон тук.

CRUD клас

Ще имаме колекция от ценни книжа, за да съхраняваме мета информация за ценни книжа.

база данни/security.py

Според мен няма нужда да обяснявам нищо тук, а самият базов клас е доста прост.

get_app()

Нека добавим функция за създаване на обект на приложение app.py

Спойлер

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Засега ще имаме най-простото създаване на приложение, малко по-късно ще го разширим, но за да не ви караме да чакате, тук препратки към App-клас. Също така ви съветвам да разгледате класа на настройките, тъй като той отговаря за повечето настройки.

Основна част

Агент за събиране и поддържане на списък с ценни книжа

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

И така, първо получаваме обекта на приложението faust - това е доста просто. След това изрично декларираме тема за нашия агент... Тук си струва да споменем какво представлява, какъв е вътрешният параметър и как това може да бъде подредено по различен начин.

  1. Темите в kafka, ако искаме да знаем точната дефиниция, по-добре е да прочетем изключено. документ, или можете да прочетете абстрактен на хъба на руски, където всичко също е отразено доста точно :)

  2. Параметър вътрешен, описан доста добре в faust doc, ни позволява да конфигурираме темата директно в кода, разбира се, това означава параметрите, предоставени от разработчиците на faust, например: задържане, политика за задържане (по подразбиране изтриване, но можете да зададете компактен), брой дялове на тема (преградида направите, например, по-малко от световно значение приложения faust).

  3. По принцип агентът може да създаде управлявана тема с глобални стойности, но обичам да декларирам всичко изрично. Освен това някои параметри (например броя на дяловете или политиката за задържане) на темата в рекламата на агента не могат да бъдат конфигурирани.

    Ето как може да изглежда без ръчно дефиниране на темата:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Е, сега нека опишем какво ще направи нашият агент :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

И така, в началото на агента отваряме aiohttp сесия за заявки през нашия клиент. Така при стартиране на воркър, когато се стартира нашият агент, веднага ще се отвори сесия - една, за цялото време, докато работи воркърът (или няколко, ако промените параметъра едновременност от агент с единица по подразбиране).

След това следваме потока (поставяме съобщението в _, тъй като ние, в този агент, не се интересуваме от съдържанието) на съобщенията от нашата тема, ако те съществуват в текущото отместване, в противен случай нашият цикъл ще изчака пристигането им. Е, вътре в нашия цикъл регистрираме получаването на съобщението, получаваме списък с активни (get_securities връща само активни по подразбиране, вижте клиентския код) ценни книжа и го записваме в базата данни, като проверяваме дали има ценна книга със същия тикер и обмен в базата данни, ако има, тогава тя (хартията) просто ще бъде актуализирана.

Нека стартираме нашето творение!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Характеристики уеб компонент Няма да разглеждам фауст в статиите, така че поставихме съответния флаг.

В нашата команда за стартиране казахме на faust къде да търси обекта на приложението и какво да прави с него (стартира работно средство) с изходното ниво на информационния журнал. Получаваме следния изход:

Спойлер

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Живо е!!!

Нека разгледаме комплекта дялове. Както виждаме, беше създадена тема с името, което посочихме в кода, броят дялове по подразбиране (8, взето от topic_partitions - параметър на обект на приложение), тъй като не сме посочили индивидуална стойност за нашата тема (чрез дялове). На стартирания агент в работника се присвояват всичките 8 дяла, тъй като той е единственият, но това ще бъде обсъдено по-подробно в частта за клъстерирането.

Е, сега можем да отидем до друг терминален прозорец и да изпратим празно съобщение до нашата тема:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS използване @ показваме, че изпращаме съобщение до тема с име „collect_securities“.

В този случай съобщението отиде в дял 6 - можете да проверите това, като отидете на kafdrop on localhost:9000

Отивайки до прозореца на терминала с нашия работник, ще видим щастливо съобщение, изпратено чрез loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Можем също да разгледаме mongo (използвайки Robo3T или Studio3T) и да видим, че ценните книжа са в базата данни:

Аз не съм милиардер и затова се задоволяваме с първата опция за гледане.

Основни задачи за Faust, част II: Агенти и екипиОсновни задачи за Faust, част II: Агенти и екипи

Щастие и радост - първият агент е готов :)

Агент готов, да живее новият агент!

Да, господа, изминахме само 1/3 от пътя, подготвен от тази статия, но не се обезсърчавайте, защото сега ще бъде по-лесно.

Така че сега имаме нужда от агент, който събира мета информация и я поставя в документ за събиране:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Тъй като този агент ще обработва информация за конкретна ценна книга, трябва да посочим тикера (символа) на тази ценна книга в съобщението. За тази цел във фауст има Records — класове, които декларират схемата на съобщението в темата на агента.

В този случай нека да отидем на records.py и опишете как трябва да изглежда съобщението за тази тема:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Както може би се досещате, faust използва анотацията тип python, за да опише схемата на съобщението, поради което минималната версия, поддържана от библиотеката, е 3.6.

Да се ​​върнем към агента, да зададем типовете и да го добавим:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Както можете да видите, предаваме нов параметър със схема към метода за инициализация на темата - value_type. Освен това всичко следва същата схема, така че не виждам смисъл да се спирам на нещо друго.

Е, последният щрих е да добавите извикване към агента за събиране на мета информация към collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Използваме предварително обявената схема за съобщение. В този случай използвах метода .cast, тъй като не е необходимо да чакаме резултата от агента, но си струва да споменем, че начини изпрати съобщение до темата:

  1. cast - не блокира, защото не очаква резултат. Не можете да изпратите резултата в друга тема като съобщение.

  2. изпрати - не блокира, защото не очаква резултат. Можете да посочите агент в темата, към която да отиде резултатът.

  3. питай - чака резултат. Можете да посочите агент в темата, към която да отиде резултатът.

И така, това е всичко с агентите за днес!

Мечтаният отбор

Последното нещо, което обещах да напиша в тази част са командите. Както споменахме по-рано, командите във faust са обвивка около кликване. Всъщност faust просто прикачва нашата персонализирана команда към своя интерфейс, когато указва ключа -A

След обявените агенти в агенти.py добавете функция с декоратор app.commandизвикване на метода хвърлиха у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Така, ако извикаме списъка с команди, нашата нова команда ще бъде в него:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Можем да го използваме като всеки друг, така че нека рестартираме faust worker и да започнем пълноценно събиране на ценни книжа:

> faust -A horton.agents start-collect-securities

Какво ще стане след това?

В следващата част, като използваме останалите агенти като пример, ще разгледаме механизма за потъване за търсене на крайности в цените на затваряне на търговията за годината и стартирането на cron на агенти.

Това е всичко за днес! Благодаря за четенето :)

Код за тази част

Основни задачи за Faust, част II: Агенти и екипи

PS Под последната част ме попитаха за фауст и конфлуентна кафка (какви функции има конфлуентът?). Изглежда, че confluent е по-функционален по много начини, но факт е, че faust няма пълна клиентска поддръжка за confluent - това следва от описания на клиентските ограничения в док.

Източник: www.habr.com

Добавяне на нов коментар