Вазифаҳои замина дар бораи Фауст, Қисми II: Агентҳо ва дастаҳо

Вазифаҳои замина дар бораи Фауст, Қисми II: Агентҳо ва дастаҳо

Мундариҷа

  1. Қисми I: Муқаддима

  2. Қисми II: Агентҳо ва дастаҳо

Мо дар ин ҷо чӣ кор карда истодаем?

Ҳамин тавр, қисми дуюм. Тавре ки қаблан навишта шуда буд, дар он мо корҳои зеринро иҷро мекунем:

  1. Биёед як муштарии хурдеро барои alphavantage дар aiohttp бо дархостҳо барои нуқтаҳои ниҳоӣ ба мо нависед.

  2. Биёед агентеро эҷод кунем, ки маълумотро дар бораи коғазҳои қиматнок ва мета-маълумот дар бораи онҳо ҷамъоварӣ кунад.

Аммо, ин аст он чизе ки мо барои худи лоиҳа кор хоҳем кард ва дар робита ба таҳқиқоти фауст, мо мефаҳмем, ки чӣ гуна агентҳоеро, ки рӯйдодҳои ҷараёнро аз кафка коркард мекунанд ва инчунин чӣ гуна навиштани фармонҳоро (кликро пахш кунед), дар ҳолати мо - барои паёмҳои такони дастӣ ба мавзӯъе, ки агент назорат мекунад.

Омодагӣ

Мизоҷи AlphaVantage

Аввалан, биёед як муштарии хурди aiohttp барои дархостҳо ба alphavantage нависед.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Воқеан, аз он ҳама чиз равшан аст:

  1. API AlphaVantage хеле содда ва зебо тарҳрезӣ шудааст, аз ин рӯ ман қарор додам, ки ҳама дархостҳоро тавассути ин усул иҷро кунам construct_query дар он ҷо дар навбати худ занги http вуҷуд дорад.

  2. Ман тамоми майдонҳоро ба он меорам snake_case барои роҳат.

  3. Хуб, ороиши logger.catch барои баромади пайгирии зебо ва иттилоотӣ.

PS Фаромӯш накунед, ки аломати alphavantage ба таври маҳаллӣ ба config.yml илова кунед ё тағирёбандаи муҳити зистро содир кунед HORTON_SERVICE_APIKEY. Мо нишона мегирем дар ин ҷо.

Синфи CRUD

Мо коллексияи коғазҳои қиматнокро барои нигоҳ доштани мета маълумот дар бораи коғазҳои қиматнок хоҳем дошт.

database/security.py

Ба андешаи ман, дар ин ҷо ҳеҷ чизро шарҳ додан лозим нест ва худи синфи асосӣ хеле содда аст.

get_app()

Биёед функсияро барои сохтани объекти барнома дар app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Ҳоло мо соддатарин эҷоди барномаро дорем, каме дертар онро васеъ хоҳем кард, аммо барои он ки шуморо интизор нашавед, дар ин ҷо маълумотномаҳо ба App-синфи. Ман инчунин тавсия медиҳам, ки ба синфи танзимот нигаред, зеро он барои аксари танзимот масъул аст.

Қисми асосии он

Агент оид ба ҷамъоварӣ ва нигоҳдории рӯйхати коғазҳои қиматнок

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ҳамин тавр, аввал мо объекти барномаи faust-ро мегирем - ин хеле оддӣ аст. Баъдан, мо ба таври возеҳ мавзӯъро барои агенти худ эълон мекунем ... Дар ин ҷо бояд қайд кард, ки он чист, параметри дохилӣ чист ва чӣ гуна онро ба таври дигар ташкил кардан мумкин аст.

  1. Мавзӯъҳо дар кафка, агар бихоҳем таърифи дақиқро бидонем, хондан беҳтар аст хомӯш. ҳуҷҷат, ё шумо метавонед хонед маҷмӯа дар Habré бо забони русӣ, ки дар он ҳама чиз хеле дақиқ инъикос ёфтааст :)

  2. Параметри дохилӣ, ки дар faust doc хеле хуб тавсиф шудааст, ба мо имкон медиҳад, ки мавзӯъро мустақиман дар код танзим кунем, албатта ин маънои параметрҳоеро дорад, ки таҳиягарони faust пешниҳод кардаанд, масалан: нигоҳдорӣ, сиёсати нигоҳдорӣ (бо нобаёнӣ нест кардан, аммо шумо метавонед танзим кунед. зич), шумораи бахшҳо дар як мавзӯъ (қисмҳокардан, масалан, камтар аз ахамияти умумичахонй барномаҳои faust).

  3. Умуман, агент метавонад мавзӯи идорашавандаро бо арзишҳои глобалӣ эҷод кунад, аммо ман мехоҳам ҳама чизро ба таври возеҳ эълон кунам. Илова бар ин, баъзе параметрҳоро (масалан, шумораи қисмҳо ё сиёсати нигоҳдорӣ) мавзӯъ дар таблиғи агент танзим кардан мумкин нест.

    Ин аст он чизест, ки бидуни муайян кардани мавзӯъ ба таври дастӣ ба назар мерасад:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Хуб, ҳоло биёед тавсиф кунем, ки агенти мо чӣ кор хоҳад кард :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Ҳамин тавр, дар оғози агент, мо барои дархостҳо тавассути муштарии худ сессияи aiohttp кушоем. Ҳамин тариқ, ҳангоми ба кор андохтани коргар, вақте ки агенти мо оғоз мешавад, дарҳол сеанс кушода мешавад - як, барои тамоми вақти коркунанда (ё якчанд, агар шумо параметрро тағир диҳед) ҳамзамон аз агенти дорои воҳиди пешфарз).

Баъдан, мо ҷараёнро пайгирӣ мекунем (мо паёмро дар _, зеро мо, дар ин агент, ба мундариҷа) паёмҳои мавзӯи мо аҳамият намедиҳем, агар онҳо дар ҷабҳаи ҷорӣ мавҷуд бошанд, вагарна давраи мо омадани онҳоро интизор мешавад. Хуб, дар дохили ҳалқаи мо, мо қабули паёмро сабт мекунем, рӯйхати коғазҳои қиматноки фаъолро мегирем (get_securities танҳо бо нобаёнӣ фаъол аст, рамзи муштариро бубинед) ва онро дар базаи маълумот захира карда, тафтиш мекунем, ки оё коғазҳои қиматнок бо ҳамон тикер вуҷуд дорад ва мубодила дар базаи маълумот, агар мавҷуд бошад, он (коғаз) танҳо нав карда мешавад.

Биёед эҷоди худро оғоз кунем!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Хусусиятҳои PS ҷузъи веб Ман дар мақолаҳо фаустро баррасӣ намекунам, бинобар ин мо парчами мувофиқро гузоштем.

Дар фармони оғози мо, мо ба faust гуфтем, ки объекти барномаро дар куҷо ҷустуҷӯ кардан лозим аст ва бо он чӣ бояд кард (коргарро оғоз кунед) бо сатҳи баромади иттилоот. Мо натиҷаи зеринро ба даст меорем:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ин зинда аст!!!

Биёед маҷмӯи тақсимотро бубинем. Тавре ки мо мебинем, мавзӯъ бо номе сохта шудааст, ки мо дар код таъин кардем, шумораи пешфарзҳои қисмҳо (8, аз мавзӯъ_қисмҳо - параметри объекти барнома), зеро мо барои мавзӯи худ арзиши инфиродӣ муайян накардаем (тавассути қисмҳо). Ба агенти ба кор андохташуда дар коргар ҳамаи 8 қисмат таъин карда мешавад, зеро он ягона аст, аммо ин дар қисмати кластеризатсия муфассалтар баррасӣ хоҳад шуд.

Хуб, ҳоло мо метавонем ба равзанаи дигари терминал равем ва ба мавзӯи худ паёми холӣ фиристем:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS истифода @ мо нишон медиҳем, ки мо ба мавзӯи бо номи "collect_securities" паём фиристода истодаем.

Дар ин ҳолат, паём ба қисмати 6 рафт - шумо метавонед инро тавассути рафтан ба kafdrop дар санҷед localhost:9000

Бо коргари худ ба равзанаи терминал рафта, мо паёми хушбахтеро мебинем, ки бо истифода аз loguru фиристода шудааст:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Мо инчунин метавонем ба mongo (бо истифода аз Robo3T ё Studio3T) назар кунем ва бубинем, ки коғазҳои қиматнок дар пойгоҳи додаҳо мавҷуданд:

Ман миллиардер нестам ва аз ин рӯ мо бо интихоби аввалини тамошо қаноатмандем.

Вазифаҳои замина дар бораи Фауст, Қисми II: Агентҳо ва дастаҳоВазифаҳои замина дар бораи Фауст, Қисми II: Агентҳо ва дастаҳо

Хушбахтӣ ва шодӣ - агенти аввал омода аст :)

Агент тайёр, зинда бод агенти нав!

Бале, ҷанобон, мо ҳамагӣ 1/3 роҳи омодакардаи ин мақоларо тай кардем, аммо рӯҳафтода нашавед, зеро акнун ин осонтар мешавад.

Ҳамин тавр, ҳоло ба мо агенте лозим аст, ки иттилооти метаро ҷамъоварӣ мекунад ва онро ба ҳуҷҷати ҷамъоварӣ мегузорад:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Азбаски ин агент маълумотро дар бораи коғазҳои қиматноки мушаххас коркард мекунад, мо бояд дар паём аломати (рамзи) ин коғазро нишон диҳем. Бо ин максад дар фауст мавчуданд сабтҳо — синфҳое, ки нақшаи паёмро дар мавзӯи агент эълон мекунанд.

Дар ин ҳолат, биёед ба records.py ва тасвир кунед, ки паёми ин мавзӯъ бояд чӣ гуна бошад:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Тавре ки шумо тахмин кардаед, faust барои тавсифи схемаи паём шарҳи навъи python-ро истифода мебарад, аз ин рӯ версияи ҳадди ақали аз ҷониби китобхона дастгирӣшаванда аст. 3.6.

Биёед ба агент баргардем, намудҳоро таъин кунед ва онро илова кунед:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Тавре ки шумо мебинед, мо параметри навро бо схема ба усули оғозкунии мавзӯъ мегузорем - value_type. Гузашта аз ин, ҳама чиз аз рӯи ҳамон нақша сурат мегирад, бинобар ин ман дар бораи чизи дигар чизе намебинам.

Хуб, ламси ниҳоӣ ин илова кардани занг ба агенти ҷамъоварии иттилоот барои collect_securitites аст:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Мо барои паём схемаи қаблан эълоншударо истифода мебарем. Дар ин ҳолат, ман усули .cast-ро истифода кардам, зеро ба мо лозим нест, ки натиҷаро аз агент интизор шавем, аммо бояд қайд кард, ки роҳҳои ба мавзӯъ паём фиристед:

  1. рехтан - блок намекунад, зеро он натиҷаро интизор нест. Шумо наметавонед натиҷаро ба мавзӯи дигар ҳамчун паём фиристед.

  2. ирсол - баста намешавад, зеро он натиҷаро интизор нест. Шумо метавонед агентеро дар мавзӯъе, ки натиҷа ба он меравад, муайян кунед.

  3. пурсед — натичаашро интизор аст. Шумо метавонед агентеро дар мавзӯъе, ки натиҷа ба он меравад, муайян кунед.

Ҳамин тавр, ин ҳама бо агентҳо барои имрӯз аст!

Дастаи орзуҳо

Охирин чизе, ки ман ваъда дода будам, ки дар ин бахш менависам, ин фармонҳост. Тавре ки қаблан зикр гардид, фармонҳо дар faust як печонидани клик мебошанд. Дар асл, faust ҳангоми муайян кардани калиди -A фармони фармоишии моро ба интерфейси худ замима мекунад

Пас аз он ки агентхои эълоншуда дар agents.py функсияро бо ороишгар илова кунед app.commandдаъват кардани усул данд у ҷамъоварии_амниятҳо:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Ҳамин тариқ, агар мо рӯйхати фармонҳоро даъват кунем, фармони нави мо дар он хоҳад буд:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Мо метавонем онро мисли ҳар каси дигар истифода барем, аз ин рӯ биёед коргари фаустро бозоғоз кунем ва коллексияи мукаммали коғазҳои қиматнокро оғоз кунем:

> faust -A horton.agents start-collect-securities

Баъд чӣ мешавад?

Дар қисми оянда, бо истифода аз агентҳои боқимонда ҳамчун намуна, мо механизми таназзулро барои ҷустуҷӯи ифротӣ дар нархҳои пӯшидаи савдо дар сол ва оғози cron агентҳоро баррасӣ хоҳем кард.

Ин ҳама барои имрӯз аст! Ташаккур барои хондан :)

Рамзи ин қисм

Вазифаҳои замина дар бораи Фауст, Қисми II: Агентҳо ва дастаҳо

PS Дар қисми охир аз ман дар бораи кафкаи фауст ва омехта пурсиданд (конфлюент кадом хусусиятҳо дорад?). Чунин ба назар мерасад, ки confluent аз бисёр ҷиҳатҳо бештар функсионалӣ аст, аммо далел ин аст, ки faust дастгирии пурраи муштариро барои confluent надорад - ин аз он бармеояд тавсифи маҳдудиятҳои муштарӣ дар ҳуҷҷат.

Манбаъ: will.com

Илова Эзоҳ