เชซเซ‹เชธเซเชŸ, เชญเชพเช— II เชชเชฐ เชชเซƒเชทเซเช เชญเซ‚เชฎเชฟ เช•เชพเชฐเซเชฏเซ‹: เชเชœเชจเซเชŸเซ‹ เช…เชจเซ‡ เชŸเซ€เชฎเซ‹

เชซเซ‹เชธเซเชŸ, เชญเชพเช— II เชชเชฐ เชชเซƒเชทเซเช เชญเซ‚เชฎเชฟ เช•เชพเชฐเซเชฏเซ‹: เชเชœเชจเซเชŸเซ‹ เช…เชจเซ‡ เชŸเซ€เชฎเซ‹

เชธเชฎเชพเชตเชฟเชทเซเชŸเซ‹เชจเซเช‚ เช•เซ‹เชทเซเชŸเช•

  1. เชญเชพเช— I: เชชเชฐเชฟเชšเชฏ

  2. เชญเชพเช— II: เชเชœเชจเซเชŸเซ‹ เช…เชจเซ‡ เชŸเซ€เชฎเซ‹

เช†เชชเชฃเซ‡ เช…เชนเซ€เช‚ เชถเซเช‚ เช•เชฐเซ€ เชฐเชนเซเชฏเชพ เช›เซ€เช?

เชคเซ‡เชฅเซ€, เชคเซ‡เชฅเซ€, เชฌเซ€เชœเซ‹ เชญเชพเช—. เช…เช—เชพเช‰ เชฒเช–เซเชฏเชพ เชฎเซเชœเชฌ, เชคเซ‡เชฎเชพเช‚ เช†เชชเชฃเซ‡ เชจเซ€เชšเซ‡ เชฎเซเชœเชฌ เช•เชฐเซ€เชถเซเช‚:

  1. เชšเชพเชฒเซ‹ aiohttp เชชเชฐ เช†เชฒเซเชซเชพเชตเซ‡เชจเซเชŸเซ‡เชœ เชฎเชพเชŸเซ‡ เชเช• เชจเชพเชจเซ‹ เช•เซเชฒเชพเชฏเช‚เชŸ เชฒเช–เซ€เช เช…เชจเซ‡ เช†เชชเชฃเชจเซ‡ เชœเชฐเซ‚เชฐเซ€ เชเชจเซเชกเชชเซ‹เชˆเชจเซเชŸเชจเซ€ เชตเชฟเชจเช‚เชคเซ€เช“ เชธเชพเชฅเซ‡.

  2. เชšเชพเชฒเซ‹ เชเช• เชเชœเชจเซเชŸ เชฌเชจเชพเชตเซ€เช เชœเซ‡ เชธเชฟเช•เซเชฏเซ‹เชฐเชฟเชŸเซ€เช เชชเชฐเชจเซ‹ เชกเซ‡เชŸเชพ เช…เชจเซ‡ เชคเซ‡เชจเชพ เชชเชฐเชจเซ€ เชฎเซ‡เชŸเชพ เชฎเชพเชนเชฟเชคเซ€ เชเช•เชคเซเชฐเชฟเชค เช•เชฐเชถเซ‡.

เชชเชฐเช‚เชคเซ, เช…เชฎเซ‡ เช† เชชเซเชฐเซ‹เชœเซ‡เช•เซเชŸ เชฎเชพเชŸเซ‡ เชœ เช•เชฐเซ€เชถเซเช‚, เช…เชจเซ‡ เชซเซ‰เชธเซเชŸ เชธเช‚เชถเซ‹เชงเชจเชจเซ€ เชฆเซเชฐเชทเซเชŸเชฟเช, เช…เชฎเซ‡ เช•เชพเชซเช•เชพเชฎเชพเช‚เชฅเซ€ เชธเซเชŸเซเชฐเซ€เชฎ เช‡เชตเซ‡เชจเซเชŸเซเชธ เชชเชฐ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชคเชพ เชเชœเชจเซเชŸเซ‹เชจเซ‡ เช•เซ‡เชตเซ€ เชฐเซ€เชคเซ‡ เชฒเช–เชตเชพ เชคเซ‡ เชถเซ€เช–เซ€เชถเซเช‚, เชคเซ‡เชฎเชœ เช…เชฎเชพเชฐเชพ เช•เชฟเชธเซเชธเชพเชฎเชพเช‚ เช†เชฆเซ‡เชถเซ‹ (เช•เซเชฒเชฟเช• เชฐเซ‡เชชเชฐ) เช•เซ‡เชตเซ€ เชฐเซ€เชคเซ‡ เชฒเช–เชตเชพ - เชเชœเชจเซเชŸ เชฆเซเชตเชพเชฐเชพ เชฆเซ‡เช–เชฐเซ‡เช– เชฐเชพเช–เชคเชพ เชตเชฟเชทเชฏ เชชเชฐ เชฎเซ‡เชจเซเชฏเซเช…เชฒ เชชเซเชถ เชธเช‚เชฆเซ‡เชถเชพเช“ เชฎเชพเชŸเซ‡.

เชคเชพเชฒเซ€เชฎ

เช†เชฒเซเชซเชพเชตเซ‡เชจเซเชŸเซ‡เชœ เช•เซเชฒเชพเชฏเช‚เชŸ

เชชเซเชฐเชฅเชฎ, เชšเชพเชฒเซ‹ เช†เชฒเซเชซเชพเชตเชจเซเชŸเซ‡เชœเชจเซ€ เชตเชฟเชจเช‚เชคเซ€เช“ เชฎเชพเชŸเซ‡ เชเช• เชจเชพเชจเซ‹ aiohttp เช•เซเชฒเชพเชฏเช‚เชŸ เชฒเช–เซ€เช.

alphavantage.py

เชธเซเชชเซ‹เช‡เชฒเชฐ

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

เช–เชฐเซ‡เช–เชฐ, เชคเซ‡เชฎเชพเช‚เชฅเซ€ เชฌเชงเซเช‚ เชธเซเชชเชทเซเชŸ เช›เซ‡:

  1. AlphaVantage API เชเช•เชฆเชฎ เชธเชฐเชณ เช…เชจเซ‡ เชธเซเช‚เชฆเชฐ เชฐเซ€เชคเซ‡ เชกเชฟเชเชพเช‡เชจ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซเชฏเซเช‚ เช›เซ‡, เชคเซ‡เชฅเซ€ เชฎเซ‡เช‚ เชชเชฆเซเชงเชคเชฟ เชฆเซเชตเชพเชฐเชพ เชฌเชงเซ€ เชตเชฟเชจเช‚เชคเซ€เช“ เช•เชฐเชตเชพเชจเซเช‚ เชจเช•เซเช•เซ€ เช•เชฐเซเชฏเซเช‚ construct_query เชœเซเชฏเชพเช‚ เชฌเชฆเชฒเชพเชฎเชพเช‚ เชเช• http เช•เซ‰เชฒ เช›เซ‡.

  2. เชนเซเช‚ เชฌเชงเชพ เช•เซเชทเซ‡เชคเซเชฐเซ‹ เชชเชฐ เชฒเชพเชตเซเช‚ เช›เซเช‚ snake_case เช†เชฐเชพเชฎ เชฎเชพเชŸเซ‡.

  3. เชธเชพเชฐเซเช‚, เชธเซเช‚เชฆเชฐ เช…เชจเซ‡ เชฎเชพเชนเชฟเชคเซ€เชชเซเชฐเชฆ เชŸเซเชฐเซ‡เชธเชฌเซ‡เช• เช†เช‰เชŸเชชเซเชŸ เชฎเชพเชŸเซ‡ logger.catch เชถเชฃเช—เชพเชฐ.

เชชเซ€.เชเชธ. config.yml เชฎเชพเช‚ เชธเซเชฅเชพเชจเชฟเช• เชฐเซ€เชคเซ‡ เช†เชฒเซเชซเชพเชตเชจเซเชŸเซ‡เชœ เชŸเซ‹เช•เชจ เช‰เชฎเซ‡เชฐเชตเชพเชจเซเช‚ เชญเซ‚เชฒเชถเซ‹ เชจเชนเซ€เช‚, เช…เชฅเชตเชพ เชชเชฐเซเชฏเชพเชตเชฐเชฃ เชตเซ‡เชฐเซ€เชเชฌเชฒเชจเซ€ เชจเชฟเช•เชพเชธ เช•เชฐเซ‹. HORTON_SERVICE_APIKEY. เช…เชฎเชจเซ‡ เชŸเซ‹เช•เชจ เชฎเชณเซ‡ เช›เซ‡ เช…เชนเซ€เช‚.

CRUD เชตเชฐเซเช—

เชธเชฟเช•เซเชฏเซ‹เชฐเชฟเชŸเซ€เช เชตเชฟเชถเซ‡ เชฎเซ‡เชŸเชพ เชฎเชพเชนเชฟเชคเซ€ เชธเช‚เช—เซเชฐเชนเชฟเชค เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชธเชฟเช•เซเชฏเซ‹เชฐเชฟเชŸเซ€เช เช•เชฒเซ‡เช•เซเชถเชจ เชนเชถเซ‡.

database/security.py

เชฎเชพเชฐเชพ เชฎเชคเซ‡, เช…เชนเซ€เช‚ เช•เช‚เชˆเชชเชฃ เชธเชฎเชœเชพเชตเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เชจเชฅเซ€, เช…เชจเซ‡ เชฌเซ‡เช เช•เซเชฒเชพเชธ เชชเซ‹เชคเซ‡ เชเช•เชฆเชฎ เชธเชฐเชณ เช›เซ‡.

get_app()

เชฎเชพเช‚ เชเชชเซเชฒเชฟเช•เซ‡เชถเชจ เช‘เชฌเซเชœเซ‡เช•เซเชŸ เชฌเชจเชพเชตเชตเชพ เชฎเชพเชŸเซ‡ เชเช• เชซเช‚เช•เซเชถเชจ เช‰เชฎเซ‡เชฐเซ€เช app.py

เชธเซเชชเซ‹เช‡เชฒเชฐ

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

เชนเชฎเชฃเชพเช‚ เชฎเชพเชŸเซ‡ เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชธเซŒเชฅเซ€ เชธเชฐเชณ เชเชชเซเชฒเชฟเช•เซ‡เชถเชจ เชฌเชจเชพเชตเชŸ เชนเชถเซ‡, เชฅเซ‹เชกเซ€ เชตเชพเชฐ เชชเช›เซ€ เช…เชฎเซ‡ เชคเซ‡เชจเซ‡ เชตเชฟเชธเซเชคเซƒเชค เช•เชฐเซ€เชถเซเช‚, เชœเซ‹ เช•เซ‡, เชคเชฎเชจเซ‡ เชฐเชพเชน เชœเซ‹เชตเซ€ เชจ เชชเชกเซ‡ เชคเซ‡ เชฎเชพเชŸเซ‡, เช…เชนเซ€เช‚ เชธเช‚เชฆเชฐเซเชญ เชเชช-เช•เซเชฒเชพเชธ เชฎเชพเชŸเซ‡. เชนเซเช‚ เชคเชฎเชจเซ‡ เชธเซ‡เชŸเชฟเช‚เช—เซเชธ เชตเชฐเซเช— เชชเชฐ เชเช• เชจเชœเชฐ เช•เชฐเชตเชพเชจเซ€ เชธเชฒเชพเชน เชชเชฃ เช†เชชเซเช‚ เช›เซเช‚, เช•เชพเชฐเชฃ เช•เซ‡ เชคเซ‡ เชฎเซ‹เชŸเชพเชญเชพเช—เชจเซ€ เชธเซ‡เชŸเชฟเช‚เช—เซเชธ เชฎเชพเชŸเซ‡ เชœเชตเชพเชฌเชฆเชพเชฐ เช›เซ‡.

เชฎเซเช–เซเชฏ เชถเชฐเซ€เชฐ

เชธเชฟเช•เซเชฏเซ‹เชฐเชฟเชŸเซ€เชเชจเซ€ เชฏเชพเชฆเซ€ เชเช•เชคเซเชฐ เช•เชฐเชตเชพ เช…เชจเซ‡ เชœเชพเชณเชตเชตเชพ เชฎเชพเชŸเซ‡เชจเซ‹ เชเชœเชจเซเชŸ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

เชคเซ‡เชฅเซ€, เชชเซเชฐเชฅเชฎ เช†เชชเชฃเซ‡ เชซเซ‹เชธเซเชŸ เชเชชเซเชฒเชฟเช•เซ‡เชถเชจ เช‘เชฌเซเชœเซ‡เช•เซเชŸ เชฎเซ‡เชณเชตเซ€เช เช›เซ€เช - เชคเซ‡ เชเช•เชฆเชฎ เชธเชฐเชณ เช›เซ‡. เช†เช—เชณ, เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เชเชœเชจเซเชŸ เชฎเชพเชŸเซ‡ เชเช• เชตเชฟเชทเชฏ เชธเซเชชเชทเซเชŸเชชเชฃเซ‡ เชœเชพเชนเซ‡เชฐ เช•เชฐเซ€เช เช›เซ€เช... เช…เชนเซ€เช‚ เชคเซ‡ เชถเซเช‚ เช›เซ‡, เช†เช‚เชคเชฐเชฟเช• เชชเชฐเชฟเชฎเชพเชฃ เชถเซเช‚ เช›เซ‡ เช…เชจเซ‡ เชคเซ‡เชจเซ‡ เช…เชฒเช— เชฐเซ€เชคเซ‡ เช•เซ‡เชตเซ€ เชฐเซ€เชคเซ‡ เช—เซ‹เช เชตเซ€ เชถเช•เชพเชฏ เชคเซ‡ เช‰เชฒเซเชฒเซ‡เช– เช•เชฐเชตเซ‹ เชฏเซ‹เช—เซเชฏ เช›เซ‡.

  1. เช•เชพเชซเช•เชพเชจเชพ เชตเชฟเชทเชฏเซ‹, เชœเซ‹ เช†เชชเชฃเซ‡ เชšเซ‹เช•เซเช•เชธ เชตเซเชฏเชพเช–เซเชฏเชพ เชœเชพเชฃเชตเซ€ เชนเซ‹เชฏ, เชคเซ‹ เชคเซ‡ เชตเชพเช‚เชšเชตเซเช‚ เชตเชงเซ เชธเชพเชฐเซเช‚ เช›เซ‡ เชฌเช‚เชง. เชฆเชธเซเชคเชพเชตเซ‡เชœ, เช…เชฅเชตเชพ เชคเชฎเซ‡ เชตเชพเช‚เชšเซ€ เชถเช•เซ‹ เช›เซ‹ เชธเช‚เช•เชฒเชจ เชฐเชถเชฟเชฏเชจเชฎเชพเช‚ เชนเซ‡เชฌเซเชฐเซ‡ เชชเชฐ, เชœเซเชฏเชพเช‚ เชฌเชงเซเช‚ เชชเชฃ เชเช•เชฆเชฎ เชธเชšเซ‹เชŸ เชฐเซ€เชคเซ‡ เชชเซเชฐเชคเชฟเชฌเชฟเช‚เชฌเชฟเชค เชฅเชพเชฏ เช›เซ‡ :)

  2. เช†เช‚เชคเชฐเชฟเช• เชชเชฐเชฟเชฎเชพเชฃ, เชซเซ‰เชธเซเชŸ เชกเซ‰เช•เชฎเชพเช‚ เช–เซ‚เชฌ เชธเชพเชฐเซ€ เชฐเซ€เชคเซ‡ เชตเชฐเซเชฃเชตเซ‡เชฒ เช›เซ‡, เชœเซ‡ เช…เชฎเชจเซ‡ เช•เซ‹เชกเชฎเชพเช‚ เชธเซ€เชงเชพ เชœ เชตเชฟเชทเชฏเชจเซ‡ เช—เซ‹เช เชตเชตเชพเชจเซ€ เชฎเช‚เชœเซ‚เชฐเซ€ เช†เชชเซ‡ เช›เซ‡, เช…เชฒเชฌเชคเซเชค, เช†เชจเซ‹ เช…เชฐเซเชฅ เชซเซ‰เชธเซเชŸ เชกเซ‡เชตเชฒเชชเชฐเซเชธ เชฆเซเชตเชพเชฐเชพ เชชเซเชฐเชฆเชพเชจ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ‡เชฒเชพ เชชเชฐเชฟเชฎเชพเชฃเซ‹ เช›เซ‡, เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡: เชฐเซ€เชŸเซ‡เชจเซเชถเชจ, เชฐเซ€เชŸเซ‡เชจเซเชถเชจ เชชเซ‰เชฒเชฟเชธเซ€ (เชกเชฟเชซเซ‰เชฒเซเชŸ เชฐเซ‚เชชเซ‡ เช•เชพเชขเซ€ เชจเชพเช–เซ‹, เชชเชฐเช‚เชคเซ เชคเชฎเซ‡ เชธเซ‡เชŸ เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹. เช•เซ‹เชฎเซเชชเซ‡เช•เซเชŸ), เชตเชฟเชทเชฏ เชฆเซ€เช  เชชเชพเชฐเซเชŸเซ€เชถเชจเซ‹เชจเซ€ เชธเช‚เช–เซเชฏเชพ (เชชเชพเชฐเซเชŸเซ€เชถเชจเซ‹เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡, เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เช•เชฐเชคเชพเช‚ เช“เช›เซเช‚ เชตเซˆเชถเซเชตเชฟเช• เชฎเชนเชคเซเชต เชเชชเซเชฒเชฟเช•เซ‡เชถเชจเซเชธ เชซเซ‹เชธเซเชŸ).

  3. เชธเชพเชฎเชพเชจเซเชฏ เชฐเซ€เชคเซ‡, เชเชœเชจเซเชŸ เชตเซˆเชถเซเชตเชฟเช• เชฎเซ‚เชฒเซเชฏเซ‹ เชธเชพเชฅเซ‡ เชฎเซ‡เชจเซ‡เชœ เช•เชฐเซ‡เชฒ เชตเชฟเชทเชฏ เชฌเชจเชพเชตเซ€ เชถเช•เซ‡ เช›เซ‡, เชœเซ‹ เช•เซ‡, เชนเซเช‚ เชฌเชงเซเช‚ เชธเซเชชเชทเซเชŸเชชเชฃเซ‡ เชœเชพเชนเซ‡เชฐ เช•เชฐเชตเชพเชจเซเช‚ เชชเชธเช‚เชฆ เช•เชฐเซเช‚ เช›เซเช‚. เชตเชงเซเชฎเชพเช‚, เชเชœเชจเซเชŸเชจเซ€ เชœเชพเชนเซ‡เชฐเชพเชคเชฎเชพเช‚ เชตเชฟเชทเชฏเชจเชพ เช•เซ‡เชŸเชฒเชพเช• เชชเชฐเชฟเชฎเชพเชฃเซ‹ (เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เชชเชพเชฐเซเชŸเซ€เชถเชจเชจเซ€ เชธเช‚เช–เซเชฏเชพ เช…เชฅเชตเชพ เชฐเซ€เชŸเซ‡เชจเซเชถเชจ เชชเซ‹เชฒเชฟเชธเซ€) เช—เซ‹เช เชตเซ€ เชถเช•เชพเชคเชพ เชจเชฅเซ€.

    เชตเชฟเชทเชฏเชจเซ‡ เชฎเซ‡เชจเซเชฏเซเช…เชฒเซ€ เชตเซเชฏเชพเช–เซเชฏเชพเชฏเชฟเชค เช•เชฐเซเชฏเชพ เชตเชฟเชจเชพ เชคเซ‡ เช•เซ‡เชตเซ‹ เชฆเซ‡เช–เชพเชˆ เชถเช•เซ‡ เช›เซ‡ เชคเซ‡ เช…เชนเซ€เช‚ เช›เซ‡:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

เชธเชพเชฐเซเช‚, เชนเชตเซ‡ เชšเชพเชฒเซ‹ เชตเชฐเซเชฃเชจ เช•เชฐเซ€เช เช•เซ‡ เช…เชฎเชพเชฐเซ‹ เชเชœเชจเซเชŸ เชถเซเช‚ เช•เชฐเชถเซ‡ :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

เชคเซ‡เชฅเซ€, เชเชœเชจเซเชŸเชจเซ€ เชถเชฐเซ‚เช†เชคเชฎเชพเช‚, เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เช•เซเชฒเชพเชฏเชจเซเชŸ เชฆเซเชตเชพเชฐเชพ เชตเชฟเชจเช‚เชคเซ€เช“ เชฎเชพเชŸเซ‡ aiohttp เชธเชคเซเชฐ เช–เซ‹เชฒเซ€เช เช›เซ€เช. เช†เชฎ, เช•เชพเชฐเซเชฏเช•เชฐ เชถเชฐเซ‚ เช•เชฐเชคเซ€ เชตเช–เชคเซ‡, เชœเซเชฏเชพเชฐเซ‡ เช…เชฎเชพเชฐเซ‹ เชเชœเชจเซเชŸ เชฒเซ‰เชจเซเชš เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡, เชคเซเชฏเชพเชฐเซ‡ เชคเชฐเชค เชœ เชเช• เชธเชคเซเชฐ เช–เซ‹เชฒเชตเชพเชฎเชพเช‚ เช†เชตเชถเซ‡ - เชเช•, เช•เชพเชฐเซเชฏเช•เชฐเชจเชพ เชธเช‚เชชเซ‚เชฐเซเชฃ เชธเชฎเชฏ เชฎเชพเชŸเซ‡ (เช…เชฅเชตเชพ เช…เชจเซ‡เช•, เชœเซ‹ เชคเชฎเซ‡ เชชเชฐเชฟเชฎเชพเชฃ เชฌเชฆเชฒเซ‹ เช›เซ‹ เชธเช‚เชฎเชคเชฟ เชกเชฟเชซเซ‹เชฒเซเชŸ เชฏเซเชจเชฟเชŸ เชธเชพเชฅเซ‡ เชเชœเชจเซเชŸ เชชเชพเชธเซ‡เชฅเซ€).

เช†เช—เชณ, เช…เชฎเซ‡ เชธเซเชŸเซเชฐเซ€เชฎเชจเซ‡ เช…เชจเซเชธเชฐเซ€เช เช›เซ€เช (เช…เชฎเซ‡ เชธเช‚เชฆเซ‡เชถ เชฎเซ‚เช•เซ€เช เช›เซ€เช _, เช•เชพเชฐเชฃ เช•เซ‡ เช…เชฎเซ‡, เช† เชเชœเชจเซเชŸเชฎเชพเช‚, เช…เชฎเชพเชฐเชพ เชตเชฟเชทเชฏเชจเชพ เชธเช‚เชฆเซ‡เชถเชพเช“เชจเซ€ เชธเชพเชฎเช—เซเชฐเซ€เชจเซ€ เช•เชพเชณเชœเซ€ เชฒเซ‡เชคเชพ เชจเชฅเซ€, เชœเซ‹ เชคเซ‡เช“ เชตเชฐเซเชคเชฎเชพเชจ เช‘เชซเชธเซ‡เชŸ เชชเชฐ เช…เชธเซเชคเชฟเชคเซเชตเชฎเชพเช‚ เช›เซ‡, เช…เชจเซเชฏเชฅเชพ เช…เชฎเชพเชฐเซเช‚ เชšเช•เซเชฐ เชคเซ‡เชฎเชจเชพ เช†เช—เชฎเชจเชจเซ€ เชฐเชพเชน เชœเซ‹เชถเซ‡. เช เซ€เช• เช›เซ‡, เช…เชฎเชพเชฐเชพ เชฒเซ‚เชชเชจเซ€ เช…เช‚เชฆเชฐ, เช…เชฎเซ‡ เชธเช‚เชฆเซ‡เชถเชจเซ€ เชฐเชธเซ€เชฆเชจเซ‡ เชฒเซ‰เช— เช•เชฐเซ€เช เช›เซ€เช, เชธเช•เซเชฐเชฟเชฏ (get_securities เชฐเชฟเชŸเชฐเซเชจ เชฎเชพเชคเซเชฐ เชกเชฟเชซเซ‰เชฒเซเชŸ เชฐเซ‚เชชเซ‡ เชธเช•เซเชฐเชฟเชฏ เชฅเชพเชฏ เช›เซ‡, เช•เซเชฒเชพเชฏเช‚เชŸ เช•เซ‹เชก เชœเซเช“) เชธเชฟเช•เซเชฏเซ‹เชฐเชฟเชŸเซ€เชเชจเซ€ เชธเซ‚เชšเชฟ เชฎเซ‡เชณเชตเซ€เช เช›เซ€เช เช…เชจเซ‡ เชคเซ‡เชจเซ‡ เชกเซ‡เชŸเชพเชฌเซ‡เชเชฎเชพเช‚ เชธเชพเชšเชตเซ€เช เช›เซ€เช, เชคเซ‡ เชœ เชŸเชฟเช•เชฐ เชธเชพเชฅเซ‡ เช•เซ‹เชˆ เชธเซเชฐเช•เซเชทเชพ เช›เซ‡ เช•เซ‡ เช•เซ‡เชฎ เชคเซ‡ เชคเชชเชพเชธเซ€เช เช›เซ€เช เช…เชจเซ‡ เชกเซ‡เชŸเชพเชฌเซ‡เชเชฎเชพเช‚ เชตเชฟเชจเชฟเชฎเชฏ , เชœเซ‹ เชคเซเชฏเชพเช‚ เชนเซ‹เชฏ, เชคเซ‹ เชคเซ‡ (เช•เชพเช—เชณ) เชซเช•เซเชค เช…เชชเชกเซ‡เชŸ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเชถเซ‡.

เชšเชพเชฒเซ‹ เช…เชฎเชพเชฐเซ€ เชฐเชšเชจเชพ เชถเชฐเซ‚ เช•เชฐเซ€เช!

> docker-compose up -d
... ะ—ะฐะฟัƒัะบ ะบะพะฝั‚ะตะนะฝะตั€ะพะฒ ...
> faust -A horton.agents worker --without-web -l info

เชชเซ€.เชเชธ. เชถเช•เซเชฏเชคเชพเช“ เชตเซ‡เชฌ เช˜เชŸเช• เชนเซเช‚ เชฒเซ‡เช–เซ‹เชฎเชพเช‚ เชซเซ‹เชธเซเชŸเชจเซ‡ เชงเซเชฏเชพเชจเชฎเชพเช‚ เชฒเชˆเชถ เชจเชนเซ€เช‚, เชคเซ‡เชฅเซ€ เช…เชฎเซ‡ เชฏเซ‹เช—เซเชฏ เชงเซเชตเชœ เชธเซ‡เชŸ เช•เชฐเซเชฏเซ‹ เช›เซ‡.

เช…เชฎเชพเชฐเชพ เชฒเซ‹เชจเซเชš เช•เชฎเชพเชจเซเชกเชฎเชพเช‚, เช…เชฎเซ‡ เชฎเชพเชนเชฟเชคเซ€ เชฒเซ‹เช— เช†เช‰เชŸเชชเซเชŸ เชธเซเชคเชฐ เชธเชพเชฅเซ‡ เชเชชเซเชฒเซ€เช•เซ‡เชถเชจ เช‘เชฌเซเชœเซ‡เช•เซเชŸ เช•เซเชฏเชพเช‚ เชœเซ‹เชตเซเช‚ เช…เชจเซ‡ เชคเซ‡เชจเซ€ เชธเชพเชฅเซ‡ เชถเซเช‚ เช•เชฐเชตเซเช‚ (เชเช• เช•เชพเชฐเซเชฏเช•เชฐเชจเซ‡ เชฒเซ‹เช‚เชš เช•เชฐเซ‹) เชซเซ‹เชธเซเชŸ เชœเชฃเชพเชตเซเชฏเซเช‚. เช…เชฎเชจเซ‡ เชจเซ€เชšเซ‡เชจเซเช‚ เช†เช‰เชŸเชชเซเชŸ เชฎเชณเซ‡ เช›เซ‡:

เชธเซเชชเซ‹เช‡เชฒเชฐ

โ”Œฦ’aยตSโ€  v1.10.4โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ id          โ”‚ horton                                            โ”‚
โ”‚ transport   โ”‚ [URL('kafka://localhost:9092')]                   โ”‚
โ”‚ store       โ”‚ memory:                                           โ”‚
โ”‚ log         โ”‚ -stderr- (info)                                   โ”‚
โ”‚ pid         โ”‚ 1271262                                           โ”‚
โ”‚ hostname    โ”‚ host-name                                         โ”‚
โ”‚ platform    โ”‚ CPython 3.8.2 (Linux x86_64)                      โ”‚
โ”‚ drivers     โ”‚                                                   โ”‚
โ”‚   transport โ”‚ aiokafka=1.1.6                                    โ”‚
โ”‚   web       โ”‚ aiohttp=3.6.2                                     โ”‚
โ”‚ datadir     โ”‚ /path/to/project/horton-data                      โ”‚
โ”‚ appdir      โ”‚ /path/to/project/horton-data/v1                   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
... ะปะพะณะธ, ะปะพะณะธ, ะปะพะณะธ ...

โ”ŒTopic Partition Setโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ topic                      โ”‚ partitions โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ collect_securities         โ”‚ {0-7}      โ”‚
โ”‚ horton-__assignor-__leader โ”‚ {0}        โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ 

เชคเซ‡ เชœเซ€เชตเช‚เชค เช›เซ‡ !!!

เชšเชพเชฒเซ‹ เชชเชพเชฐเซเชŸเซ€เชถเชจ เชธเซ‡เชŸ เชœเซ‹เชˆเช. เชœเซ‡เชฎ เช†เชชเชฃเซ‡ เชœเซ‹เชˆ เชถเช•เซ€เช เช›เซ€เช, เช•เซ‹เชกเชฎเชพเช‚ เช…เชฎเซ‡ เชจเชฟเชฏเซเช•เซเชค เช•เชฐเซ‡เชฒเชพ เชจเชพเชฎ เชธเชพเชฅเซ‡ เชเช• เชตเชฟเชทเชฏ เชฌเชจเชพเชตเชตเชพเชฎเชพเช‚ เช†เชตเซเชฏเซ‹ เชนเชคเซ‹, เชชเชพเชฐเซเชŸเซ€เชถเชจเซ‹เชจเซ€ เชกเชฟเชซเซ‹เชฒเซเชŸ เชธเช‚เช–เซเชฏเชพ (8, เชฎเชพเช‚เชฅเซ€ เชฒเซ‡เชตเชพเชฎเชพเช‚ เช†เชตเซ€ เชนเชคเซ€. topic_partitions - เชเชชเซเชฒเชฟเช•เซ‡เชถเชจ เช‘เชฌเซเชœเซ‡เช•เซเชŸ เชชเซ‡เชฐเชพเชฎเซ€เชŸเชฐ), เช•เชพเชฐเชฃ เช•เซ‡ เช…เชฎเซ‡ เช…เชฎเชพเชฐเชพ เชตเชฟเชทเชฏ เชฎเชพเชŸเซ‡ เชตเซเชฏเช•เซเชคเชฟเช—เชค เชฎเซ‚เชฒเซเชฏเชจเซ‹ เช‰เชฒเซเชฒเซ‡เช– เช•เชฐเซเชฏเซ‹ เชจเชฅเซ€ (เชชเชพเชฐเซเชŸเซ€เชถเชจเซ‹ เชฆเซเชตเชพเชฐเชพ). เชตเชฐเซเช•เชฐเชฎเชพเช‚ เชฒเซ‹เช‚เชš เช•เชฐเชพเชฏเซ‡เชฒ เชเชœเชจเซเชŸเชจเซ‡ เชคเชฎเชพเชฎ 8 เชชเชพเชฐเซเชŸเซ€เชถเชจเซ‹ เชธเซ‹เช‚เชชเชตเชพเชฎเชพเช‚ เช†เชตเซเชฏเชพ เช›เซ‡, เช•เชพเชฐเชฃ เช•เซ‡ เชคเซ‡ เชเช•เชฎเชพเชคเซเชฐ เช›เซ‡, เชชเชฐเช‚เชคเซ เช•เซเชฒเชธเซเชŸเชฐเชฟเช‚เช— เชตเชฟเชถเซ‡เชจเชพ เชญเชพเช—เชฎเชพเช‚ เช†เชจเซ€ เชตเชงเซ เชตเชฟเช—เชคเชตเชพเชฐ เชšเชฐเซเชšเชพ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเชถเซ‡.

เชธเชพเชฐเซเช‚, เชนเชตเซ‡ เช†เชชเชฃเซ‡ เชฌเซ€เชœเซ€ เชŸเชฐเซเชฎเชฟเชจเชฒ เชตเชฟเชจเซเชกเซ‹ เชชเชฐ เชœเชˆ เชถเช•เซ€เช เช›เซ€เช เช…เชจเซ‡ เช…เชฎเชพเชฐเชพ เชตเชฟเชทเชฏ เชชเชฐ เช–เชพเชฒเซ€ เชธเช‚เชฆเซ‡เชถ เชฎเซ‹เช•เชฒเซ€ เชถเช•เซ€เช เช›เซ€เช:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

เชชเซ€.เชเชธ. เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ @ เช…เชฎเซ‡ เชฌเชคเชพเชตเซ€เช เช›เซ€เช เช•เซ‡ เช…เชฎเซ‡ โ€œcollect_securitiesโ€ เชจเชพเชฎเชจเชพ เชตเชฟเชทเชฏ เชชเชฐ เชธเช‚เชฆเซ‡เชถ เชฎเซ‹เช•เชฒเซ€ เชฐเชนเซเชฏเชพ เช›เซ€เช.

เช† เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เชธเช‚เชฆเซ‡เชถ เชชเชพเชฐเซเชŸเซ€เชถเชจ 6 เชชเชฐ เช—เชฏเซ‹ เชนเชคเซ‹ - เชคเชฎเซ‡ kafdrop on เชชเชฐ เชœเชˆเชจเซ‡ เช† เชคเชชเชพเชธเซ€ เชถเช•เซ‹ เช›เซ‹ localhost:9000

เช…เชฎเชพเชฐเชพ เช•เชพเชฐเซเชฏเช•เชฐ เชธเชพเชฅเซ‡ เชŸเชฐเซเชฎเชฟเชจเชฒ เชตเชฟเชจเซเชกเซ‹ เชชเชฐ เชœเชˆเชจเซ‡, เช…เชฎเซ‡ เชฒเซ‹เช—เซเชฐเซเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เชฎเซ‹เช•เชฒเซ‡เชฒ เช–เซเชถ เชธเช‚เชฆเซ‡เชถ เชœเซ‹เชถเซเช‚:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

เช…เชฎเซ‡ เชฎเซ‹เช‚เช—เซ‹ (Robo3T เช…เชฅเชตเชพ Studio3T เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡) เชฎเชพเช‚ เชชเชฃ เชœเซ‹เชˆ เชถเช•เซ€เช เช›เซ€เช เช…เชจเซ‡ เชœเซ‹เชˆ เชถเช•เซ€เช เช›เซ€เช เช•เซ‡ เชธเชฟเช•เซเชฏเซ‹เชฐเชฟเชŸเซ€เช เชกเซ‡เชŸเชพเชฌเซ‡เชเชฎเชพเช‚ เช›เซ‡:

เชนเซเช‚ เช…เชฌเชœเซ‹เชชเชคเชฟ เชจเชฅเซ€, เช…เชจเซ‡ เชคเซ‡เชฅเซ€ เช…เชฎเซ‡ เชชเซเชฐเชฅเชฎ เชœเซ‹เชตเชพเชจเชพ เชตเชฟเช•เชฒเซเชชเชฅเซ€ เชธเช‚เชคเซเชทเซเชŸ เช›เซ€เช.

เชซเซ‹เชธเซเชŸ, เชญเชพเช— II เชชเชฐ เชชเซƒเชทเซเช เชญเซ‚เชฎเชฟ เช•เชพเชฐเซเชฏเซ‹: เชเชœเชจเซเชŸเซ‹ เช…เชจเซ‡ เชŸเซ€เชฎเซ‹เชซเซ‹เชธเซเชŸ, เชญเชพเช— II เชชเชฐ เชชเซƒเชทเซเช เชญเซ‚เชฎเชฟ เช•เชพเชฐเซเชฏเซ‹: เชเชœเชจเซเชŸเซ‹ เช…เชจเซ‡ เชŸเซ€เชฎเซ‹

เชธเซเช– เช…เชจเซ‡ เช†เชจเช‚เชฆ - เชชเซเชฐเชฅเชฎ เชเชœเชจเซเชŸ เชคเซˆเชฏเชพเชฐ เช›เซ‡ :)

เชเชœเชจเซเชŸ เชคเซˆเชฏเชพเชฐ, เชจเชตเชพ เชเชœเชจเซเชŸ เชฒเชพเช‚เชฌเซเช‚ เชœเซ€เชตเซ‹!

เชนเชพ, เชธเชœเซเชœเชจเซ‹, เช…เชฎเซ‡ เช† เชฒเซ‡เช– เชฆเซเชตเชพเชฐเชพ เชคเซˆเชฏเชพเชฐ เช•เชฐเซ‡เชฒเชพ เชฎเชพเชฐเซเช—เชจเซ‹ เชฎเชพเชคเซเชฐ 1/3 เชญเชพเช— เช†เชตเชฐเซ€ เชฒเซ€เชงเซ‹ เช›เซ‡, เชชเชฐเช‚เชคเซ เชจเชฟเชฐเชพเชถ เชฅเชถเซ‹ เชจเชนเซ€เช‚, เช•เชพเชฐเชฃ เช•เซ‡ เชนเชตเซ‡ เชคเซ‡ เชธเชฐเชณ เชฌเชจเชถเซ‡.

เชคเซ‡เชฅเซ€ เชนเชตเซ‡ เช…เชฎเชจเซ‡ เชเช• เชเชœเชจเซเชŸเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡ เชœเซ‡ เชฎเซ‡เชŸเชพ เชฎเชพเชนเชฟเชคเซ€ เชเช•เชคเซเชฐเชฟเชค เช•เชฐเซ‡ เช…เชจเซ‡ เชคเซ‡เชจเซ‡ เชธเช‚เช—เซเชฐเชน เชฆเชธเซเชคเชพเชตเซ‡เชœเชฎเชพเช‚ เชฎเซ‚เช•เซ‡:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

เช•เชพเชฐเชฃ เช•เซ‡ เช† เชเชœเชจเซเชŸ เชšเซ‹เช•เซเช•เชธ เชธเซเชฐเช•เซเชทเชพ เชตเชฟเชถเซ‡เชจเซ€ เชฎเชพเชนเชฟเชคเซ€ เชชเชฐ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชถเซ‡, เช…เชฎเชพเชฐเซ‡ เชธเช‚เชฆเซ‡เชถเชฎเชพเช‚ เช† เชธเซเชฐเช•เซเชทเชพเชจเซเช‚ เชŸเซ€เช•เชฐ (เชชเซเชฐเชคเซ€เช•) เชฆเชฐเซเชถเชพเชตเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡. เชซเซ‹เชธเซเชŸเชฎเชพเช‚ เช† เชนเซ‡เชคเซ เชฎเชพเชŸเซ‡ เชคเซเชฏเชพเช‚ เช›เซ‡ เชฐเซ‡เช•เซ‹เชฐเซเชกเซเชธ โ€” เชตเชฐเซเช—เซ‹ เช•เซ‡ เชœเซ‡ เชเชœเชจเซเชŸ เชตเชฟเชทเชฏเชฎเชพเช‚ เชธเช‚เชฆเซ‡เชถ เชฏเซ‹เชœเชจเชพ เชœเชพเชนเซ‡เชฐ เช•เชฐเซ‡ เช›เซ‡.

เช† เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เชšเชพเชฒเซ‹ เชœเชˆเช records.py เช…เชจเซ‡ เช† เชตเชฟเชทเชฏ เชฎเชพเชŸเซ‡เชจเซ‹ เชธเช‚เชฆเซ‡เชถ เช•เซ‡เชตเซ‹ เชนเซ‹เชตเซ‹ เชœเซ‹เชˆเช เชคเซ‡เชจเซเช‚ เชตเชฐเซเชฃเชจ เช•เชฐเซ‹:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

เชœเซ‡เชฎ เชคเชฎเซ‡ เช…เชจเซเชฎเชพเชจ เช•เชฐเซเชฏเซเช‚ เชนเชถเซ‡, เชซเซ‰เชธเซเชŸ เชฎเซ‡เชธเซ‡เชœ เชธเซเช•เซ€เชฎเชพเชจเซเช‚ เชตเชฐเซเชฃเชจ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชชเชพเชฏเชฅเซ‹เชจ เชŸเชพเช‡เชช เชเชจเซ‹เชŸเซ‡เชถเชจเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ‡ เช›เซ‡, เชคเซ‡เชฅเซ€ เชœ เชฒเชพเช‡เชฌเซเชฐเซ‡เชฐเซ€ เชฆเซเชตเชพเชฐเชพ เชธเชชเซ‹เชฐเซเชŸเซ‡เชก เชจเซเชฏเซ‚เชจเชคเชฎ เชธเช‚เชธเซเช•เชฐเชฃ 3.6.

เชšเชพเชฒเซ‹ เชเชœเชจเซเชŸ เชชเชฐ เชชเชพเช›เชพ เชœเชˆเช, เชชเซเชฐเช•เชพเชฐเซ‹ เชธเซ‡เชŸ เช•เชฐเซ€เช เช…เชจเซ‡ เชคเซ‡เชจเซ‡ เช‰เชฎเซ‡เชฐเซ€เช:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

เชœเซ‡เชฎ เชคเชฎเซ‡ เชœเซ‹เชˆ เชถเช•เซ‹ เช›เซ‹, เช…เชฎเซ‡ เชตเชฟเชทเชฏเชจเซ€ เชถเชฐเซ‚เช†เชคเชจเซ€ เชชเชฆเซเชงเชคเชฟ - เชฎเซ‚เชฒเซเชฏ_เชชเซเชฐเช•เชพเชฐ เชฎเชพเชŸเซ‡ เชธเซเช•เซ€เชฎ เชธเชพเชฅเซ‡ เชเช• เชจเชตเซเช‚ เชชเชฐเชฟเชฎเชพเชฃ เชชเชธเชพเชฐ เช•เชฐเซ€เช เช›เซ€เช. เช†เช—เชณ, เชฌเชงเซเช‚ เชธเชฎเชพเชจ เชฏเซ‹เชœเชจเชพเชจเซ‡ เช…เชจเซเชธเชฐเซ‡ เช›เซ‡, เชคเซ‡เชฅเซ€ เชฎเชจเซ‡ เช…เชจเซเชฏ เช•เช‚เชˆเชชเชฃ เชชเชฐ เชงเซเชฏเชพเชจ เช†เชชเชตเชพเชจเซ‹ เช•เซ‹เชˆ เชฎเซเชฆเซเชฆเซ‹ เชฆเซ‡เช–เชพเชคเซ‹ เชจเชฅเซ€.

เช เซ€เช• เช›เซ‡, เช…เช‚เชคเชฟเชฎ เชธเซเชชเชฐเซเชถ เช เชฎเซ‡เชŸเชพ เชฎเชพเชนเชฟเชคเซ€ เชธเช‚เช—เซเชฐเชน เชเชœเชจเซเชŸเชจเซ‡ เชเช•เชคเซเชฐเชฟเชค_เชธเชฟเช•เซเชฏเซ‹เชฐเชฟเชŸเชพเช‡เชŸเซเชธ เชฎเชพเชŸเซ‡ เช•เซ‰เชฒ เช‰เชฎเซ‡เชฐเชตเชพเชจเซ‹ เช›เซ‡:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

เช…เชฎเซ‡ เชธเช‚เชฆเซ‡เชถ เชฎเชพเชŸเซ‡ เช…เช—เชพเช‰ เชœเชพเชนเซ‡เชฐ เช•เชฐเซ‡เชฒเซ€ เชธเซเช•เซ€เชฎเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เช เช›เซ€เช. เช† เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เชฎเซ‡เช‚ .cast เชชเชฆเซเชงเชคเชฟเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซเชฏเซ‹ เช•เชพเชฐเชฃ เช•เซ‡ เช…เชฎเชพเชฐเซ‡ เชเชœเชจเซเชŸเชจเชพ เชชเชฐเชฟเชฃเชพเชฎเชจเซ€ เชฐเชพเชน เชœเซ‹เชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เชจเชฅเซ€, เชชเชฐเช‚เชคเซ เชคเซ‡ เช‰เชฒเซเชฒเซ‡เช–เชจเซ€เชฏ เช›เซ‡ เช•เซ‡ เชฎเชพเชฐเซเช—เซ‹ เชตเชฟเชทเชฏ เชชเชฐ เชธเช‚เชฆเซ‡เชถ เชฎเซ‹เช•เชฒเซ‹:

  1. เช•เชพเชธเซเชŸ - เช…เชตเชฐเซ‹เชงเชฟเชค เช•เชฐเชคเซเช‚ เชจเชฅเซ€ เช•เชพเชฐเชฃ เช•เซ‡ เชคเซ‡ เชชเชฐเชฟเชฃเชพเชฎเชจเซ€ เช…เชชเซ‡เช•เซเชทเชพ เชฐเชพเช–เชคเซเช‚ เชจเชฅเซ€. เชคเชฎเซ‡ เชชเชฐเชฟเชฃเชพเชฎ เชฌเซ€เชœเชพ เชตเชฟเชทเชฏ เชชเชฐ เชธเช‚เชฆเซ‡เชถ เชคเชฐเซ€เช•เซ‡ เชฎเซ‹เช•เชฒเซ€ เชถเช•เชคเชพ เชจเชฅเซ€.

  2. เชฎเซ‹เช•เชฒเซ‹ - เช…เชตเชฐเซ‹เชงเชฟเชค เช•เชฐเชคเซเช‚ เชจเชฅเซ€ เช•เชพเชฐเชฃ เช•เซ‡ เชคเซ‡ เชชเชฐเชฟเชฃเชพเชฎเชจเซ€ เช…เชชเซ‡เช•เซเชทเชพ เชฐเชพเช–เชคเซเช‚ เชจเชฅเซ€. เชคเชฎเซ‡ เชœเซ‡ เชตเชฟเชทเชฏ เชชเชฐ เชชเชฐเชฟเชฃเชพเชฎ เช†เชตเชถเซ‡ เชคเซ‡เชฎเชพเช‚ เชเชœเชจเซเชŸเชจเซ‹ เช‰เชฒเซเชฒเซ‡เช– เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹.

  3. เชชเซ‚เช›เซ‹ - เชชเชฐเชฟเชฃเชพเชฎเชจเซ€ เชฐเชพเชน เชœเซเช เช›เซ‡. เชคเชฎเซ‡ เชœเซ‡ เชตเชฟเชทเชฏ เชชเชฐ เชชเชฐเชฟเชฃเชพเชฎ เช†เชตเชถเซ‡ เชคเซ‡เชฎเชพเช‚ เชเชœเชจเซเชŸเชจเซ‹ เช‰เชฒเซเชฒเซ‡เช– เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹.

เชคเซ‡เชฅเซ€, เช†เชœเซ‡ เชฎเชพเชŸเซ‡ เชเชœเชจเซเชŸเซ‹ เชธเชพเชฅเซ‡ เชเชŸเชฒเซเช‚ เชœ เช›เซ‡!

เชกเซเชฐเซ€เชฎ เชŸเซ€เชฎ

เช›เซ‡เชฒเซเชฒเซ€ เชตเชธเซเชคเซ เชœเซ‡ เชฎเซ‡เช‚ เช† เชญเชพเช—เชฎเชพเช‚ เชฒเช–เชตเชพเชจเซเช‚ เชตเชšเชจ เช†เชชเซเชฏเซเช‚ เชนเชคเซเช‚ เชคเซ‡ เช†เชฆเซ‡เชถเซ‹ เช›เซ‡. เช…เช—เชพเช‰ เช‰เชฒเซเชฒเซ‡เช– เช•เชฐเซเชฏเซ‹ เช›เซ‡ เชคเซ‡เชฎ, เชซเซ‰เชธเซเชŸเชฎเชพเช‚ เช†เชฆเซ‡เชถเซ‹ เช•เซเชฒเชฟเช•เชจเซ€ เช†เชธเชชเชพเชธ เชเช• เช†เชตเชฐเชฃ เช›เซ‡. เชตเชพเชธเซเชคเชตเชฎเชพเช‚, -A เช•เซ€เชจเซ‹ เช‰เชฒเซเชฒเซ‡เช– เช•เชฐเชคเซ€ เชตเช–เชคเซ‡ faust เช…เชฎเชพเชฐเชพ เช•เชธเซเชŸเชฎ เช†เชฆเซ‡เชถเชจเซ‡ เชคเซ‡เชจเชพ เช‡เชจเซเชŸเชฐเชซเซ‡เชธ เชธเชพเชฅเซ‡ เชœเซ‹เชกเซ‡ เช›เซ‡

เชฎเชพเช‚ เชœเชพเชนเซ‡เชฐ เช•เชฐเชพเชฏเซ‡เชฒเชพ เชเชœเชจเซเชŸเซ‹ เชชเช›เซ€ agents.py เชกเซ‡เช•เซ‹เชฐเซ‡เชŸเชฐ เชธเชพเชฅเซ‡ เชซเช‚เช•เซเชถเชจ เช‰เชฎเซ‡เชฐเซ‹ app.commandเชชเชฆเซเชงเชคเชฟเชจเซ‡ เชฌเซ‹เชฒเชพเชตเซ‡ เช›เซ‡ เช•เชพเชธเซเชŸ ัƒ เชเช•เชคเซเชฐเชฟเชค_เชธเซเชฐเช•เซเชทเชพ:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

เช†เชฎ, เชœเซ‹ เช†เชชเชฃเซ‡ เช†เชฆเซ‡เชถเซ‹เชจเซ€ เชธเซ‚เชšเชฟเชจเซ‡ เช•เซ‰เชฒ เช•เชฐเซ€เช, เชคเซ‹ เช†เชชเชฃเซ‹ เชจเชตเซ‹ เช†เชฆเซ‡เชถ เชคเซ‡เชฎเชพเช‚ เชนเชถเซ‡:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

เช…เชฎเซ‡ เชคเซ‡เชจเซ‹ เช‰เชชเชฏเซ‹เช— เชฌเซ€เชœเชพ เช•เซ‹เชˆเชจเซ€ เชœเซ‡เชฎ เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช, เชคเซ‡เชฅเซ€ เชšเชพเชฒเซ‹ เชซเซ‹เชธเซเชŸ เชตเชฐเซเช•เชฐเชจเซ‡ เชซเชฐเซ€เชฅเซ€ เชถเชฐเซ‚ เช•เชฐเซ€เช เช…เชจเซ‡ เชธเชฟเช•เซเชฏเซ‹เชฐเชฟเชŸเซ€เชเชจเซ‹ เชธเช‚เชชเซ‚เชฐเซเชฃ เชธเช‚เช—เซเชฐเชน เชถเชฐเซ‚ เช•เชฐเซ€เช:

> faust -A horton.agents start-collect-securities

เช†เช—เชณ เชถเซเช‚ เชฅเชถเซ‡?

เช†เช—เชณเชจเชพ เชญเชพเช—เชฎเชพเช‚, เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡ เชฌเชพเช•เซ€เชจเชพ เชเชœเชจเซเชŸเซ‹เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡, เช…เชฎเซ‡ เชตเชฐเซเชทเชจเชพ เชŸเซเชฐเซ‡เชกเชฟเช‚เช—เชจเชพ เชฌเช‚เชง เชญเชพเชต เช…เชจเซ‡ เชเชœเชจเซเชŸเซ‹เชจเชพ เช•เซเชฐเซ‹เชจ เชฒเซ‹เช‚เชšเชฎเชพเช‚ เชšเชฐเชฎเชธเซ€เชฎเชพ เชถเซ‹เชงเชตเชพ เชฎเชพเชŸเซ‡เชจเซ€ เชธเชฟเช‚เช• เชฎเชฟเช•เซ‡เชจเชฟเชเชฎ เชชเชฐ เชตเชฟเชšเชพเชฐ เช•เชฐเซ€เชถเซเช‚.

เช†เชœ เชฎเชพเชŸเซ‡ เช†เชŸเชฒเซเช‚ เชœ! เชตเชพเช‚เชšเชตเชพ เชฌเชฆเชฒ เช†เชญเชพเชฐ :)

เช† เชญเชพเช— เชฎเชพเชŸเซ‡ เช•เซ‹เชก

เชซเซ‹เชธเซเชŸ, เชญเชพเช— II เชชเชฐ เชชเซƒเชทเซเช เชญเซ‚เชฎเชฟ เช•เชพเชฐเซเชฏเซ‹: เชเชœเชจเซเชŸเซ‹ เช…เชจเซ‡ เชŸเซ€เชฎเซ‹

เชชเซ€.เชเชธ. เช›เซ‡เชฒเซเชฒเชพ เชญเชพเช— เชนเซ‡เช เชณ เชฎเชจเซ‡ เชซเซ‹เชธเซเชŸ เช…เชจเซ‡ เช•เชจเซเชซเซเชฒเซเช…เชจเซเชŸ เช•เชพเชซเช•เชพ เชตเชฟเชถเซ‡ เชชเซ‚เช›เชตเชพเชฎเชพเช‚ เช†เชตเซเชฏเซเช‚ เชนเชคเซเช‚ (เชธเช‚เช—เชฎเชฎเชพเช‚ เชถเซเช‚ เชฒเช•เซเชทเชฃเซ‹ เช›เซ‡?). เชเชตเซเช‚ เชฒเชพเช—เซ‡ เช›เซ‡ เช•เซ‡ เชธเช‚เช—เชฎ เช˜เชฃเซ€ เชฐเซ€เชคเซ‡ เชตเชงเซ เช•เชพเชฐเซเชฏเชพเชคเซเชฎเช• เช›เซ‡, เชชเชฐเช‚เชคเซ เชนเช•เซ€เช•เชค เช เช›เซ‡ เช•เซ‡ เชซเซ‰เชธเซเชŸเชจเซ‡ เชธเช‚เช—เชฎ เชฎเชพเชŸเซ‡ เชธเช‚เชชเซ‚เชฐเซเชฃ เช•เซเชฒเชพเชฏเช‚เชŸ เชธเชชเซ‹เชฐเซเชŸ เชจเชฅเซ€ - เช† เชจเซ€เชšเซ‡ เชฎเซเชœเชฌ เช›เซ‡ เชฆเชธเซเชคเชพเชตเซ‡เชœเชฎเชพเช‚ เช•เซเชฒเชพเชฏเช‚เชŸ เชชเซเชฐเชคเชฟเชฌเช‚เชงเซ‹เชจเซเช‚ เชตเชฐเซเชฃเชจ.

เชธเซ‹เชฐเซเชธ: www.habr.com

เชเช• เชŸเชฟเชชเซเชชเชฃเซ€ เช‰เชฎเซ‡เชฐเซ‹