Основни задаци о Фаусту, ИИ део: Агенти и тимови

Основни задаци о Фаусту, ИИ део: Агенти и тимови

Преглед садржаја

  1. Део И: Увод

  2. Део ИИ: Агенти и тимови

Шта ми радимо овде?

Дакле, тако, други део. Као што је раније написано, у њему ћемо урадити следеће:

  1. Хајде да напишемо мали клијент за алпхавантаге на аиохттп са захтевима за крајње тачке које су нам потребне.

  2. Хајде да направимо агента који ће прикупљати податке о хартијама од вредности и мета информације о њима.

Али, то је оно што ћемо урадити за сам пројекат, а у смислу фауст истраживања, научићемо како да напишемо агенте који обрађују стреам догађаје из кафке, као и како да напишемо команде (клик омотач), у нашем случају - за ручне пусх поруке на тему коју агент надгледа.

Обука

АлпхаВантаге Цлиент

Прво, хајде да напишемо мали аиохттп клијент за захтеве за алпхавантаге.

алпхавантаге.пи

Спојлер

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

У ствари, све је јасно из тога:

  1. АлпхаВантаге АПИ је прилично једноставно и лепо дизајниран, па сам одлучио да све захтеве упутим путем методе construct_query где заузврат постоји хттп позив.

  2. Доносим сва поља snake_case ради лакшег.

  3. Па, декорација логгер.цатцх за леп и информативан излаз за праћење.

ПС Не заборавите да додате алпхавантаге токен локално у цонфиг.имл, или извезите променљиву окружења HORTON_SERVICE_APIKEY. Добијамо жетон овде.

ЦРУД класа

Имаћемо колекцију хартија од вредности за чување мета информација о хартијама од вредности.

база података/безбедност.пи

По мом мишљењу, овде не треба ништа објашњавати, а сама основна класа је прилично једноставна.

гет_апп()

Хајде да додамо функцију за креирање објекта апликације у апп.пи

Спојлер

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

За сада ћемо имати најједноставнију израду апликације, мало касније ћемо је проширити, међутим, да не бисмо чекали, ево референце у Апп-класу. Такође препоручујем да погледате класу подешавања, пошто је она одговорна за већину подешавања.

Главни део

Агент за прикупљање и вођење листе хартија од вредности

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Дакле, прво добијамо објекат Фауст апликације - прилично је једноставно. Затим експлицитно декларишемо тему за нашег агента... Овде је вредно поменути шта је то, шта је интерни параметар и како се то може другачије уредити.

  1. Теме у кафки, ако желимо да знамо тачну дефиницију, боље је прочитати ван. документ, или можете читати компендијум на Хабреу на руском, где се све такође прилично тачно одражава :)

  2. Параметар интерни, који је прилично добро описан у фауст документу, омогућава нам да конфигуришемо тему директно у коду, наравно, то подразумева параметре које су обезбедили Фауст програмери, на пример: задржавање, политика задржавања (подразумевано брисање, али можете подесити компактан), број партиција по теми (резултатиучинити, на пример, мање од глобалног значаја апликације Фауст).

  3. Генерално, агент може да креира управљану тему са глобалним вредностима, међутим, ја волим да све експлицитно изјавим. Поред тога, неки параметри (на пример, број партиција или политика задржавања) теме у огласу агента не могу да се конфигуришу.

    Ево како би то могло да изгледа без ручног дефинисања теме:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Па, хајде да сада опишемо шта ће наш агент урадити :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Дакле, на почетку агента отварамо аиохттп сесију за захтеве преко нашег клијента. Дакле, при покретању радника, када се наш агент покрене, одмах ће се отворити сесија - једна, за све време рада радника (или неколико, ако промените параметар подударност од агента са подразумеваном јединицом).

Затим пратимо ток (стављамо поруку у _, пошто нас, у овом агенту, није брига за садржај) порука из наше теме, ако постоје на тренутном офсету, иначе ће наш циклус чекати њихов долазак. Па, унутар наше петље евидентирамо пријем поруке, добијамо листу активних (гет_сецурити враћа само активну подразумевано, види клијентски код) хартија од вредности и чувамо је у бази података, проверавамо да ли постоји хартија са истим тикером и размена у бази података, ако постоји, онда ће се она (папир) једноставно ажурирати.

Хајде да покренемо нашу креацију!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

ПС Феатурес веб компонента Фауста нећу разматрати у чланцима, па смо поставили одговарајућу заставу.

У нашој команди за покретање, рекли смо Фаусту где да тражи објекат апликације и шта да уради са њим (покрене радника) са нивоом излаза инфо дневника. Добијамо следећи излаз:

Спојлер

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Живо је!!!

Хајде да погледамо скуп партиција. Као што видимо, креирана је тема са именом које смо навели у коду, подразумеваним бројем партиција (8, преузето из топиц_партитионс - параметар објекта апликације), пошто нисмо навели појединачну вредност за нашу тему (преко партиција). Покренутом агенту у воркер-у је додељено свих 8 партиција, пошто је једини, али ће о томе бити детаљније у делу о кластеровању.

Па, сада можемо отићи до другог прозора терминала и послати празну поруку нашој теми:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

ПС користећи @ показујемо да шаљемо поруку теми под називом „цоллецт_сецуритиес“.

У овом случају, порука је отишла на партицију 6 - ово можете проверити тако што ћете отићи на кафдроп он localhost:9000

Одласком до прозора терминала са нашим радником, видећемо срећну поруку послату помоћу логуру:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Такође можемо да погледамо монго (користећи Робо3Т или Студио3Т) и видимо да су хартије од вредности у бази података:

Нисам милијардер и зато смо задовољни првом опцијом гледања.

Основни задаци о Фаусту, ИИ део: Агенти и тимовиОсновни задаци о Фаусту, ИИ део: Агенти и тимови

Срећа и радост - први агент је спреман :)

Агент спреман, живео нови агент!

Да, господо, прешли смо само 1/3 пута припремљеног овим чланком, али немојте се обесхрабрити, јер ће сада бити лакше.

Дакле, сада нам је потребан агент који прикупља мета информације и ставља их у документ за прикупљање:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Пошто ће овај агент обрадити информације о одређеној безбедности, потребно је да у поруци наведемо ознаку (симбол) овог обезбеђења. За ову сврху у Фаусту постоје Плоче — класе које декларишу шему порука у теми агента.

У овом случају, идемо на рецордс.пи и опишите како би порука за ову тему требало да изгледа:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Као што сте можда претпоставили, Фауст користи напомену типа питхон да опише шему поруке, због чега је минимална верзија коју библиотека подржава 3.6.

Вратимо се агенту, подесимо типове и додамо га:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Као што видите, методу иницијализације теме преносимо нови параметар са шемом - валуе_типе. Даље, све иде по истој шеми, тако да не видим смисла да се задржавам на било чему другом.

Па, последњи додир је додавање позива агенту за прикупљање мета информација за цоллецт_сецуриттес:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Користимо претходно најављену шему за поруку. У овом случају користио сам метод .цаст јер не треба да чекамо резултат од агента, али вреди напоменути да начине пошаљите поруку на тему:

  1. цаст - не блокира јер не очекује резултат. Не можете послати резултат на другу тему као поруку.

  2. послати - не блокира јер не очекује резултат. Можете одредити агента у теми на коју ће ићи резултат.

  3. питати - чека резултат. Можете одредити агента у теми на коју ће ићи резултат.

Дакле, то је све са агентима за данас!

Тим снова

Последње што сам обећао да ћу написати у овом делу су команде. Као што је раније поменуто, команде у Фаусту су омотач око клика. У ствари, фауст једноставно повезује нашу прилагођену команду свом интерфејсу када наведе -А кључ

Након што су најављени агенти у агенти.пи додајте функцију са декоратером апп.цоммандпозивање методе бацити у цоллецт_сецуритес:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Дакле, ако позовемо листу команди, наша нова команда ће бити у њој:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Можемо да га користимо као и било ко други, па хајде да поново покренемо фауст радника и започнемо пуну колекцију хартија од вредности:

> faust -A horton.agents start-collect-securities

Шта ће се даље дешавати?

У наредном делу, користећи преостале агенте као пример, размотрићемо механизам понора за тражење екстрема у ценама затварања трговања за годину и црон лансирање агената.

То је све за данас! Хвала за читање :)

Шифра за овај део

Основни задаци о Фаусту, ИИ део: Агенти и тимови

ПС У последњем делу су ме питали о Фаусту и конфлуентној кафки (које карактеристике има конфлуент?). Чини се да је конфлуент функционалнији на много начина, али чињеница је да Фауст нема потпуну клијентску подршку за конфлуент – ово следи из описи ограничења клијената у док.

Извор: ввв.хабр.цом

Додај коментар