เดซเต—เดธเตเดฑเตเดฑเดฟเดฒเต† เดชเดถเตเดšเดพเดคเตเดคเดฒ เดœเต‹เดฒเดฟเด•เตพ, เดญเดพเด—เด‚ II: เดเดœเดจเตเดฑเตเดฎเดพเดฐเตเด‚ เดŸเต€เดฎเตเด•เดณเตเด‚

เดซเต—เดธเตเดฑเตเดฑเดฟเดฒเต† เดชเดถเตเดšเดพเดคเตเดคเดฒ เดœเต‹เดฒเดฟเด•เตพ, เดญเดพเด—เด‚ II: เดเดœเดจเตเดฑเตเดฎเดพเดฐเตเด‚ เดŸเต€เดฎเตเด•เดณเตเด‚

เด‰เดณเตเดณเดŸเด•เตเด• เดชเดŸเตเดŸเดฟเด•

  1. เดญเดพเด—เด‚ I: เด†เดฎเตเด–เด‚

  2. เดญเดพเด—เด‚ II: เดเดœเดจเตเดฑเตเดฎเดพเดฐเตเด‚ เดŸเต€เดฎเตเด•เดณเตเด‚

เดจเดฎเตเดฎเตพ เด‡เดตเดฟเดŸเต† เดŽเดจเตเดคเดพเดฃเต เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเต?

เด…เดคเดฟเดจเดพเตฝ, เดฐเดฃเตเดŸเดพเด‚ เดญเดพเด—เด‚. เดจเต‡เดฐเดคเตเดคเต† เดŽเดดเตเดคเดฟเดฏเดคเตเดชเต‹เดฒเต†, เด…เดคเดฟเตฝ เดžเด™เตเด™เตพ เด‡เดจเดฟเดชเตเดชเดฑเดฏเตเดจเตเดจเดต เดšเต†เดฏเตเดฏเตเด‚:

  1. เดจเดฎเตเด•เตเด•เต เด†เดตเดถเตเดฏเดฎเตเดณเตเดณ เดŽเตปเดกเต เดชเต‹เดฏเดฟเดจเตเดฑเตเด•เตพเด•เตเด•เดพเดฏเตเดณเตเดณ เด…เดญเตเดฏเตผเดคเตเดฅเดจเด•เตพเด•เตเด•เตŠเดชเตเดชเด‚ aiohttp-เดฏเดฟเตฝ เด†เตฝเดซเดตเดพเดจเตเดฑเต‡เดœเดฟเดจเดพเดฏเดฟ เด’เดฐเต เดšเต†เดฑเดฟเดฏ เด•เตเดฒเดฏเดจเตเดฑเต เดŽเดดเตเดคเดพเด‚.

  2. เดธเต†เด•เตเดฏเต‚เดฐเดฟเดฑเตเดฑเดฟเด•เดณเตเดŸเต†เดฏเตเด‚ เดฎเต†เดฑเตเดฑเดพ เดตเดฟเดตเดฐเด™เตเด™เดณเตเดŸเต†เดฏเตเด‚ เดกเดพเดฑเตเดฑ เดถเต‡เด–เดฐเดฟเด•เตเด•เตเดจเตเดจ เด’เดฐเต เดเดœเดจเตเดฑเดฟเดจเต† เดจเดฎเตเด•เตเด•เต เดธเตƒเดทเตเดŸเดฟเด•เตเด•เดพเด‚.

เดชเด•เตเดทเต‡, เดชเตเดฐเต‹เดœเด•เตเดฑเตเดฑเดฟเดจเดพเดฏเดฟ เดžเด™เตเด™เตพ เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเต เด‡เดคเดพเดฃเต, เด•เต‚เดŸเดพเดคเต† เดซเดพเดธเตเดฑเตเดฑเต เด—เดตเต‡เดทเดฃเดคเตเดคเดฟเดจเตเดฑเต† เด…เดŸเดฟเดธเตเดฅเดพเดจเดคเตเดคเดฟเตฝ, เด•เดพเดซเตเด•เดฏเดฟเตฝ เดจเดฟเดจเตเดจเต เด‡เดตเดจเตเดฑเตเด•เตพ เดธเตเดŸเตเดฐเต€เด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจ เดเดœเดจเตเดฑเตเดฎเดพเดฐเต† เดŽเด™เตเด™เดจเต† เดŽเดดเตเดคเดพเดฎเต†เดจเตเดจเตเด‚ เด…เดคเตเดชเต‹เดฒเต† เด•เดฎเดพเตปเดกเตเด•เตพ เดŽเด™เตเด™เดจเต† เดŽเดดเตเดคเดพเดฎเต†เดจเตเดจเตเด‚ (เดฑเดพเดชเตเดชเตผ เด•เตเดฒเดฟเด•เตเด•เต เดšเต†เดฏเตเดฏเตเด•) เดžเด™เตเด™เตพ เดชเด เดฟเด•เตเด•เตเด‚ - เดเดœเดจเตเดฑเต เดจเดฟเดฐเต€เด•เตเดทเดฟเด•เตเด•เตเดจเตเดจ เดตเดฟเดทเดฏเดคเตเดคเดฟเดฒเต‡เด•เตเด•เตเดณเตเดณ เดฎเดพเดจเตเดตเตฝ เดชเตเดทเต เดธเดจเตเดฆเต‡เดถเด™เตเด™เตพเด•เตเด•เดพเดฏเดฟ.

เดคเดฏเตเดฏเดพเดฑเดพเด•เตเด•เตฝ

เด†เตฝเดซเดตเดพเดจเตเดฑเต‡เดœเต เด•เตเดฒเดฏเดจเตเดฑเต

เด†เดฆเตเดฏเด‚, เด†เตฝเดซเดตเดพเดจเตเดฑเต‡เดœเดฟเดฒเต‡เด•เตเด•เตเดณเตเดณ เด…เดญเตเดฏเตผเดคเตเดฅเดจเด•เตพเด•เตเด•เดพเดฏเดฟ เดจเดฎเตเด•เตเด•เต เด’เดฐเต เดšเต†เดฑเดฟเดฏ aiohttp เด•เตเดฒเดฏเดจเตเดฑเต เดŽเดดเตเดคเดพเด‚.

alphavantage.py

เดธเตเดชเต‡เดพเดฏเตเดฒเตผ

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

เดตเดพเดธเตเดคเดตเดคเตเดคเดฟเตฝ, เด…เดคเดฟเตฝ เดจเดฟเดจเตเดจเต เดŽเดฒเตเดฒเดพเด‚ เดตเตเดฏเด•เตเดคเดฎเดพเดฃเต:

  1. AlphaVantage API เดตเดณเดฐเต† เดฒเดณเดฟเดคเดฎเดพเดฏเตเด‚ เดฎเดจเต‹เดนเดฐเดฎเดพเดฏเตเด‚ เดฐเต‚เดชเด•เตฝเดชเตเดชเดจ เดšเต†เดฏเตโ€Œเดคเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต, เด…เดคเดฟเดจเดพเตฝ เดŽเดฒเตเดฒเดพ เด…เดญเตเดฏเตผเดคเตเดฅเดจเด•เดณเตเด‚ เดˆ เดฐเต€เดคเดฟเดฏเดฟเดฒเต‚เดŸเต† เดจเดŸเดคเตเดคเดพเตป เดžเดพเตป เดคเต€เดฐเตเดฎเดพเดจเดฟเดšเตเดšเต construct_query เด…เดตเดฟเดŸเต† เด’เดฐเต http เด•เต‹เตพ เด‰เดฃเตเดŸเต.

  2. เดžเดพเตป เดŽเดฒเตเดฒเดพ เดตเดฏเดฒเตเด•เดณเตเด‚ เด•เตŠเดฃเตเดŸเตเดตเดฐเตเดจเตเดจเต snake_case เดธเต—เด•เดฐเตเดฏเดคเตเดคเดฟเดจเดพเดฏเดฟ.

  3. เดจเดจเตเดจเดพเดฏเดฟ, เดฎเดจเต‹เดนเดฐเดตเตเด‚ เดตเดฟเดœเตเดžเดพเดจเดชเตเดฐเดฆเดตเตเดฎเดพเดฏ เดŸเตเดฐเต†เดฏเตเดธเตเดฌเดพเด•เตเด•เต เด”เดŸเตเดŸเตเดชเตเดŸเตเดŸเดฟเดจเตเดณเตเดณ logger.catch เด…เดฒเด™เตเด•เดพเดฐเด‚.

PS config.yml-เดฒเต‡เด•เตเด•เต เดชเตเดฐเดพเดฆเต‡เดถเดฟเด•เดฎเดพเดฏเดฟ เด†เตฝเดซเดตเดพเดจเตเดฑเต‡เดœเต เดŸเต‹เด•เตเด•เตบ เดšเต‡เตผเด•เตเด•เดพเตป เดฎเดฑเด•เตเด•เดฐเตเดคเต, เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เดชเดฐเดฟเดธเตเดฅเดฟเดคเดฟ เดตเต‡เดฐเดฟเดฏเดฌเดฟเตพ เด•เดฏเดฑเตเดฑเตเดฎเดคเดฟ เดšเต†เดฏเตเดฏเตเด• HORTON_SERVICE_APIKEY. เดžเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เดŸเต‹เด•เตเด•เตบ เดฒเดญเดฟเด•เตเด•เตเดจเตเดจเต เด‡เดตเดฟเดŸเต†.

CRUD เด•เตเดฒเดพเดธเต

เดธเต†เด•เตเดฏเต‚เดฐเดฟเดฑเตเดฑเดฟเด•เดณเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเตเดณเตเดณ เดฎเต†เดฑเตเดฑเดพ เดตเดฟเดตเดฐเด™เตเด™เตพ เดธเด‚เดญเดฐเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเต เดžเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เดธเต†เด•เตเดฏเต‚เดฐเดฟเดฑเตเดฑเต€เดธเต เดถเต‡เด–เดฐเด‚ เด‰เดฃเตเดŸเดพเดฏเดฟเดฐเดฟเด•เตเด•เตเด‚.

เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเต/security.py

เดŽเดจเตเดฑเต† เด…เดญเดฟเดชเตเดฐเดพเดฏเดคเตเดคเดฟเตฝ, เด‡เดตเดฟเดŸเต† เด’เดจเตเดจเตเด‚ เดตเดฟเดถเดฆเต€เด•เดฐเดฟเด•เตเด•เต‡เดฃเตเดŸ เด†เดตเดถเตเดฏเดฎเดฟเดฒเตเดฒ, เด…เดŸเดฟเดธเตเดฅเดพเดจ เด•เตเดฒเดพเดธเต เดคเดจเตเดจเต† เดตเดณเดฐเต† เดฒเดณเดฟเดคเดฎเดพเดฃเต.

get_app()

เด’เดฐเต เด†เดชเตเดฒเดฟเด•เตเด•เต‡เดทเตป เด’เดฌเตโ€Œเดœเด•เตโ€Œเดฑเตเดฑเต เดธเตƒเดทเตโ€ŒเดŸเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเต เด’เดฐเต เดซเด‚เด—เตโ€Œเดทเตป เดšเต‡เตผเด•เตเด•เดพเด‚ app.py

เดธเตเดชเต‡เดพเดฏเตเดฒเตผ

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

เด‡เดชเตเดชเต‹เตพ เดžเด™เตเด™เตพเด•เตเด•เต เดเดฑเตเดฑเดตเตเด‚ เดฒเดณเดฟเดคเดฎเดพเดฏ เด†เดชเตเดฒเดฟเด•เตเด•เต‡เดทเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตฝ เดฒเดญเดฟเด•เตเด•เตเด‚, เด•เตเดฑเดšเตเดšเต เด•เดดเดฟเดžเตเดžเต เดžเด™เตเด™เตพ เด…เดคเต เดตเดฟเด•เดธเดฟเดชเตเดชเดฟเด•เตเด•เตเด‚, เดŽเดจเตเดจเดฟเดฐเตเดจเตเดจเดพเดฒเตเด‚, เดจเดฟเด™เตเด™เดณเต† เด•เดพเดคเตเดคเดฟเดฐเดฟเด•เตเด•เดพเดคเดฟเดฐเดฟเด•เตเด•เดพเตป, เด‡เดตเดฟเดŸเต† เด…เดตเดฒเด‚เดฌเด™เตเด™เตพ เด†เดชเตเดชเต เด•เตเดฒเดพเดธเดฟเดฒเต‡เด•เตเด•เต. เดฎเดฟเด•เตเด• เด•เตเดฐเดฎเต€เด•เดฐเดฃเด™เตเด™เตพเด•เตเด•เตเด‚ เด‰เดคเตเดคเดฐเดตเดพเดฆเดฟเดฏเดพเดฏเดคเดฟเดจเดพเตฝ, เด•เตเดฐเดฎเต€เด•เดฐเดฃ เด•เตเดฒเดพเดธเต เดจเต‹เด•เตเด•เดพเดจเตเด‚ เดžเดพเตป เดจเดฟเด™เตเด™เดณเต† เด‰เดชเดฆเต‡เดถเดฟเด•เตเด•เตเดจเตเดจเต.

เดชเตเดฐเดงเดพเดจ เดถเดฐเต€เดฐเด‚

เดธเต†เด•เตเดฏเต‚เดฐเดฟเดฑเตเดฑเดฟเด•เดณเตเดŸเต† เด’เดฐเต เดฒเดฟเดธเตเดฑเตเดฑเต เดถเต‡เด–เดฐเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเด‚ เดชเดฐเดฟเดชเดพเดฒเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเดฎเตเดณเตเดณ เดเดœเดจเตเดฑเต

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

เด…เดคเดฟเดจเดพเตฝ, เด†เดฆเตเดฏเด‚ เดจเดฎเตเด•เตเด•เต เดซเดพเดธเตเดฑเตเดฑเต เด†เดชเตเดฒเดฟเด•เตเด•เต‡เดทเตป เด’เดฌเตเดœเด•เตเดฑเตเดฑเต เดฒเดญเดฟเด•เตเด•เตเด‚ - เด‡เดคเต เดตเดณเดฐเต† เดฒเดณเดฟเดคเดฎเดพเดฃเต. เด…เดŸเตเดคเตเดคเดคเดพเดฏเดฟ, เดžเด™เตเด™เดณเตเดŸเต† เดเดœเดจเตเดฑเดฟเดจเดพเดฏเดฟ เดžเด™เตเด™เตพ เด’เดฐเต เดตเดฟเดทเดฏเด‚ เดตเตเดฏเด•เตเดคเดฎเดพเดฏเดฟ เดชเตเดฐเด–เตเดฏเดพเดชเดฟเด•เตเด•เตเดจเตเดจเต... เด‡เดตเดฟเดŸเต† เด…เดคเต เดŽเดจเตเดคเดพเดฃเต†เดจเตเดจเตเด‚ เด†เดจเตเดคเดฐเดฟเด• เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเตผ เดŽเดจเตเดคเดพเดฃเต†เดจเตเดจเตเด‚ เด‡เดคเต เดŽเด™เตเด™เดจเต† เดตเตเดฏเดคเตเดฏเดธเตเดคเดฎเดพเดฏเดฟ เด•เตเดฐเดฎเต€เด•เดฐเดฟเด•เตเด•เดพเดฎเต†เดจเตเดจเตเด‚ เดชเดฐเดพเดฎเตผเดถเดฟเด•เตเด•เต‡เดฃเตเดŸเดคเดพเดฃเต.

  1. เด•เดพเดซเตเด•เดฏเดฟเดฒเต† เดตเดฟเดทเดฏเด™เตเด™เตพ, เด•เตƒเดคเตเดฏเดฎเดพเดฏ เดจเดฟเตผเดตเดšเดจเด‚ เด…เดฑเดฟเดฏเดฃเดฎเต†เด™เตเด•เดฟเตฝ, เดตเดพเดฏเดฟเด•เตเด•เตเดจเตเดจเดคเดพเดฃเต เดจเดฒเตเดฒเดคเต เด“เดซเต. เดชเตเดฐเดฎเดพเดฃเด‚, เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดตเดพเดฏเดฟเด•เตเด•เดพเด‚ เดธเด‚เด—เตเดฐเดนเด‚ เดฑเดทเตเดฏเตป เดญเดพเดทเดฏเดฟเตฝ เดนเดฌเตเดฐเต†เดฏเดฟเตฝ, เดŽเดฒเตเดฒเดพเด‚ เดตเดณเดฐเต† เด•เตƒเดคเตเดฏเดฎเดพเดฏเดฟ เดชเตเดฐเดคเดฟเดซเดฒเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเต :)

  2. เด†เดจเตเดคเดฐเดฟเด• เดชเดฐเดพเดฎเต€เดฑเตเดฑเตผ, เดซเต‹เดธเตเดฑเตเดฑเต เดกเต‹เด•เตเด•เดฟเตฝ เดจเดจเตเดจเดพเดฏเดฟ เดตเดฟเดตเดฐเดฟเดšเตเดšเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเดคเต, เด•เต‹เดกเดฟเตฝ เดตเดฟเดทเดฏเด‚ เดจเต‡เดฐเดฟเดŸเตเดŸเต เด•เต‹เตบเดซเดฟเด—เตผ เดšเต†เดฏเตเดฏเดพเตป เดžเด™เตเด™เดณเต† เด…เดจเตเดตเดฆเดฟเด•เตเด•เตเดจเตเดจเต, เดคเต€เตผเดšเตเดšเดฏเดพเดฏเตเด‚, เด‡เดคเดฟเดจเตผเดคเตเดฅเด‚ เดซเดธเตเดฑเตเดฑเต เดกเต†เดตเดฒเดชเตเดชเตผเดฎเดพเตผ เดจเตฝเด•เตเดจเตเดจ เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเดฑเตเด•เตพ, เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต: เดจเดฟเดฒเดจเดฟเตผเดคเตเดคเตฝ, เดจเดฟเดฒเดจเดฟเตผเดคเตเดคเตฝ เดจเดฏเด‚ (เดธเตเดฅเดฟเดฐเดธเตเดฅเดฟเดคเดฟเดฏเดพเดฏเดฟ เด‡เดฒเตเดฒเดพเดคเดพเด•เตเด•เตเด•, เดŽเดจเตเดจเดพเตฝ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดธเดœเตเดœเดฎเดพเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚ เด’เดคเตเด•เตเด•เดฎเตเดณเตเดณ), เด“เดฐเต‹ เดตเดฟเดทเดฏเดคเตเดคเดฟเดจเตเด‚ เดชเดพเตผเดŸเตเดŸเต€เดทเดจเตเด•เดณเตเดŸเต† เดŽเดฃเตเดฃเด‚ (เดชเดพเตผเดŸเตเดŸเต€เดทเดจเตเด•เตพเดšเต†เดฏเตเดฏเดพเตป, เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เด•เตเดฑเดตเต เด†เด—เต‹เดณ เดชเตเดฐเดพเดงเดพเดจเตเดฏเด‚ เด…เดชเต‡เด•เตเดทเด•เตพ เดตเต‡เด—เดคเตเดคเดฟเตฝ).

  3. เดชเตŠเดคเตเดตเต‡, เดเดœเดจเตเดฑเดฟเดจเต เด†เด—เต‹เดณ เดฎเต‚เดฒเตเดฏเด™เตเด™เดณเตเดณเตเดณ เด’เดฐเต เดจเดฟเดฏเดจเตเดคเตเดฐเดฟเดค เดตเดฟเดทเดฏเด‚ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚, เดŽเดจเตเดจเดฟเดฐเตเดจเตเดจเดพเดฒเตเด‚, เดŽเดฒเตเดฒเดพเด‚ เดตเตเดฏเด•เตเดคเดฎเดพเดฏเดฟ เดชเตเดฐเด–เตเดฏเดพเดชเดฟเด•เตเด•เดพเตป เดžเดพเตป เด†เด—เตเดฐเดนเดฟเด•เตเด•เตเดจเตเดจเต. เด•เต‚เดŸเดพเดคเต†, เดเดœเดจเตเดฑเต เดชเดฐเดธเตเดฏเดคเตเดคเดฟเดฒเต† เดตเดฟเดทเดฏเดคเตเดคเดฟเดจเตเดฑเต† เดšเดฟเดฒ เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเดฑเตเด•เตพ (เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เดชเดพเตผเดŸเตเดŸเต€เดทเดจเตเด•เดณเตเดŸเต† เดŽเดฃเตเดฃเด‚ เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เดจเดฟเดฒเดจเดฟเตผเดคเตเดคเตฝ เดจเดฏเด‚) เด•เต‹เตบเดซเดฟเด—เตผ เดšเต†เดฏเตเดฏเดพเตป เด•เดดเดฟเดฏเดฟเดฒเตเดฒ.

    เดตเดฟเดทเดฏเด‚ เดธเตเดตเดฎเต‡เดงเดฏเดพ เดจเดฟเตผเดตเดšเดฟเด•เตเด•เดพเดคเต† เด‡เดคเต เดŽเด™เตเด™เดจเต†เดฏเดพเดฏเดฟเดฐเดฟเด•เตเด•เตเดฎเต†เดจเตเดจเต เด‡เดคเดพ:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

เดถเดฐเดฟ, เด‡เดชเตเดชเต‹เตพ เดžเด™เตเด™เดณเตเดŸเต† เดเดœเดจเตเดฑเต เดŽเดจเตเดคเตเดšเต†เดฏเตเดฏเตเดฎเต†เดจเตเดจเต เดตเดฟเดตเดฐเดฟเด•เตเด•เดพเด‚ :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

เด…เดคเดฟเดจเดพเตฝ, เดเดœเดจเตเดฑเดฟเดจเตเดฑเต† เดคเตเดŸเด•เตเด•เดคเตเดคเดฟเตฝ, เดžเด™เตเด™เดณเตเดŸเต† เด•เตเดฒเดฏเดจเตเดฑเต เดฎเตเด–เต‡เดจเดฏเตเดณเตเดณ เด…เดญเตเดฏเตผเดคเตเดฅเดจเด•เตพเด•เตเด•เดพเดฏเดฟ เดžเด™เตเด™เตพ เด’เดฐเต aiohttp เดธเต†เดทเตป เดคเตเดฑเด•เตเด•เตเดจเตเดจเต. เด…เด™เตเด™เดจเต†, เด’เดฐเต เดคเตŠเดดเดฟเดฒเดพเดณเดฟ เด†เดฐเด‚เดญเดฟเด•เตเด•เตเดฎเตเดชเต‹เตพ, เดžเด™เตเด™เดณเตเดŸเต† เดเดœเดจเตเดฑเต เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เตเดฎเตเดชเต‹เตพ, เด’เดฐเต เดธเต†เดทเตป เด‰เดŸเดจเดŸเดฟ เดคเตเดฑเด•เตเด•เตเด‚ - เด’เดจเตเดจเต, เดฎเตเดดเตเดตเตป เดธเดฎเดฏเดคเตเดคเตเด‚ เดคเตŠเดดเดฟเดฒเดพเดณเดฟ เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดจเตเดจเต (เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ เดจเดฟเดฐเดตเดงเดฟ, เดจเดฟเด™เตเด™เตพ เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเตผ เดฎเดพเดฑเตเดฑเตเด•เดฏเดพเดฃเต†เด™เตเด•เดฟเตฝ เด•เตบเด•เดฑเตปเดธเดฟ เดธเตเดฅเดฟเดฐเดธเตเดฅเดฟเดคเดฟ เดฏเต‚เดฃเดฟเดฑเตเดฑเตเดณเตเดณ เด’เดฐเต เดเดœเดจเตเดฑเดฟเตฝ เดจเดฟเดจเตเดจเต).

เด…เดŸเตเดคเตเดคเดคเดพเดฏเดฟ, เดžเด™เตเด™เตพ เดธเตเดŸเตเดฐเต€เด‚ เดชเดฟเดจเตเดคเตเดŸเดฐเตเดจเตเดจเต (เดžเด™เตเด™เตพ เดธเดจเตเดฆเต‡เดถเด‚ เดจเตฝเด•เตเดจเตเดจเต _, เดžเด™เตเด™เตพ, เดˆ เดเดœเดจเตเดฑเดฟเตฝ, เดžเด™เตเด™เดณเตเดŸเต† เดตเดฟเดทเดฏเดคเตเดคเดฟเตฝ เดจเดฟเดจเตเดจเตเดณเตเดณ เดธเดจเตเดฆเต‡เดถเด™เตเด™เดณเตเดŸเต†) เด‰เดณเตเดณเดŸเด•เตเด•เดคเตเดคเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต เดถเตเดฐเดฆเตเดงเดฟเด•เตเด•เดพเดคเตเดคเดคเดฟเดจเดพเตฝ, เด…เดต เดจเดฟเดฒเดตเดฟเดฒเต† เด“เดซเตโ€Œเดธเต†เดฑเตเดฑเดฟเตฝ เดจเดฟเดฒเดตเดฟเดฒเตเดฃเตเดŸเต†เด™เตเด•เดฟเตฝ, เด…เดฒเตเดฒเดพเดคเตเดคเดชเด•เตเดทเด‚ เดžเด™เตเด™เดณเตเดŸเต† เดธเตˆเด•เตเด•เดฟเตพ เด…เดตเดฐเตเดŸเต† เดตเดฐเดตเดฟเดจเดพเดฏเดฟ เด•เดพเดคเตเดคเดฟเดฐเดฟเด•เตเด•เตเด‚. เดถเดฐเดฟ, เดžเด™เตเด™เดณเตเดŸเต† เดฒเต‚เดชเตเดชเดฟเดจเตเดณเตเดณเดฟเตฝ, เดžเด™เตเด™เตพ เดธเดจเตเดฆเต‡เดถเดคเตเดคเดฟเดจเตเดฑเต† เดฐเดธเต€เดคเต เดฒเต‹เด—เต เดšเต†เดฏเตเดฏเตเดจเตเดจเต, เดธเดœเต€เดตเดฎเดพเดฏ (get_securities เดฑเดฟเดŸเตเดŸเต‡เดฃเตเด•เตพ เดธเตเดฅเดฟเดฐเดธเตเดฅเดฟเดคเดฟเดฏเดพเดฏเดฟ เดฎเดพเดคเตเดฐเดฎเต‡ เดธเดœเต€เดตเดฎเดพเด•เต‚, เด•เตเดฒเดฏเดจเตเดฑเต เด•เต‹เดกเต เด•เดพเดฃเตเด•) เดธเต†เด•เตเดฏเต‚เดฐเดฟเดฑเตเดฑเดฟเด•เดณเตเดŸเต† เด’เดฐเต เดฒเดฟเดธเตเดฑเตเดฑเต เดจเต‡เดŸเตเด•เดฏเตเด‚ เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเดฟเตฝ เดธเด‚เดฐเด•เตเดทเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต, เด…เดคเต‡ เดŸเดฟเด•เตเด•เดฑเตเดณเตเดณ เดธเตเดฐเด•เตเดทเดฏเตเดฃเตเดŸเต‹ เดŽเดจเตเดจเต เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเดฟเตฝ เดŽเด•เตเดธเตเดšเต‡เดžเตเดšเต , เด‰เดฃเตเดŸเต†เด™เตเด•เดฟเตฝ, เด…เดคเต (เดชเต‡เดชเตเดชเตผ) เดฒเดณเดฟเดคเดฎเดพเดฏเดฟ เด…เดชเตเดกเต‡เดฑเตเดฑเต เดšเต†เดฏเตเดฏเตเด‚.

เดจเดฎเตเด•เตเด•เต เดจเดฎเตเดฎเตเดŸเต† เดธเตƒเดทเตเดŸเดฟ เด†เดฐเด‚เดญเดฟเด•เตเด•เดพเด‚!

> docker-compose up -d
... ะ—ะฐะฟัƒัะบ ะบะพะฝั‚ะตะนะฝะตั€ะพะฒ ...
> faust -A horton.agents worker --without-web -l info

PS เดธเดตเดฟเดถเต‡เดทเดคเด•เตพ เดตเต†เดฌเต เด˜เดŸเด•เด‚ เดฒเต‡เด–เดจเด™เตเด™เดณเดฟเตฝ เดžเดพเตป เดซเต—เดธเตเดฑเตเดฑเต เดชเดฐเดฟเด—เดฃเดฟเด•เตเด•เดฟเดฒเตเดฒ, เด…เดคเดฟเดจเดพเตฝ เดžเด™เตเด™เตพ เด‰เดšเดฟเดคเดฎเดพเดฏ เดชเดคเดพเด• เดธเดœเตเดœเดฎเดพเด•เตเด•เดฟ.

เดžเด™เตเด™เดณเตเดŸเต† เดฒเต‹เดžเตเดšเต เด•เดฎเดพเตปเดกเดฟเตฝ, เด‡เตปเดซเต‹ เดฒเต‹เด—เต เด”เดŸเตเดŸเตโ€ŒเดชเตเดŸเตเดŸเต เดฒเต†เดตเตฝ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เด†เดชเตเดฒเดฟเด•เตเด•เต‡เดทเตป เด’เดฌเตโ€Œเดœเด•เตเดฑเตเดฑเต เดŽเดตเดฟเดŸเต†เดฏเดพเดฃเต เดคเดฟเดฐเดฏเต‡เดฃเตเดŸเดคเต†เดจเตเดจเตเด‚ เด…เดคเต เดŽเดจเตเดคเต เดšเต†เดฏเตเดฏเดฃเด‚ (เด’เดฐเต เดคเตŠเดดเดฟเดฒเดพเดณเดฟเดฏเต† เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เดฃเดฎเต†เดจเตเดจเตเด‚) เดžเด™เตเด™เตพ เดซเดพเดธเตเดฑเตเดฑเดฟเดจเต‹เดŸเต เดชเดฑเดžเตเดžเต. เดžเด™เตเด™เตพเด•เตเด•เต เด‡เดจเดฟเดชเตเดชเดฑเดฏเตเดจเตเดจ เด”เดŸเตเดŸเตเดชเตเดŸเตเดŸเต เดฒเดญเดฟเด•เตเด•เตเดจเตเดจเต:

เดธเตเดชเต‡เดพเดฏเตเดฒเตผ

โ”Œฦ’aยตSโ€  v1.10.4โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ id          โ”‚ horton                                            โ”‚
โ”‚ transport   โ”‚ [URL('kafka://localhost:9092')]                   โ”‚
โ”‚ store       โ”‚ memory:                                           โ”‚
โ”‚ log         โ”‚ -stderr- (info)                                   โ”‚
โ”‚ pid         โ”‚ 1271262                                           โ”‚
โ”‚ hostname    โ”‚ host-name                                         โ”‚
โ”‚ platform    โ”‚ CPython 3.8.2 (Linux x86_64)                      โ”‚
โ”‚ drivers     โ”‚                                                   โ”‚
โ”‚   transport โ”‚ aiokafka=1.1.6                                    โ”‚
โ”‚   web       โ”‚ aiohttp=3.6.2                                     โ”‚
โ”‚ datadir     โ”‚ /path/to/project/horton-data                      โ”‚
โ”‚ appdir      โ”‚ /path/to/project/horton-data/v1                   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
... ะปะพะณะธ, ะปะพะณะธ, ะปะพะณะธ ...

โ”ŒTopic Partition Setโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ topic                      โ”‚ partitions โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ collect_securities         โ”‚ {0-7}      โ”‚
โ”‚ horton-__assignor-__leader โ”‚ {0}        โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ 

เด‡เดคเดฟเดจเต เดœเต€เดตเดจเตเดฃเตเดŸเต!!!

เดชเดพเตผเดŸเตเดŸเต€เดทเตป เดธเต†เดฑเตเดฑเต เดจเต‹เด•เตเด•เดพเด‚. เดจเดฎเตเด•เตเด•เต เด•เดพเดฃเดพเดจเดพเด•เตเดจเตเดจเดคเตเดชเต‹เดฒเต†, เด•เต‹เดกเดฟเตฝ เดžเด™เตเด™เตพ เดจเดฟเดฏเตเด•เตเดคเดฎเดพเด•เตเด•เดฟเดฏ เดชเต‡เดฐเต, เดชเดพเตผเดŸเตเดŸเต€เดทเดจเตเด•เดณเตเดŸเต† เดกเดฟเดซเต‹เตพเดŸเตเดŸเต เดจเดฎเตเดชเตผ (8, เดŽเดŸเตเดคเตเดคเดคเต เดตเดฟเดทเดฏเด‚_เดชเดพเตผเดŸเตเดŸเต€เดทเดจเตเด•เตพ - เด†เดชเตเดฒเดฟเด•เตเด•เต‡เดทเตป เด’เดฌเตโ€Œเดœเด•เตเดฑเตเดฑเต เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเตผ), เดžเด™เตเด™เดณเตเดŸเต† เดตเดฟเดทเดฏเดคเตเดคเดฟเดจเดพเดฏเดฟ เดžเด™เตเด™เตพ เด’เดฐเต เดตเตเดฏเด•เตเดคเดฟเด—เดค เดฎเต‚เดฒเตเดฏเด‚ เดตเตเดฏเด•เตเดคเดฎเดพเด•เตเด•เดฟเดฏเดฟเดŸเตเดŸเดฟเดฒเตเดฒเดพเดคเตเดคเดคเดฟเดจเดพเตฝ (เดชเดพเตผเดŸเตเดŸเต€เดทเดจเตเด•เตพ เดตเดดเดฟ). เดตเตผเด•เตเด•เดฑเดฟเดฒเต† เดฒเต‹เดžเตเดšเต เดšเต†เดฏเตเดค เดเดœเดจเตเดฑเดฟเดจเต เดŽเดฒเตเดฒเดพ 8 เดชเดพเตผเดŸเตเดŸเต€เดทเดจเตเด•เดณเตเด‚ เดจเตฝเด•เดฟเดฏเดฟเดŸเตเดŸเตเดฃเตเดŸเต, เด•เดพเดฐเดฃเด‚ เด‡เดคเต เด’เดฐเต‡เดฏเตŠเดฐเต เดชเดพเตผเดŸเตเดŸเต€เดทเดจเดพเดฃเต, เดŽเดจเตเดจเดพเตฝ เด‡เดคเต เด•เตเดฒเดธเตเดฑเตเดฑเดฑเดฟเด‚เด—เดฟเดจเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเตเดณเตเดณ เดญเดพเด—เดคเตเดคเต เด•เต‚เดŸเตเดคเตฝ เดตเดฟเดถเดฆเดฎเดพเดฏเดฟ เดšเตผเดšเตเดš เดšเต†เดฏเตเดฏเตเด‚.

เดถเดฐเดฟ, เด‡เดชเตเดชเต‹เตพ เดจเดฎเตเด•เตเด•เต เดฎเดฑเตเดฑเตŠเดฐเต เดŸเต†เตผเดฎเดฟเดจเตฝ เดตเดฟเตปเดกเต‹เดฏเดฟเดฒเต‡เด•เตเด•เต เดชเต‹เดฏเดฟ เดžเด™เตเด™เดณเตเดŸเต† เดตเดฟเดทเดฏเดคเตเดคเดฟเดฒเต‡เด•เตเด•เต เด’เดฐเต เดถเต‚เดจเตเดฏเดฎเดพเดฏ เดธเดจเตเดฆเต‡เดถเด‚ เด…เดฏเดฏเตเด•เตเด•เดพเด‚:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

เดชเดฟเดŽเดธเต เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเดคเต @ "collect_securities" เดŽเดจเตเดจ เดชเต‡เดฐเดฟเดฒเตเดณเตเดณ เด’เดฐเต เดตเดฟเดทเดฏเดคเตเดคเดฟเดฒเต‡เด•เตเด•เดพเดฃเต เดžเด™เตเด™เตพ เด’เดฐเต เดธเดจเตเดฆเต‡เดถเด‚ เด…เดฏเดฏเตเด•เตเด•เตเดจเตเดจเดคเต†เดจเตเดจเต เดžเด™เตเด™เตพ เด•เดพเดฃเดฟเด•เตเด•เตเดจเตเดจเต.

เดˆ เดธเดพเดนเดšเดฐเตเดฏเดคเตเดคเดฟเตฝ, เดธเดจเตเดฆเต‡เดถเด‚ เดชเดพเตผเดŸเตเดŸเต€เดทเตป 6-เดฒเต‡เด•เตเด•เต เดชเต‹เดฏเดฟ - kafdrop on-เดฒเต‡เด•เตเด•เต เดชเต‹เดฏเดฟ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด‡เดคเต เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เดพเด‚ localhost:9000

เดžเด™เตเด™เดณเตเดŸเต† เดคเตŠเดดเดฟเดฒเดพเดณเดฟเดฏเตเดฎเดพเดฏเดฟ เดŸเต†เตผเดฎเดฟเดจเตฝ เดตเดฟเตปเดกเต‹เดฏเดฟเดฒเต‡เด•เตเด•เต เดชเต‹เด•เตเดฎเตเดชเต‹เตพ, เดฒเต‹เด—เตเดฐเต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เด…เดฏเดšเตเดš เดธเดจเตเดคเต‹เดทเด•เดฐเดฎเดพเดฏ เดธเดจเตเดฆเต‡เดถเด‚ เดžเด™เตเด™เตพ เด•เดพเดฃเตเด‚:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

เดจเดฎเตเด•เตเด•เต เดฎเต‹เด‚เด—เต‹ (Robo3T เด…เดฒเตเดฒเต†เด™เตเด•เดฟเตฝ Studio3T เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต) เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เดพเดจเตเด‚ เดธเต†เด•เตเดฏเต‚เดฐเดฟเดฑเตเดฑเดฟเด•เตพ เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเดฟเตฝ เด‰เดฃเตเดŸเต†เดจเตเดจเต เด•เดพเดฃเดพเดจเตเด‚ เด•เดดเดฟเดฏเตเด‚:

เดžเดพเตป เด’เดฐเต เดถเดคเด•เต‹เดŸเต€เดถเตเดตเดฐเดจเดฒเตเดฒ, เด…เดคเดฟเดจเดพเตฝ เดžเด™เตเด™เตพ เด†เดฆเตเดฏเด‚ เด•เดพเดฃเดพเดจเตเดณเตเดณ เด“เดชเตเดทเดจเดฟเตฝ เดธเด‚เดคเตƒเดชเตเดคเดฐเดพเดฃเต.

เดซเต—เดธเตเดฑเตเดฑเดฟเดฒเต† เดชเดถเตเดšเดพเดคเตเดคเดฒ เดœเต‹เดฒเดฟเด•เตพ, เดญเดพเด—เด‚ II: เดเดœเดจเตเดฑเตเดฎเดพเดฐเตเด‚ เดŸเต€เดฎเตเด•เดณเตเด‚เดซเต—เดธเตเดฑเตเดฑเดฟเดฒเต† เดชเดถเตเดšเดพเดคเตเดคเดฒ เดœเต‹เดฒเดฟเด•เตพ, เดญเดพเด—เด‚ II: เดเดœเดจเตเดฑเตเดฎเดพเดฐเตเด‚ เดŸเต€เดฎเตเด•เดณเตเด‚

เดธเดจเตเดคเต‹เดทเดตเตเด‚ เดธเดจเตเดคเต‹เดทเดตเตเด‚ - เด†เดฆเตเดฏ เดเดœเดจเตเดฑเต เดคเดฏเตเดฏเดพเดฑเดพเดฃเต :)

เดเดœเดจเตเดฑเต เดคเดฏเตเดฏเดพเดฑเดพเดฃเต, เดชเตเดคเดฟเดฏ เดเดœเดจเตเดฑเดฟเดจเต เดฆเต€เตผเด˜เดพเดฏเตเดธเตเดธเต!

เด…เดคเต†, เดฎเดพเดจเตเดฏเดฐเต‡, เดˆ เดฒเต‡เด–เดจเด‚ เดคเดฏเตเดฏเดพเดฑเดพเด•เตเด•เดฟเดฏ เดชเดพเดคเดฏเตเดŸเต† 1/3 เดฎเดพเดคเตเดฐเดฎเต‡ เดžเด™เตเด™เตพ เด•เดตเตผ เดšเต†เดฏเตเดคเดฟเดŸเตเดŸเตเดณเตเดณเต‚, เดชเด•เตเดทเต‡ เดจเดฟเดฐเตเดคเตเดธเดพเดนเดชเตเดชเต†เดŸเดฐเตเดคเต, เด•เดพเดฐเดฃเด‚ เด‡เดชเตเดชเต‹เตพ เด‡เดคเต เดŽเดณเตเดชเตเดชเดฎเดพเด•เตเด‚.

เด…เดคเดฟเดจเดพเตฝ เด‡เดชเตเดชเต‹เตพ เดจเดฎเตเด•เตเด•เต เดฎเต†เดฑเตเดฑเดพ เดตเดฟเดตเดฐเด™เตเด™เตพ เดถเต‡เด–เดฐเดฟเด•เตเด•เตเด•เดฏเตเด‚ เด’เดฐเต เดถเต‡เด–เดฐเดฃ เดชเตเดฐเดฎเดพเดฃเดคเตเดคเดฟเตฝ เด‡เดŸเตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจ เด’เดฐเต เดเดœเดจเตเดฑเต เด†เดตเดถเตเดฏเดฎเดพเดฃเต:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

เดˆ เดเดœเดจเตเดฑเต เด’เดฐเต เดจเดฟเตผเดฆเตเดฆเดฟเดทเตโ€ŒเดŸ เดธเตเดฐเด•เตเดทเดฏเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเตเดณเตเดณ เดตเดฟเดตเดฐเด™เตเด™เตพ เดชเตเดฐเต‹เดธเดธเตเดธเต เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเดฟเดจเดพเตฝ, เดธเดจเตเดฆเต‡เดถเดคเตเดคเดฟเตฝ เดˆ เดธเตเดฐเด•เตเดทเดฏเตเดŸเต† เดŸเดฟเด•เตเด•เตผ (เดšเดฟเดนเตเดจเด‚) เดžเด™เตเด™เตพ เดธเต‚เดšเดฟเดชเตเดชเดฟเด•เตเด•เต‡เดฃเตเดŸเดคเตเดฃเตเดŸเต. เดˆ เด†เดตเดถเตเดฏเดคเตเดคเดฟเดจเดพเดฏเดฟ เดซเดพเดธเตเดฑเตเดฑเดฟเตฝ เด‰เดฃเตเดŸเต เดฐเต‡เด–เด•เดณเต - เดเดœเดจเตเดฑเต เดตเดฟเดทเดฏเดคเตเดคเดฟเตฝ เดธเดจเตเดฆเต‡เดถ เดชเดฆเตเดงเดคเดฟ เดชเตเดฐเด–เตเดฏเดพเดชเดฟเด•เตเด•เตเดจเตเดจ เด•เตเดฒเดพเดธเตเด•เตพ.

เดˆ เดธเดพเดนเดšเดฐเตเดฏเดคเตเดคเดฟเตฝ, เดจเดฎเตเด•เตเด•เต เดชเต‹เด•เดพเด‚ records.py เดˆ เดตเดฟเดทเดฏเดคเตเดคเดฟเดจเดพเดฏเตเดณเตเดณ เดธเดจเตเดฆเต‡เดถเด‚ เดŽเด™เตเด™เดจเต†เดฏเดพเดฏเดฟเดฐเดฟเด•เตเด•เดฃเดฎเต†เดจเตเดจเต เดตเดฟเดตเดฐเดฟเด•เตเด•เตเด•:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

เดจเดฟเด™เตเด™เตพ เดŠเดนเดฟเดšเตเดšเดคเตเดชเต‹เดฒเต†, เดธเดจเตเดฆเต‡เดถ เดธเตเด•เต€เดฎเดฏเต† เดตเดฟเดตเดฐเดฟเด•เตเด•เดพเตป เดซเต—เดธเตเดฑเตเดฑเต เดชเตˆเดคเตเดคเตบ เดคเดฐเด‚ เดตเตเดฏเดพเด–เตเดฏเดพเดจเด‚ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเต, เด…เดคเดฟเดจเดพเดฒเดพเดฃเต เดฒเตˆเดฌเตเดฐเดฑเดฟ เดชเดฟเดจเตเดคเตเดฃเดฏเตเด•เตเด•เตเดจเตเดจ เดเดฑเตเดฑเดตเตเด‚ เด•เตเดฑเดžเตเดž เดชเดคเดฟเดชเตเดชเต 3.6.

เดจเดฎเตเด•เตเด•เต เดเดœเดจเตเดฑเดฟเดฒเต‡เด•เตเด•เต เดฎเดŸเด™เตเด™เดพเด‚, เดคเดฐเด™เตเด™เตพ เดธเดœเตเดœเดฎเดพเด•เตเด•เดฟ เด…เดคเต เดšเต‡เตผเด•เตเด•เตเด•:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด•เดพเดฃเดพเดจเดพเด•เตเดจเตเดจเดคเตเดชเต‹เดฒเต†, เดžเด™เตเด™เตพ เด’เดฐเต เดธเตเด•เต€เดฎเดฟเดจเตŠเดชเตเดชเด‚ เด’เดฐเต เดชเตเดคเดฟเดฏ เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเตผ เดตเดฟเดทเดฏเด‚ เดธเดฎเดพเดฐเด‚เดญเดฟเด•เตเด•เตเดจเตเดจ เดฐเต€เดคเดฟเดฏเดฟเดฒเต‡เด•เตเด•เต เด•เตˆเดฎเดพเดฑเตเดจเตเดจเต - value_type. เด•เต‚เดŸเดพเดคเต†, เดŽเดฒเตเดฒเดพเด‚ เด’เดฐเต‡ เดธเตเด•เต€เด‚ เดชเดฟเดจเตเดคเตเดŸเดฐเตเดจเตเดจเต, เด…เดคเดฟเดจเดพเตฝ เดฎเดฑเตเดฑเต†เดจเตเดคเดฟเดจเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเตเด‚ เดšเดฟเดจเตเดคเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเตฝ เดžเดพเตป เด’เดฐเต เด…เตผเดคเตเดฅเดตเตเด‚ เด•เดพเดฃเตเดจเตเดจเดฟเดฒเตเดฒ.

เดฎเต†เดฑเตเดฑเดพ เด‡เตปเดซเตผเดฎเต‡เดทเตป เด•เดณเด•เตเดทเตป เดเดœเดจเตเดฑเดฟเดฒเต‡เด•เตเด•เต เด•เดณเด•เตโ€Œเดฑเตเดฑเต_เดธเต†เด•เตเดฏเต‚เดฐเดฟเดฑเตเดฑเดฟเดฑเตเดฑเตเด•เดณเดฟเดฒเต‡เด•เตเด•เต เด’เดฐเต เด•เต‹เตพ เดšเต‡เตผเด•เตเด•เตเด• เดŽเดจเตเดจเดคเดพเดฃเต เด…เดตเดธเดพเดจ เดŸเดšเตเดšเต:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

เดธเดจเตเดฆเต‡เดถเดคเตเดคเดฟเดจเดพเดฏเดฟ เดžเด™เตเด™เตพ เดฎเตเดฎเตเดชเต เดชเตเดฐเด–เตเดฏเดพเดชเดฟเดšเตเดš เดธเตเด•เต€เด‚ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเต. เดˆ เดธเดพเดนเดšเดฐเตเดฏเดคเตเดคเดฟเตฝ, เดเดœเดจเตเดฑเดฟเตฝ เดจเดฟเดจเตเดจเตเดณเตเดณ เดซเดฒเดคเตเดคเดฟเดจเดพเดฏเดฟ เด•เดพเดคเตเดคเดฟเดฐเดฟเด•เตเด•เต‡เดฃเตเดŸเดคเดฟเดฒเตเดฒ เดŽเดจเตเดจเดคเดฟเดจเดพเตฝ เดžเดพเตป .cast เดฐเต€เดคเดฟ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต, เดชเด•เตเดทเต‡ เด…เดคเต เดŽเดŸเตเดคเตเดคเตเดชเดฑเดฏเต‡เดฃเตเดŸเดคเดพเดฃเต. เดตเดดเดฟเด•เตพ เดตเดฟเดทเดฏเดคเตเดคเดฟเดฒเต‡เด•เตเด•เต เด’เดฐเต เดธเดจเตเดฆเต‡เดถเด‚ เด…เดฏเดฏเตเด•เตเด•เตเด•:

  1. เด•เดพเดธเตเดฑเตเดฑเต - เด’เดฐเต เดซเดฒเด‚ เดชเตเดฐเดคเต€เด•เตเดทเดฟเด•เตเด•เดพเดคเตเดคเดคเดฟเดจเดพเตฝ เดคเดŸเดฏเดฟเดฒเตเดฒ. เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดซเดฒเด‚ เดฎเดฑเตเดฑเตŠเดฐเต เดตเดฟเดทเดฏเดคเตเดคเดฟเดฒเต‡เด•เตเด•เต เดธเดจเตเดฆเต‡เดถเดฎเดพเดฏเดฟ เด…เดฏเดฏเตเด•เตเด•เดพเตป เด•เดดเดฟเดฏเดฟเดฒเตเดฒ.

  2. เด…เดฏเดฏเตเด•เตเด•เตเด• - เดคเดŸเดฏเดฟเดฒเตเดฒ, เด•เดพเดฐเดฃเด‚ เด…เดคเต เด’เดฐเต เดซเดฒเด‚ เดชเตเดฐเดคเต€เด•เตเดทเดฟเด•เตเด•เตเดจเตเดจเดฟเดฒเตเดฒ. เดซเดฒเด‚ เดชเต‹เด•เตเดจเตเดจ เดตเดฟเดทเดฏเดคเตเดคเดฟเตฝ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เดเดœเดจเตเดฑเดฟเดจเต† เดตเตเดฏเด•เตเดคเดฎเดพเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚.

  3. เดšเต‹เดฆเดฟเด•เตเด•เตเด• - เด’เดฐเต เดซเดฒเดคเตเดคเดฟเดจเดพเดฏเดฟ เด•เดพเดคเตเดคเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต. เดซเดฒเด‚ เดชเต‹เด•เตเดจเตเดจ เดตเดฟเดทเดฏเดคเตเดคเดฟเตฝ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เดเดœเดจเตเดฑเดฟเดจเต† เดตเตเดฏเด•เตเดคเดฎเดพเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚.

เด…เดคเดฟเดจเดพเตฝ, เด‡เดจเตเดจเดคเตเดคเต† เดเดœเดจเตเดฑเตเดฎเดพเดฐเตเดฎเดพเดฏเดฟ เด…เดคเตเดฐเดฎเดพเดคเตเดฐเด‚!

เดธเตเดตเดชเตเดจ เดŸเต€เด‚

เดˆ เดญเดพเด—เดคเตเดคเต เดŽเดดเตเดคเดพเดฎเต†เดจเตเดจเต เดžเดพเตป เดตเดพเด—เตเดฆเดพเดจเด‚ เดšเต†เดฏเตเดค เด…เดตเดธเดพเดจ เด•เดพเดฐเตเดฏเด‚ เด•เดฎเดพเตปเดกเตเด•เตพ เด†เดฃเต. เดจเต‡เดฐเดคเตเดคเต† เดธเต‚เดšเดฟเดชเตเดชเดฟเดšเตเดšเดคเตเดชเต‹เดฒเต†, เดซเดพเดธเตเดฑเตเดฑเดฟเดฒเต† เด•เดฎเดพเตปเดกเตเด•เตพ เด•เตเดฒเดฟเด•เตเด•เดฟเดจเต เดšเตเดฑเตเดฑเตเดฎเตเดณเตเดณ เด’เดฐเต เดฑเดพเดชเตเดชเดฑเดพเดฃเต. เดตเดพเดธเตเดคเดตเดคเตเดคเดฟเตฝ, -A เด•เต€ เดตเตเดฏเด•เตเดคเดฎเดพเด•เตเด•เตเดฎเตเดชเต‹เตพ, เด…เดคเดฟเดจเตเดฑเต† เด‡เดจเตเดฑเตผเดซเต‡เดธเดฟเดฒเต‡เด•เตเด•เต faust เดจเดฎเตเดฎเตเดŸเต† เด‡เดทเตโ€ŒเดŸเดพเดจเตเดธเตƒเดค เด•เดฎเดพเตปเดกเต เด…เดฑเตเดฑเดพเดšเตเดšเตเดšเต†เดฏเตเดฏเตเดจเตเดจเต

เดชเตเดฐเด–เตเดฏเดพเดชเดฟเดšเตเดš เดเดœเดจเตเดฑเตเดฎเดพเตผเด•เตเด•เต เดถเต‡เดทเด‚ agents.py เด’เดฐเต เดกเต†เด•เตเด•เดฑเต‡เดฑเตเดฑเตผ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เด’เดฐเต เดซเด‚เด—เตเดทเตป เดšเต‡เตผเด•เตเด•เตเด• app.commandเดฐเต€เดคเดฟ เดตเดฟเดณเดฟเด•เตเด•เตเดจเตเดจเต เด•เดพเดธเตเดฑเตเดฑเตเดšเต†เดฏเตเดฏเตเด• ัƒ เดถเต‡เด–เดฐเดฟเด•เตเด•เตเด•_เดธเต†เด•เตเดฏเต‚เดฐเดฟเดฑเตเดฑเดฟเด•เตพ:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

เด…เดคเดฟเดจเดพเตฝ, เดžเด™เตเด™เตพ เด•เดฎเดพเตปเดกเตเด•เดณเตเดŸเต† เดฒเดฟเดธเตเดฑเตเดฑเต เดตเดฟเดณเดฟเด•เตเด•เตเด•เดฏเดพเดฃเต†เด™เตเด•เดฟเตฝ, เดจเดฎเตเดฎเตเดŸเต† เดชเตเดคเดฟเดฏ เด•เดฎเดพเตปเดกเต เด…เดคเดฟเตฝ เด‰เดฃเตเดŸเดพเด•เตเด‚:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

เดฎเดฑเตเดฑเดพเดฐเต†เดฏเตเด‚ เดชเต‹เดฒเต† เดจเดฎเตเด•เตเด•เต เด‡เดคเต เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚, เด…เดคเดฟเดจเดพเตฝ เดจเดฎเตเด•เตเด•เต เดซเดพเดธเตเดฑเตเดฑเต เดตเตผเด•เตเด•เตผ เดชเตเดจเดฐเดพเดฐเด‚เดญเดฟเดšเตเดšเต เดธเต†เด•เตเดฏเต‚เดฐเดฟเดฑเตเดฑเดฟเด•เดณเตเดŸเต† เด’เดฐเต เดชเต‚เตผเดฃเตเดฃ เดถเต‡เด–เดฐเด‚ เด†เดฐเด‚เดญเดฟเด•เตเด•เดพเด‚:

> faust -A horton.agents start-collect-securities

เด…เดŸเตเดคเตเดคเดคเดพเดฏเดฟ เดŽเดจเตเดคเต เดธเด‚เดญเดตเดฟเด•เตเด•เตเด‚?

เด…เดŸเตเดคเตเดค เดญเดพเด—เดคเตเดคเต, เดถเต‡เดทเดฟเด•เตเด•เตเดจเตเดจ เดเดœเดจเตเดฑเตเดฎเดพเดฐเต† เด‰เดฆเดพเดนเดฐเดฃเดฎเดพเดฏเดฟ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต, เดตเตผเดทเดคเตเดคเต‡เด•เตเด•เตเดณเตเดณ เดŸเตเดฐเต‡เดกเดฟเด‚เด—เดฟเดจเตเดฑเต† เด…เดตเดธเดพเดจ เดตเดฟเดฒเดฏเดฟเดฒเตเด‚ เดเดœเดจเตเดฑเตเดฎเดพเดฐเตเดŸเต† เด•เตเดฐเต‹เตบ เดฒเต‹เดžเตเดšเดฟเดฒเตเด‚ เด…เดคเดฟเดฐเตเด•เดŸเดจเตเดจเดคเดฟเดจเดพเดฏเดฟ เดคเดฟเดฐเดฏเตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เดธเดฟเด™เตเด•เต เดฎเต†เด•เตเด•เดพเดจเดฟเดธเด‚ เดžเด™เตเด™เตพ เดชเดฐเดฟเด—เดฃเดฟเด•เตเด•เตเด‚.

เด‡เดจเตเดจเดคเตเดคเต‡เด•เตเด•เต เด…เดคเตเดฐเดฎเดพเดคเตเดฐเด‚! เดตเดพเดฏเดฟเดšเตเดšเดคเดฟเดจเต เดจเดจเตเดฆเดฟ :)

เดˆ เดญเดพเด—เดคเตเดคเดฟเดจเตเดณเตเดณ เด•เต‹เดกเต

เดซเต—เดธเตเดฑเตเดฑเดฟเดฒเต† เดชเดถเตเดšเดพเดคเตเดคเดฒ เดœเต‹เดฒเดฟเด•เตพ, เดญเดพเด—เด‚ II: เดเดœเดจเตเดฑเตเดฎเดพเดฐเตเด‚ เดŸเต€เดฎเตเด•เดณเตเด‚

PS เด…เดตเดธเดพเดจ เดญเดพเด—เดคเตเดคเดฟเดจเต เด•เต€เดดเดฟเตฝ, เดซเต—เดธเตเดฑเตเดฑเตเด‚ เดธเด‚เด—เดฎเดตเตเด‚ เด†เดฏ เด•เดพเดซเตเด•เดฏเต†เด•เตเด•เตเดฑเดฟเดšเตเดšเต เดŽเดจเตเดจเต‹เดŸเต เดšเต‹เดฆเดฟเดšเตเดšเต (เดธเด‚เด—เดฎเดคเตเดคเดฟเดจเต เดŽเดจเตเดคเต เดธเดตเดฟเดถเต‡เดทเดคเด•เตพ เด‰เดฃเตเดŸเต?). เด•เตบโ€Œเดซเตเดฒเต‚เดฏเดจเตเดฑเต เดชเดฒ เดคเดฐเดคเตเดคเดฟเตฝ เด•เต‚เดŸเตเดคเตฝ เดชเตเดฐเดตเตผเดคเตเดคเดจเด•เตเดทเดฎเดฎเดพเดฃเต†เดจเตเดจเต เดคเต‹เดจเตเดจเตเดจเตเดจเต, เดชเด•เตเดทเต‡ เดซเต—เดธเตเดฑเตเดฑเดฟเดจเต เด•เตบโ€Œเดซเตเดฒเต‚เดฏเดจเตเดฑเดฟเดจเต เดชเต‚เตผเดฃเตเดฃเดฎเดพเดฏ เด•เตเดฒเดฏเดจเตเดฑเต เดชเดฟเดจเตเดคเตเดฃเดฏเดฟเดฒเตเดฒ เดŽเดจเตเดจเดคเดพเดฃเต เดตเดธเตเดคเตเดค - เด‡เดคเต เด‡เดจเดฟเดชเตเดชเดฑเดฏเตเดจเตเดจเดคเดฟเตฝ เดจเดฟเดจเตเดจเต เดชเดฟเดจเตเดคเตเดŸเดฐเตเดจเตเดจเต เดชเตเดฐเดฎเดพเดฃเดคเตเดคเดฟเดฒเต† เด•เตเดฒเดฏเดจเตเดฑเต เดจเดฟเดฏเดจเตเดคเตเดฐเดฃเด™เตเด™เดณเตเดŸเต† เดตเดฟเดตเดฐเดฃเด™เตเด™เตพ.

เด…เดตเดฒเด‚เดฌเด‚: www.habr.com

เด’เดฐเต เด…เดญเดฟเดชเตเดฐเดพเดฏเด‚ เดšเต‡เตผเด•เตเด•เตเด•