เจซเฉŒเจธเจŸ, เจญเจพเจ— II 'เจคเฉ‡ เจชเจฟเจ›เฉ‹เจ•เฉœ เจ•เจพเจฐเจœ: เจเจœเฉฐเจŸ เจ…เจคเฉ‡ เจŸเฉ€เจฎเจพเจ‚

เจซเฉŒเจธเจŸ, เจญเจพเจ— II 'เจคเฉ‡ เจชเจฟเจ›เฉ‹เจ•เฉœ เจ•เจพเจฐเจœ: เจเจœเฉฐเจŸ เจ…เจคเฉ‡ เจŸเฉ€เจฎเจพเจ‚

เจตเจฟเจธเจผเจพ-เจธเฉ‚เจšเฉ€

  1. เจญเจพเจ— I: เจœเจพเจฃ-เจชเจ›เจพเจฃ

  2. เจญเจพเจ— II: เจเจœเฉฐเจŸ เจ…เจคเฉ‡ เจŸเฉ€เจฎเจพเจ‚

เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจฅเฉ‡ เจ•เฉ€ เจ•เจฐ เจฐเจนเฉ‡ เจนเจพเจ‚?

เจ‡เจธ เจฒเจˆ, เจ‡เจธ เจฒเจˆ, เจฆเฉ‚เจœเจพ เจญเจพเจ—. เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจชเจนเจฟเจฒเจพเจ‚ เจฒเจฟเจ–เจฟเจ† เจ—เจฟเจ† เจธเฉ€, เจ‡เจธ เจตเจฟเฉฑเจš เจ…เจธเฉ€เจ‚ เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเฉ‡ เจ•เฉฐเจฎ เจ•เจฐเจพเจ‚เจ—เฉ‡:

  1. เจ†เจ‰ เจ…เจธเฉ€เจ‚ เจฒเฉ‹เฉœเฉ€เจ‚เจฆเฉ‡ เจ…เฉฐเจคเจฎ เจฌเจฟเฉฐเจฆเฉ‚เจ†เจ‚ เจฒเจˆ เจฌเฉ‡เจจเจคเฉ€เจ†เจ‚ เจฆเฉ‡ เจจเจพเจฒ aiohttp 'เจคเฉ‡ เจ…เจฒเจซเจพเจตเจพเจ‚เจŸเฉ‡เจœ เจฒเจˆ เจ‡เฉฑเจ• เจ›เฉ‹เจŸเจพ เจ•เจฒเจพเจ‡เฉฐเจŸ เจฒเจฟเจ–เฉ€เจเฅค

  2. เจ†เจ‰ เจ‡เฉฑเจ• เจเจœเฉฐเจŸ เจฌเจฃเจพเจˆเจ เจœเฉ‹ เจชเฉเจฐเจคเฉ€เจญเฉ‚เจคเฉ€เจ†เจ‚ เจฌเจพเจฐเฉ‡ เจกเฉ‡เจŸเจพ เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจ‰เฉฑเจคเฉ‡ เจฎเฉˆเจŸเจพ เจœเจพเจฃเจ•เจพเจฐเฉ€ เจ‡เจ•เฉฑเจ เฉ€ เจ•เจฐเฉ‡เจ—เจพเฅค

เจชเจฐ, เจ‡เจน เจ‰เจน เจนเฉˆ เจœเฉ‹ เจ…เจธเฉ€เจ‚ เจ–เฉเจฆ เจชเฉเจฐเฉ‹เจœเฉˆเจ•เจŸ เจฒเจˆ เจ•เจฐเจพเจ‚เจ—เฉ‡, เจ…เจคเฉ‡ เจซเจพเจธเจŸ เจ–เฉ‹เจœ เจฆเฉ‡ เจฐเฉ‚เจช เจตเจฟเฉฑเจš, เจ…เจธเฉ€เจ‚ เจธเจฟเฉฑเจ–เจพเจ‚เจ—เฉ‡ เจ•เจฟ เจ•เจพเจซเจ•เจพ เจคเฉ‹เจ‚ เจธเจŸเฉเจฐเฉ€เจฎ เจ‡เจตเฉˆเจ‚เจŸเจพเจ‚ เจจเฉ‚เฉฐ เจชเฉเจฐเจ•เจฟเจฐเจฟเจ† เจ•เจฐเจจ เจตเจพเจฒเฉ‡ เจเจœเฉฐเจŸเจพเจ‚ เจจเฉ‚เฉฐ เจ•เจฟเจตเฉ‡เจ‚ เจฒเจฟเจ–เจฃเจพ เจนเฉˆ, เจ…เจคเฉ‡ เจจเจพเจฒ เจนเฉ€ เจธเจพเจกเฉ‡ เจ•เฉ‡เจธ เจตเจฟเฉฑเจš เจ•เจฎเจพเจ‚เจกเจพเจ‚ (เจ•เจฒเจฟเฉฑเจ• เจฐเฉˆเจชเจฐ) เจจเฉ‚เฉฐ เจ•เจฟเจตเฉ‡เจ‚ เจฒเจฟเจ–เจฃเจพ เจนเฉˆ - เจ‰เจธ เจตเจฟเจธเจผเฉ‡ เจฒเจˆ เจฎเฉˆเจจเฉ‚เจ…เจฒ เจชเฉเจธเจผ เจธเฉเจจเฉ‡เจนเจฟเจ†เจ‚ เจฒเจˆ เจœเจฟเจธเจฆเฉ€ เจเจœเฉฐเจŸ เจจเจฟเจ—เจฐเจพเจจเฉ€ เจ•เจฐ เจฐเจฟเจนเจพ เจนเฉˆเฅค

เจธเจฟเจ–เจฒเจพเจˆ

AlphaVantage เจ•เจฒเจพเจ‡เฉฐเจŸ

เจชเจนเจฟเจฒเจพเจ‚, เจ†เจ“ เจ…เจฒเจซเจพเจตเจพเจ‚เจŸเฉ‡เจœ เจฆเฉ€เจ†เจ‚ เจฌเฉ‡เจจเจคเฉ€เจ†เจ‚ เจฒเจˆ เจ‡เฉฑเจ• เจ›เฉ‹เจŸเจพ aiohttp เจ•เจฒเจพเจ‡เฉฐเจŸ เจฒเจฟเจ–เฉ€เจเฅค

alphavantage.py

เจธเจชเฉ‹เจ‡เจฒเจฐ

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

เจ…เจธเจฒ เจตเจฟเฉฑเจš, เจ‡เจธ เจคเฉ‹เจ‚ เจธเจญ เจ•เฉเจ เจธเจชเจธเจผเจŸ เจนเฉˆ:

  1. AlphaVantage API เจจเฉ‚เฉฐ เจ•เจพเจซเจผเฉ€ เจธเจงเจพเจฐเจจ เจ…เจคเฉ‡ เจธเฉเฉฐเจฆเจฐ เจขเฉฐเจ— เจจเจพเจฒ เจคเจฟเจ†เจฐ เจ•เฉ€เจคเจพ เจ—เจฟเจ† เจนเฉˆ, เจ‡เจธเจฒเจˆ เจฎเฉˆเจ‚ เจตเจฟเจงเฉ€ เจฆเฉเจ†เจฐเจพ เจธเจพเจฐเฉ€เจ†เจ‚ เจฌเฉ‡เจจเจคเฉ€เจ†เจ‚ เจ•เจฐเจจ เจฆเจพ เจซเฉˆเจธเจฒเจพ เจ•เฉ€เจคเจพ เจนเฉˆ construct_query เจœเจฟเฉฑเจฅเฉ‡ เจฌเจฆเจฒเฉ‡ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• http เจ•เจพเจฒ เจนเฉเฉฐเจฆเฉ€ เจนเฉˆเฅค

  2. เจฎเฉˆเจ‚ เจธเจพเจฐเฉ‡ เจ–เฉ‡เจค เจฒเจฟเจ†เจ‰เจ‚เจฆเจพ เจนเจพเจ‚ snake_case เจ†เจฐเจพเจฎ เจฒเจˆ.

  3. เจ–เฉˆเจฐ, เจธเฉเฉฐเจฆเจฐ เจ…เจคเฉ‡ เจœเจพเจฃเจ•เจพเจฐเฉ€ เจญเจฐเจชเฉ‚เจฐ เจŸเจฐเฉ‡เจธเจฌเฉˆเจ• เจ†เจ‰เจŸเจชเฉเฉฑเจŸ เจฒเจˆ logger.catch เจธเจœเจพเจตเจŸเฅค

PS เจธเจฅเจพเจจเจ• เจคเฉŒเจฐ 'เจคเฉ‡ config.yml เจตเจฟเฉฑเจš เจ…เจฒเจซเจพเจตเจพเจ‚เจŸเฉ‡เจœ เจŸเฉ‹เจ•เจจ เจจเฉ‚เฉฐ เจœเฉ‹เฉœเจจเจพ เจจเจพ เจญเฉเฉฑเจฒเฉ‹, เจœเจพเจ‚ เจตเจพเจคเจพเจตเจฐเจฃ เจตเฉ‡เจฐเฉ€เจเจฌเจฒ เจจเฉ‚เฉฐ เจจเจฟเจฐเจฏเจพเจค เจ•เจฐเฉ‹เฅค HORTON_SERVICE_APIKEY. เจธเจพเจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจŸเฉ‹เจ•เจจ เจชเฉเจฐเจพเจชเจค เจนเฉเฉฐเจฆเจพ เจนเฉˆ เจ‡เฉฑเจฅเฉ‡.

CRUD เจ•เจฒเจพเจธ

เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจชเฉเจฐเจคเฉ€เจญเฉ‚เจคเฉ€เจ†เจ‚ เจฌเจพเจฐเฉ‡ เจฎเฉˆเจŸเจพ เจœเจพเจฃเจ•เจพเจฐเฉ€ เจธเจŸเฉ‹เจฐ เจ•เจฐเจจ เจฒเจˆ เจชเฉเจฐเจคเฉ€เจญเฉ‚เจคเฉ€เจ†เจ‚ เจฆเจพ เจธเฉฐเจ—เฉเจฐเจนเจฟ เจนเฉ‹เจตเฉ‡เจ—เจพเฅค

database/security.py

เจฎเฉ‡เจฐเฉ€ เจฐเจพเจ เจตเจฟเฉฑเจš, เจ‡เฉฑเจฅเฉ‡ เจ•เฉเจ เจตเฉ€ เจตเจฟเจ†เจ–เจฟเจ† เจ•เจฐเจจ เจฆเฉ€ เจ•เฉ‹เจˆ เจฒเฉ‹เฉœ เจจเจนเฉ€เจ‚ เจนเฉˆ, เจ…เจคเฉ‡ เจฌเฉ‡เจธ เจ•เจฒเจพเจธ เจ†เจชเจฃเฉ‡ เจ†เจช เจตเจฟเฉฑเจš เจ•เจพเจซเจผเฉ€ เจธเจงเจพเจฐเจจ เจนเฉˆ.

get_app()

เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจเจชเจฒเฉ€เจ•เฉ‡เจธเจผเจจ เจ†เจฌเจœเฉˆเจ•เจŸ เจฌเจฃเจพเจ‰เจฃ เจฒเจˆ เจ‡เฉฑเจ• เจซเฉฐเจ•เจธเจผเจจ เจœเฉ‹เฉœเฉ‹ app.py

เจธเจชเฉ‹เจ‡เจฒเจฐ

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

เจนเฉเจฃ เจฒเจˆ เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจธเจญ เจคเฉ‹เจ‚ เจธเจฐเจฒ เจเจชเจฒเฉ€เจ•เฉ‡เจธเจผเจจ เจฌเจฃเจพเจ‰เจฃเจพ เจนเฉ‹เจตเฉ‡เจ—เจพ, เจฅเฉ‹เฉœเฉ€ เจฆเฉ‡เจฐ เจฌเจพเจ…เจฆ เจ…เจธเฉ€เจ‚ เจ‡เจธเจฆเจพ เจตเจฟเจธเจคเจพเจฐ เจ•เจฐเจพเจ‚เจ—เฉ‡, เจนเจพเจฒเจพเจ‚เจ•เจฟ, เจคเฉเจนเจพเจจเฉ‚เฉฐ เจ‰เจกเฉ€เจ• เจจเจพ เจ•เจฐเจจ เจฒเจˆ, เจ‡เฉฑเจฅเฉ‡ เจนเจตเจพเจฒเฉ‡ เจเจช-เจ•เจฒเจพเจธ เจฒเจˆเฅค เจฎเฉˆเจ‚ เจคเฉเจนเจพเจจเฉ‚เฉฐ เจธเฉˆเจŸเจฟเฉฐเจ—เจœเจผ เจ•เจฒเจพเจธ 'เจคเฉ‡ เจ‡เฉฑเจ• เจจเจœเจผเจฐ เจฎเจพเจฐเจจ เจฆเฉ€ เจตเฉ€ เจธเจฒเจพเจน เจฆเจฟเฉฐเจฆเจพ เจนเจพเจ‚, เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ‡เจน เจœเจผเจฟเจ†เจฆเจพเจคเจฐ เจธเฉˆเจŸเจฟเฉฐเจ—เจพเจ‚ เจฒเจˆ เจœเจผเจฟเฉฐเจฎเฉ‡เจตเจพเจฐ เจนเฉˆเฅค

เจฎเฉเฉฑเจ– เจนเจฟเฉฑเจธเจพ

เจชเฉเจฐเจคเฉ€เจญเฉ‚เจคเฉ€เจ†เจ‚ เจฆเฉ€ เจธเฉ‚เจšเฉ€ เจ‡เจ•เฉฑเจ เฉ€ เจ•เจฐเจจ เจ…เจคเฉ‡ เจธเฉฐเจญเจพเจฒเจฃ เจฒเจˆ เจเจœเฉฐเจŸ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

เจ‡เจธ เจฒเจˆ, เจชเจนเจฟเจฒเจพเจ‚ เจ…เจธเฉ€เจ‚ เจซเจพเจธเจŸ เจเจชเจฒเฉ€เจ•เฉ‡เจธเจผเจจ เจ†เจฌเจœเฉˆเจ•เจŸ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ - เจ‡เจน เจ•เจพเจซเจผเฉ€ เจธเจงเจพเจฐเจจ เจนเฉˆเฅค เจ…เฉฑเจ—เฉ‡, เจ…เจธเฉ€เจ‚ เจธเจชเฉฑเจธเจผเจŸ เจคเฉŒเจฐ 'เจคเฉ‡ เจธเจพเจกเฉ‡ เจเจœเฉฐเจŸ เจฒเจˆ เจ‡เฉฑเจ• เจตเจฟเจธเจผเจพ เจ˜เฉ‹เจธเจผเจฟเจค เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚... เจ‡เฉฑเจฅเฉ‡ เจ‡เจน เจตเจฐเจฃเจจ เจฏเฉ‹เจ— เจนเฉˆ เจ•เจฟ เจ‡เจน เจ•เฉ€ เจนเฉˆ, เจ…เฉฐเจฆเจฐเฉ‚เจจเฉ€ เจฎเจพเจชเจฆเฉฐเจก เจ•เฉ€ เจนเฉˆ เจ…เจคเฉ‡ เจ‡เจธเจจเฉ‚เฉฐ เจตเฉฑเจ–เจฐเฉ‡ เจขเฉฐเจ— เจจเจพเจฒ เจ•เจฟเจตเฉ‡เจ‚ เจตเจฟเจตเจธเจฅเจฟเจค เจ•เฉ€เจคเจพ เจœเจพ เจธเจ•เจฆเจพ เจนเฉˆเฅค

  1. เจ•เจพเจซเจ•เจพ เจตเจฟเฉฑเจš เจตเจฟเจธเจผเฉ‡, เจœเฉ‡เจ•เจฐ เจ…เจธเฉ€เจ‚ เจธเจนเฉ€ เจชเจฐเจฟเจญเจพเจธเจผเจพ เจœเจพเจฃเจจเจพ เจšเจพเจนเฉเฉฐเจฆเฉ‡ เจนเจพเจ‚, เจคเจพเจ‚ เจชเฉœเฉเจนเจจเจพ เจฌเจฟเจนเจคเจฐ เจนเฉˆ เจฌเฉฐเจฆ เจฆเจธเจคเจพเจตเฉ‡เจœเจผ, เจœเจพเจ‚ เจคเฉเจธเฉ€เจ‚ เจชเฉœเฉเจน เจธเจ•เจฆเฉ‡ เจนเฉ‹ เจธเฉฐเจ—เฉเจฐเจนเจฟ เจฐเฉ‚เจธเฉ€ เจตเจฟเฉฑเจš เจนเฉˆเจฌเจฐเฉ‡ 'เจคเฉ‡, เจœเจฟเฉฑเจฅเฉ‡ เจนเจฐ เจšเฉ€เจœเจผ เจฌเจฟเจฒเจ•เฉเจฒ เจธเจนเฉ€ เจฐเฉ‚เจช เจตเจฟเฉฑเจš เจชเฉเจฐเจคเฉ€เจฌเจฟเฉฐเจฌเจฟเจค เจนเฉเฉฐเจฆเฉ€ เจนเฉˆ :)

  2. เจ…เฉฐเจฆเจฐเฉ‚เจจเฉ€ เจชเฉˆเจฐเจพเจฎเฉ€เจŸเจฐ, faust doc เจตเจฟเฉฑเจš เจฌเจนเฉเจค เจตเจงเฉ€เจ† เจขเฉฐเจ— เจจเจพเจฒ เจตเจฐเจฃเจจ เจ•เฉ€เจคเจพ เจ—เจฟเจ† เจนเฉˆ, เจธเจพเจจเฉ‚เฉฐ เจ•เฉ‹เจก เจตเจฟเฉฑเจš เจธเจฟเฉฑเจงเฉ‡ เจตเจฟเจธเจผเฉ‡ เจจเฉ‚เฉฐ เจ•เฉŒเจ‚เจซเจฟเจ—เจฐ เจ•เจฐเจจ เจฆเฉ€ เจ‡เจœเจพเจœเจผเจค เจฆเจฟเฉฐเจฆเจพ เจนเฉˆ, เจฌเฉ‡เจธเจผเจ•, เจ‡เจธเจฆเจพ เจฎเจคเจฒเจฌ เจนเฉˆ เจซเฉŒเจธเจŸ เจกเจฟเจตเฉˆเจฒเจชเจฐเจพเจ‚ เจฆเฉเจ†เจฐเจพ เจชเฉเจฐเจฆเจพเจจ เจ•เฉ€เจคเฉ‡ เจ—เจ เจฎเจพเจชเจฆเฉฐเจก, เจ‰เจฆเจพเจนเจฐเจจ เจฒเจˆ: เจงเจพเจฐเจจ, เจงเจพเจฐเจจ เจจเฉ€เจคเฉ€ (เจกเจฟเจซเฉŒเจฒเจŸ เจฎเจฟเจŸเจพเจ“, เจชเจฐ เจคเฉเจธเฉ€เจ‚ เจธเฉˆเฉฑเจŸ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹ เจธเฉฐเจ–เฉ‡เจชเจชเฉเจฐเจคเฉ€ เจตเจฟเจธเจผเจพ เจญเจพเจ—เจพเจ‚ เจฆเฉ€ เจ—เจฟเจฃเจคเฉ€ (เจญเจพเจ—เจ•เจฐเจจ เจฒเจˆ, เจ‰เจฆเจพเจนเจฐเจจ เจฒเจˆ, เจ˜เฉฑเจŸ เจ—เจฒเฉ‹เจฌเจฒ เจฎเจนเฉฑเจคเจคเจพ เจเจชเจฒเฉ€เจ•เฉ‡เจธเจผเจจ faust).

  3. เจ†เจฎ เจคเฉŒเจฐ 'เจคเฉ‡, เจเจœเฉฐเจŸ เจ—เจฒเฉ‹เจฌเจฒ เจฎเฉเฉฑเจฒเจพเจ‚ เจฆเฉ‡ เจจเจพเจฒ เจ‡เฉฑเจ• เจชเฉเจฐเจฌเฉฐเจงเจฟเจค เจตเจฟเจธเจผเจพ เจฌเจฃเจพ เจธเจ•เจฆเจพ เจนเฉˆ, เจนเจพเจฒเจพเจ‚เจ•เจฟ, เจฎเฉˆเจ‚ เจนเจฐ เจšเฉ€เจœเจผ เจจเฉ‚เฉฐ เจธเจชเฉฑเจธเจผเจŸ เจฐเฉ‚เจช เจตเจฟเฉฑเจš เจ˜เฉ‹เจธเจผเจฟเจค เจ•เจฐเจจเจพ เจชเจธเฉฐเจฆ เจ•เจฐเจฆเจพ เจนเจพเจ‚เฅค เจ‡เจธ เจคเฉ‹เจ‚ เจ‡เจฒเจพเจตเจพ, เจเจœเฉฐเจŸ เจ‡เจธเจผเจคเจฟเจนเจพเจฐ เจตเจฟเฉฑเจš เจตเจฟเจธเจผเฉ‡ เจฆเฉ‡ เจ•เฉเจ เจฎเจพเจชเจฆเฉฐเจก (เจ‰เจฆเจพเจนเจฐเจจ เจฒเจˆ, เจญเจพเจ—เจพเจ‚ เจฆเฉ€ เจ—เจฟเจฃเจคเฉ€ เจœเจพเจ‚ เจงเจพเจฐเจจ เจจเฉ€เจคเฉ€) เจจเฉ‚เฉฐ เจ•เฉŒเจ‚เจซเจฟเจ—เจฐ เจจเจนเฉ€เจ‚ เจ•เฉ€เจคเจพ เจœเจพ เจธเจ•เจฆเจพ เจนเฉˆเฅค

    เจตเจฟเจธเจผเฉ‡ เจจเฉ‚เฉฐ เจนเฉฑเจฅเฉ€เจ‚ เจชเจฐเจฟเจญเจพเจธเจผเจฟเจค เจ•เฉ€เจคเฉ‡ เจฌเจฟเจจเจพเจ‚ เจ‡เจน เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚ เจฆเจพ เจฆเจฟเจ–เจพเจˆ เจฆเฉ‡ เจธเจ•เจฆเจพ เจนเฉˆ:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

เจ–เฉˆเจฐ, เจ†เจ“ เจนเฉเจฃ เจตเจฐเจฃเจจ เจ•เจฐเฉ€เจ เจ•เจฟ เจธเจพเจกเจพ เจเจœเฉฐเจŸ เจ•เฉ€ เจ•เจฐเฉ‡เจ—เจพ :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

เจ‡เจธ เจฒเจˆ, เจเจœเฉฐเจŸ เจฆเฉ€ เจธเจผเฉเจฐเฉ‚เจ†เจค เจตเจฟเฉฑเจš, เจ…เจธเฉ€เจ‚ เจ†เจชเจฃเฉ‡ เจ•เจฒเจพเจ‡เฉฐเจŸ เจฆเฉเจ†เจฐเจพ เจฌเฉ‡เจจเจคเฉ€เจ†เจ‚ เจฒเจˆ เจ‡เฉฑเจ• aiohttp เจธเฉˆเจธเจผเจจ เจ–เฉ‹เจฒเฉเจนเจฆเฉ‡ เจนเจพเจ‚เฅค เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚, เจ‡เฉฑเจ• เจ•เจฐเจฎเจšเจพเจฐเฉ€ เจจเฉ‚เฉฐ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเจจ เจตเฉ‡เจฒเฉ‡, เจœเจฆเฉ‹เจ‚ เจธเจพเจกเจพ เจเจœเฉฐเจŸ เจฒเจพเจ‚เจš เจ•เฉ€เจคเจพ เจœเจพเจ‚เจฆเจพ เจนเฉˆ, เจ‡เฉฑเจ• เจธเฉˆเจธเจผเจจ เจคเฉเจฐเฉฐเจค เจ–เฉ‹เจฒเฉเจนเจฟเจ† เจœเจพเจตเฉ‡เจ—เจพ - เจ‡เฉฑเจ•, เจ•เจฐเจฎเจšเจพเจฐเฉ€ เจฆเฉ‡ เจšเฉฑเจฒ เจฐเจนเฉ‡ เจชเฉ‚เจฐเฉ‡ เจธเจฎเฉ‡เจ‚ เจฒเจˆ (เจœเจพเจ‚ เจ•เจˆ, เจœเฉ‡เจ•เจฐ เจคเฉเจธเฉ€เจ‚ เจชเฉˆเจฐเจพเจฎเฉ€เจŸเจฐ เจฌเจฆเจฒเจฆเฉ‡ เจนเฉ‹ เจธเจนเจฟเจฎเจคเฉ€ เจกเจฟเจซเฉŒเจฒเจŸ เจฏเฉ‚เจจเจฟเจŸ เจตเจพเจฒเฉ‡ เจเจœเฉฐเจŸ เจคเฉ‹เจ‚)เฅค

เจ…เฉฑเจ—เฉ‡, เจ…เจธเฉ€เจ‚ เจธเจŸเฉเจฐเฉ€เจฎ เจฆเฉ€ เจชเจพเจฒเจฃเจพ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ (เจ…เจธเฉ€เจ‚ เจธเฉฐเจฆเฉ‡เจธเจผ เจจเฉ‚เฉฐ เจ…เฉฐเจฆเจฐ เจฐเฉฑเจ–เจฆเฉ‡ เจนเจพเจ‚ _, เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ…เจธเฉ€เจ‚, เจ‡เจธ เจเจœเฉฐเจŸ เจตเจฟเฉฑเจš, เจธเจพเจกเฉ‡ เจตเจฟเจธเจผเฉ‡ เจคเฉ‹เจ‚ เจธเฉฐเจฆเฉ‡เจธเจผเจพเจ‚ เจฆเฉ€ เจธเจฎเฉฑเจ—เจฐเฉ€ เจฆเฉ€ เจชเจฐเจตเจพเจน เจจเจนเฉ€เจ‚ เจ•เจฐเจฆเฉ‡, เจœเฉ‡เจ•เจฐ เจ‰เจน เจฎเฉŒเจœเฉ‚เจฆเจพ เจ†เจซเจธเฉˆเฉฑเจŸ 'เจคเฉ‡ เจฎเฉŒเจœเฉ‚เจฆ เจนเจจ, เจจเจนเฉ€เจ‚ เจคเจพเจ‚ เจธเจพเจกเจพ เจšเฉฑเจ•เจฐ เจ‰เจนเจจเจพเจ‚ เจฆเฉ‡ เจ†เจ‰เจฃ เจฆเฉ€ เจ‰เจกเฉ€เจ• เจ•เจฐเฉ‡เจ—เจพเฅค เจ–เฉˆเจฐ, เจธเจพเจกเฉ‡ เจฒเฉ‚เจช เจฆเฉ‡ เจ…เฉฐเจฆเจฐ, เจ…เจธเฉ€เจ‚ เจธเฉเจจเฉ‡เจนเฉ‡ เจฆเฉ€ เจฐเจธเฉ€เจฆ เจจเฉ‚เฉฐ เจฒเฉŒเจ— เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚, เจ•เจฟเจฐเจฟเจ†เจธเจผเฉ€เจฒ เจฆเฉ€ เจธเฉ‚เจšเฉ€ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ (get_securities เจธเจฟเจฐเจซเจผ เจกเจฟเจซเฉŒเจฒเจŸ เจคเฉŒเจฐ 'เจคเฉ‡ เจธเจฐเจ—เจฐเจฎ เจนเฉˆ, เจ•เจฒเจพเจ‡เฉฐเจŸ เจ•เฉ‹เจก เจฆเฉ‡เจ–เฉ‹) เจชเฉเจฐเจคเฉ€เจญเฉ‚เจคเฉ€เจ†เจ‚ เจ…เจคเฉ‡ เจ‡เจธเจจเฉ‚เฉฐ เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจตเจฟเฉฑเจš เจธเฉเจฐเฉฑเจ–เจฟเจ…เจค เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚, เจ‡เจน เจœเจพเจ‚เจš เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ เจ•เจฟ เจ•เฉ€ เจ‰เจธเฉ‡ เจŸเจฟเจ•เจฐ เจจเจพเจฒ เจ•เฉ‹เจˆ เจธเฉเจฐเฉฑเจ–เจฟเจ† เจนเฉˆ เจ…เจคเฉ‡ เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจตเจฟเฉฑเจš เจเจ•เจธเจšเฉ‡เจ‚เจœ, เจœเฉ‡เจ•เจฐ เจ‰เฉฑเจฅเฉ‡ เจนเฉˆ, เจคเจพเจ‚ เจ‡เจน (เจ•เจพเจ—เจœเจผ) เจฌเจธ เจ…เฉฑเจชเจกเฉ‡เจŸ เจ•เฉ€เจคเจพ เจœเจพเจตเฉ‡เจ—เจพเฅค

เจ†เจ“ เจ†เจชเจฃเฉ€ เจฐเจšเจจเจพ เจจเฉ‚เฉฐ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเฉ€เจ!

> docker-compose up -d
... ะ—ะฐะฟัƒัะบ ะบะพะฝั‚ะตะนะฝะตั€ะพะฒ ...
> faust -A horton.agents worker --without-web -l info

PS เจตเจฟเจธเจผเฉ‡เจธเจผเจคเจพเจตเจพเจ‚ เจตเฉˆเฉฑเจฌ เจญเจพเจ— เจฎเฉˆเจ‚ เจฒเฉ‡เจ–เจพเจ‚ เจตเจฟเฉฑเจš เจซเจพเจธเจŸ 'เจคเฉ‡ เจตเจฟเจšเจพเจฐ เจจเจนเฉ€เจ‚ เจ•เจฐเจพเจ‚เจ—เจพ, เจ‡เจธ เจฒเจˆ เจ…เจธเฉ€เจ‚ เจ‰เจšเจฟเจค เจซเจฒเฉˆเจ— เจธเฉˆเฉฑเจŸ เจ•เฉ€เจคเจพ เจนเฉˆเฅค

เจธเจพเจกเฉ€ เจฒเจพเจ‚เจš เจ•เจฎเจพเจ‚เจก เจตเจฟเฉฑเจš, เจ…เจธเฉ€เจ‚ เจœเจพเจฃเจ•เจพเจฐเฉ€ เจฒเฉŒเจ— เจ†เจ‰เจŸเจชเฉเฉฑเจŸ เจชเฉฑเจงเจฐ เจฆเฉ‡ เจจเจพเจฒ เจเจชเจฒเฉ€เจ•เฉ‡เจธเจผเจจ เจ†เจฌเจœเฉˆเจ•เจŸ เจจเฉ‚เฉฐ เจ•เจฟเฉฑเจฅเฉ‡ เจฒเฉฑเจญเจฃเจพ เจนเฉˆ เจ…เจคเฉ‡ เจ‡เจธ เจจเจพเจฒ เจ•เฉ€ เจ•เจฐเจจเจพ เจนเฉˆ (เจ‡เฉฑเจ• เจตเจฐเจ•เจฐ เจจเฉ‚เฉฐ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเจจเจพ เจนเฉˆ) เจฌเจพเจฐเฉ‡ เจฆเฉฑเจธเจฟเจ† เจนเฉˆเฅค เจธเจพเจจเฉ‚เฉฐ เจนเฉ‡เจ  เจฆเจฟเฉฑเจคเฉ€ เจ†เจ‰เจŸเจชเฉเฉฑเจŸ เจฎเจฟเจฒเจฆเฉ€ เจนเฉˆ:

เจธเจชเฉ‹เจ‡เจฒเจฐ

โ”Œฦ’aยตSโ€  v1.10.4โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ id          โ”‚ horton                                            โ”‚
โ”‚ transport   โ”‚ [URL('kafka://localhost:9092')]                   โ”‚
โ”‚ store       โ”‚ memory:                                           โ”‚
โ”‚ log         โ”‚ -stderr- (info)                                   โ”‚
โ”‚ pid         โ”‚ 1271262                                           โ”‚
โ”‚ hostname    โ”‚ host-name                                         โ”‚
โ”‚ platform    โ”‚ CPython 3.8.2 (Linux x86_64)                      โ”‚
โ”‚ drivers     โ”‚                                                   โ”‚
โ”‚   transport โ”‚ aiokafka=1.1.6                                    โ”‚
โ”‚   web       โ”‚ aiohttp=3.6.2                                     โ”‚
โ”‚ datadir     โ”‚ /path/to/project/horton-data                      โ”‚
โ”‚ appdir      โ”‚ /path/to/project/horton-data/v1                   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
... ะปะพะณะธ, ะปะพะณะธ, ะปะพะณะธ ...

โ”ŒTopic Partition Setโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ topic                      โ”‚ partitions โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ collect_securities         โ”‚ {0-7}      โ”‚
โ”‚ horton-__assignor-__leader โ”‚ {0}        โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ 

เจ‡เจน เจœเจฟเฉฐเจฆเจพ เจนเฉˆ !!!

เจ†เจ“ เจชเจพเจฐเจŸเฉ€เจธเจผเจจ เจธเฉˆเฉฑเจŸ เจจเฉ‚เฉฐ เจตเฉ‡เจ–เฉ€เจเฅค เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจ…เจธเฉ€เจ‚ เจตเฉ‡เจ– เจธเจ•เจฆเฉ‡ เจนเจพเจ‚, เจ‡เฉฑเจ• เจตเจฟเจธเจผเจพ เจ‰เจธ เจจเจพเจฎ เจจเจพเจฒ เจฌเจฃเจพเจ‡เจ† เจ—เจฟเจ† เจธเฉ€ เจœเฉ‹ เจ…เจธเฉ€เจ‚ เจ•เฉ‹เจก เจตเจฟเฉฑเจš เจจเจฟเจฐเจงเจพเจฐเจค เจ•เฉ€เจคเจพ เจนเฉˆ, เจญเจพเจ—เจพเจ‚ เจฆเฉ€ เจกเจฟเจซเฉŒเจฒเจŸ เจธเฉฐเจ–เจฟเจ† (8, เจ‡เจธ เจคเฉ‹เจ‚ เจฒเจˆ เจ—เจˆ เจนเฉˆเฅค topic_partitions - เจเจชเจฒเฉ€เจ•เฉ‡เจธเจผเจจ เจ†เจฌเจœเฉˆเจ•เจŸ เจชเฉˆเจฐเจพเจฎเฉ€เจŸเจฐ), เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ…เจธเฉ€เจ‚ เจ†เจชเจฃเฉ‡ เจตเจฟเจธเจผเฉ‡ (เจญเจพเจ— เจฐเจพเจนเฉ€เจ‚) เจฒเจˆ เจ‡เฉฑเจ• เจตเจฟเจ…เจ•เจคเฉ€เจ—เจค เจฎเฉเฉฑเจฒ เจจเจฟเจฐเจงเจพเจฐเจค เจจเจนเฉ€เจ‚ เจ•เฉ€เจคเจพ เจนเฉˆเฅค เจตเจฐเจ•เจฐ เจตเจฟเฉฑเจš เจฒเจพเจ‚เจš เจ•เฉ€เจคเฉ‡ เจเจœเฉฐเจŸ เจจเฉ‚เฉฐ เจธเจพเจฐเฉ‡ 8 เจญเจพเจ— เจฆเจฟเฉฑเจคเฉ‡ เจ—เจ เจนเจจ, เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ‡เจน เจธเจฟเจฐเจซ เจ‡เฉฑเจ• เจนเฉˆ, เจชเจฐ เจ‡เจธ เจจเฉ‚เฉฐ เจ•เจฒเฉฑเจธเจŸเจฐเจฟเฉฐเจ— เจฌเจพเจฐเฉ‡ เจญเจพเจ— เจตเจฟเฉฑเจš เจตเจงเฉ‡เจฐเฉ‡ เจตเจฟเจธเจฅเจพเจฐ เจตเจฟเฉฑเจš เจตเจฟเจšเจพเจฐเจฟเจ† เจœเจพเจตเฉ‡เจ—เจพเฅค

เจ–เฉˆเจฐ, เจนเฉเจฃ เจ…เจธเฉ€เจ‚ เจ•เจฟเจธเฉ‡ เจนเฉ‹เจฐ เจŸเจฐเจฎเฉ€เจจเจฒ เจตเจฟเฉฐเจกเฉ‹ 'เจคเฉ‡ เจœเจพ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ เจ…เจคเฉ‡ เจธเจพเจกเฉ‡ เจตเจฟเจธเจผเฉ‡ 'เจคเฉ‡ เจ‡เฉฑเจ• เจ–เจพเจฒเฉ€ เจธเฉเจจเฉ‡เจนเจพ เจญเฉ‡เจœ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเฉ‡ เจนเฉ‹เจ เจชเฉ€.เจเจธ @ เจ…เจธเฉ€เจ‚ เจฆเจฟเจ–เจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚ เจ•เจฟ เจ…เจธเฉ€เจ‚ โ€œcollect_securitiesโ€ เจจเจพเจฎเจ• เจตเจฟเจธเจผเฉ‡ เจจเฉ‚เฉฐ เจธเฉเจจเฉ‡เจนเจพ เจญเฉ‡เจœ เจฐเจนเฉ‡ เจนเจพเจ‚เฅค

เจ‡เจธ เจ•เฉ‡เจธ เจตเจฟเฉฑเจš, เจฎเฉˆเจธเฉ‡เจœ เจชเจพเจฐเจŸเฉ€เจธเจผเจจ 6 เจตเจฟเฉฑเจš เจ—เจฟเจ† - เจคเฉเจธเฉ€เจ‚ kafdrop on เจคเฉ‡ เจœเจพ เจ•เฉ‡ เจ‡เจธเจฆเฉ€ เจœเจพเจ‚เจš เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹ localhost:9000

เจ†เจชเจฃเฉ‡ เจตเจฐเจ•เจฐ เจฆเฉ‡ เจจเจพเจฒ เจŸเจฐเจฎเฉ€เจจเจฒ เจตเจฟเฉฐเจกเฉ‹ 'เจคเฉ‡ เจœเจพ เจ•เฉ‡, เจ…เจธเฉ€เจ‚ เจฒเฉ‹เจ—เฉเจฐเฉ‚ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ เจญเฉ‡เจœเฉ‡ เจ—เจ เจ‡เฉฑเจ• เจ–เฉเจธเจผเฉ€ เจฆเจพ เจธเฉฐเจฆเฉ‡เจธเจผ เจฆเฉ‡เจ–เจพเจ‚เจ—เฉ‡:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

เจ…เจธเฉ€เจ‚ เจฎเฉ‹เจ‚เจ—เฉ‹ (Robo3T เจœเจพเจ‚ Studio3T เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเฉ‡ เจนเฉ‹เจ) เจตเจฟเฉฑเจš เจตเฉ€ เจฆเฉ‡เจ– เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ เจ…เจคเฉ‡ เจฆเฉ‡เจ– เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ เจ•เจฟ เจชเฉเจฐเจคเฉ€เจญเฉ‚เจคเฉ€เจ†เจ‚ เจกเฉ‡เจŸเจพเจฌเฉ‡เจธ เจตเจฟเฉฑเจš เจนเจจ:

เจฎเฉˆเจ‚ เจ…เจฐเจฌเจชเจคเฉ€ เจจเจนเฉ€เจ‚ เจนเจพเจ‚, เจ…เจคเฉ‡ เจ‡เจธเจฒเจˆ เจ…เจธเฉ€เจ‚ เจชเจนเจฟเจฒเฉ‡ เจฆเฉ‡เจ–เจฃ เจฆเฉ‡ เจตเจฟเจ•เจฒเจช เจจเจพเจฒ เจธเฉฐเจคเฉเจธเจผเจŸ เจนเจพเจ‚เฅค

เจซเฉŒเจธเจŸ, เจญเจพเจ— II 'เจคเฉ‡ เจชเจฟเจ›เฉ‹เจ•เฉœ เจ•เจพเจฐเจœ: เจเจœเฉฐเจŸ เจ…เจคเฉ‡ เจŸเฉ€เจฎเจพเจ‚เจซเฉŒเจธเจŸ, เจญเจพเจ— II 'เจคเฉ‡ เจชเจฟเจ›เฉ‹เจ•เฉœ เจ•เจพเจฐเจœ: เจเจœเฉฐเจŸ เจ…เจคเฉ‡ เจŸเฉ€เจฎเจพเจ‚

เจ–เฉเจธเจผเฉ€ เจ…เจคเฉ‡ เจ–เฉเจธเจผเฉ€ - เจชเจนเจฟเจฒเจพ เจเจœเฉฐเจŸ เจคเจฟเจ†เจฐ เจนเฉˆ :)

เจเจœเฉฐเจŸ เจคเจฟเจ†เจฐ, เจจเจตเจพเจ‚ เจเจœเฉฐเจŸ เจœเจฟเฉฐเจฆเจพ เจฐเจนเฉ‡!

เจนเจพเจ‚, เจธเฉฑเจœเจฃเฉ‹, เจ…เจธเฉ€เจ‚ เจ‡เจธ เจฒเฉ‡เจ– เจฆเฉเจ†เจฐเจพ เจคเจฟเจ†เจฐ เจ•เฉ€เจคเฉ‡ เจฎเจพเจฐเจ— เจฆเจพ เจธเจฟเจฐเจซ 1/3 เจนเจฟเฉฑเจธเจพ เจ•เจตเจฐ เจ•เฉ€เจคเจพ เจนเฉˆ, เจชเจฐ เจจเจฟเจฐเจพเจธเจผ เจจเจพ เจนเฉ‹เจตเฉ‹, เจ•เจฟเจ‰เจ‚เจ•เจฟ เจนเฉเจฃ เจ‡เจน เจธเฉŒเจ–เจพ เจนเฉ‹ เจœเจพเจตเฉ‡เจ—เจพเฅค

เจ‡เจธ เจฒเจˆ เจนเฉเจฃ เจธเจพเจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจเจœเฉฐเจŸ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆ เจœเฉ‹ เจฎเฉˆเจŸเจพ เจœเจพเจฃเจ•เจพเจฐเฉ€ เจ‡เจ•เฉฑเจ เฉ€ เจ•เจฐเจฆเจพ เจนเฉˆ เจ…เจคเฉ‡ เจ‡เจธเจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจธเฉฐเจ—เฉเจฐเจนเจฟ เจฆเจธเจคเจพเจตเฉ‡เจœเจผ เจตเจฟเฉฑเจš เจฐเฉฑเจ–เจฆเจพ เจนเฉˆ:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ‡เจน เจเจœเฉฐเจŸ เจ‡เฉฑเจ• เจ–เจพเจธ เจธเฉเจฐเฉฑเจ–เจฟเจ† เจฌเจพเจฐเฉ‡ เจœเจพเจฃเจ•เจพเจฐเฉ€ เจฆเฉ€ เจชเฉเจฐเจ•เจฟเจฐเจฟเจ† เจ•เจฐเฉ‡เจ—เจพ, เจธเจพเจจเฉ‚เฉฐ เจธเฉฐเจฆเฉ‡เจธเจผ เจตเจฟเฉฑเจš เจ‡เจธ เจธเฉเจฐเฉฑเจ–เจฟเจ† เจฆเฉ‡ เจŸเจฟเจ•เจฐ (เจชเฉเจฐเจคเฉ€เจ•) เจจเฉ‚เฉฐ เจฆเจฐเจธเจพเจ‰เจฃ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆเฅค faust เจตเจฟเฉฑเจš เจ‡เจธ เจฎเจ•เจธเจฆ เจฒเจˆ เจนเจจ เจฐเจฟเจ•เจพเจฐเจก - เจ‰เจน เจ•เจฒเจพเจธเจพเจ‚ เจœเฉ‹ เจเจœเฉฐเจŸ เจตเจฟเจธเจผเฉ‡ เจตเจฟเฉฑเจš เจธเฉฐเจฆเฉ‡เจธเจผ เจธเจ•เฉ€เจฎ เจฆเจพ เจเจฒเจพเจจ เจ•เจฐเจฆเฉ€เจ†เจ‚ เจนเจจเฅค

เจ‡เจธ เจฎเจพเจฎเจฒเฉ‡ เจตเจฟเฉฑเจš, เจ†เจ“ เจœเจพเจฃเฉ€เจ records.py เจ…เจคเฉ‡ เจตเจฐเจฃเจจ เจ•เจฐเฉ‹ เจ•เจฟ เจ‡เจธ เจตเจฟเจธเจผเฉ‡ เจฒเจˆ เจธเฉเจจเฉ‡เจนเจพ เจ•เจฟเจนเฉ‹ เจœเจฟเจนเจพ เจฆเจฟเจ–เจพเจˆ เจฆเฉ‡เจฃเจพ เจšเจพเจนเฉ€เจฆเจพ เจนเฉˆ:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจคเฉเจธเฉ€เจ‚ เจ…เจจเฉเจฎเจพเจจ เจฒเจ—เจพเจ‡เจ† เจนเฉ‹เจตเฉ‡เจ—เจพ, เจซเฉŒเจธเจŸ เจฎเฉˆเจธเฉ‡เจœ เจธเจ•เฉ€เจฎเจพ เจฆเจพ เจตเจฐเจฃเจจ เจ•เจฐเจจ เจฒเจˆ เจชเจพเจˆเจฅเจจ เจŸเจพเจˆเจช เจเจจเฉ‹เจŸเฉ‡เจธเจผเจจ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเจพ เจนเฉˆ, เจ‡เจธ เจฒเจˆ เจฒเจพเจ‡เจฌเฉเจฐเฉ‡เจฐเฉ€ เจฆเฉเจ†เจฐเจพ เจธเจฎเจฐเจฅเจฟเจค เจ˜เฉฑเจŸเฉ‹-เจ˜เฉฑเจŸ เจธเฉฐเจธเจ•เจฐเจฃ เจนเฉˆ 3.6.

เจ†เจ‰ เจเจœเฉฐเจŸ เจคเฉ‡ เจตเจพเจชเจธ เจ†เจ“, เจ•เจฟเจธเจฎเจพเจ‚ เจจเฉ‚เฉฐ เจธเฉˆเฉฑเจŸ เจ•เจฐเฉ€เจ เจ…เจคเฉ‡ เจ‡เจธเจจเฉ‚เฉฐ เจœเฉ‹เฉœเฉ€เจ:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจคเฉเจธเฉ€เจ‚ เจตเฉ‡เจ– เจธเจ•เจฆเฉ‡ เจนเฉ‹, เจ…เจธเฉ€เจ‚ เจตเจฟเจธเจผเฉ‡ เจฆเฉ€ เจธเจผเฉเจฐเฉ‚เจ†เจคเฉ€ เจตเจฟเจงเฉ€ - เจฎเฉเฉฑเจฒ_เจŸเจพเจˆเจช เจฒเจˆ เจ‡เฉฑเจ• เจธเจ•เฉ€เจฎ เจฆเฉ‡ เจจเจพเจฒ เจ‡เฉฑเจ• เจจเจตเจพเจ‚ เจชเฉˆเจฐเจพเจฎเฉ€เจŸเจฐ เจชเจพเจธ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚เฅค เจ‡เจธ เจคเฉ‹เจ‚ เจ‡เจฒเจพเจตเจพ, เจธเจญ เจ•เฉเจ เจ‰เจธเฉ‡ เจธเจ•เฉ€เจฎ เจฆเฉ€ เจชเจพเจฒเจฃเจพ เจ•เจฐเจฆเจพ เจนเฉˆ, เจ‡เจธเจฒเจˆ เจฎเฉˆเจจเฉ‚เฉฐ เจ•เจฟเจธเฉ‡ เจนเฉ‹เจฐ เจšเฉ€เจœเจผ 'เจคเฉ‡ เจงเจฟเจ†เจจ เจฆเฉ‡เจฃ เจฆเจพ เจ•เฉ‹เจˆ เจฌเจฟเฉฐเจฆเฉ‚ เจจเจœเจผเจฐ เจจเจนเฉ€เจ‚ เจ†เจ‰เจ‚เจฆเจพเฅค

เจ–เฉˆเจฐ, เจ…เฉฐเจคเจฟเจฎ เจ›เฉ‹เจน เจฎเฉˆเจŸเจพ เจœเจพเจฃเจ•เจพเจฐเฉ€ เจ‡เจ•เฉฑเจคเจฐ เจ•เจฐเจจ เจตเจพเจฒเฉ‡ เจเจœเฉฐเจŸ เจจเฉ‚เฉฐ เจ‡เจ•เฉฑเจ เจพ_เจธเฉเจฐเฉฑเจ–เจฟเจ† เจ•เจฐเจจ เจฒเจˆ เจ‡เฉฑเจ• เจ•เจพเจฒ เจœเฉ‹เฉœเจจเจพ เจนเฉˆ:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

เจ…เจธเฉ€เจ‚ เจธเฉเจจเฉ‡เจนเฉ‡ เจฒเจˆ เจชเจนเจฟเจฒเจพเจ‚ เจเจฒเจพเจจเฉ€ เจธเจ•เฉ€เจฎ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚เฅค เจ‡เจธ เจ•เฉ‡เจธ เจตเจฟเฉฑเจš, เจฎเฉˆเจ‚ .cast เจตเจฟเจงเฉ€ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เฉ€เจคเฉ€ เจ•เจฟเจ‰เจ‚เจ•เจฟ เจธเจพเจจเฉ‚เฉฐ เจเจœเฉฐเจŸ เจคเฉ‹เจ‚ เจจเจคเฉ€เจœเฉ‡ เจฆเฉ€ เจ‰เจกเฉ€เจ• เจ•เจฐเจจ เจฆเฉ€ เจฒเฉ‹เฉœ เจจเจนเฉ€เจ‚ เจนเฉˆ, เจชเจฐ เจ‡เจน เจตเจฐเจฃเจจ เจฏเฉ‹เจ— เจนเฉˆ เจ•เจฟ เจคเจฐเฉ€เจ•เฉ‡ เจตเจฟเจธเจผเฉ‡ เจจเฉ‚เฉฐ เจธเฉเจจเฉ‡เจนเจพ เจญเฉ‡เจœเฉ‹:

  1. เจ•เจพเจธเจŸ - เจฌเจฒเฉŒเจ• เจจเจนเฉ€เจ‚ เจ•เจฐเจฆเจพ เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ‡เจน เจจเจคเฉ€เจœเฉ‡ เจฆเฉ€ เจ‰เจฎเฉ€เจฆ เจจเจนเฉ€เจ‚ เจ•เจฐเจฆเจพเฅค เจคเฉเจธเฉ€เจ‚ เจจเจคเฉ€เจœเฉ‡ เจจเฉ‚เฉฐ เจ•เจฟเจธเฉ‡ เจนเฉ‹เจฐ เจตเจฟเจธเจผเฉ‡ 'เจคเฉ‡ เจธเฉฐเจฆเฉ‡เจธเจผ เจตเจœเฉ‹เจ‚ เจจเจนเฉ€เจ‚ เจญเฉ‡เจœ เจธเจ•เจฆเฉ‡ เจนเฉ‹เฅค

  2. เจญเฉ‡เจœเฉ‹ - เจฌเจฒเฉŒเจ• เจจเจนเฉ€เจ‚ เจ•เจฐเจฆเจพ เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ‡เจน เจจเจคเฉ€เจœเฉ‡ เจฆเฉ€ เจ‰เจฎเฉ€เจฆ เจจเจนเฉ€เจ‚ เจ•เจฐเจฆเจพ. เจคเฉเจธเฉ€เจ‚ เจ‰เจธ เจตเจฟเจธเจผเฉ‡ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจเจœเฉฐเจŸ เจจเจฟเจธเจผเจšเจฟเจค เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹ เจœเจฟเจธ เจตเจฟเฉฑเจš เจจเจคเฉ€เจœเจพ เจœเจพเจตเฉ‡เจ—เจพเฅค

  3. เจชเฉเฉฑเจ›เฉ‹ - เจจเจคเฉ€เจœเฉ‡ เจฆเฉ€ เจ‰เจกเฉ€เจ• เจ•เจฐ เจฐเจฟเจนเจพ เจนเฉˆ. เจคเฉเจธเฉ€เจ‚ เจ‰เจธ เจตเจฟเจธเจผเฉ‡ เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจเจœเฉฐเจŸ เจจเจฟเจธเจผเจšเจฟเจค เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹ เจœเจฟเจธ เจตเจฟเฉฑเจš เจจเจคเฉ€เจœเจพ เจœเจพเจตเฉ‡เจ—เจพเฅค

เจ‡เจธ เจฒเจˆ, เจ‡เจน เจธเจญ เจ…เฉฑเจœ เจฒเจˆ เจเจœเฉฐเจŸเจพเจ‚ เจจเจพเจฒ เจนเฉˆ!

เจธเฉเจชเจจเฉ‡ เจฆเฉ€ เจŸเฉ€เจฎ

เจ†เจ–เจฐเฉ€ เจšเฉ€เจœเจผ เจœเฉ‹ เจฎเฉˆเจ‚ เจ‡เจธ เจนเจฟเฉฑเจธเฉ‡ เจตเจฟเฉฑเจš เจฒเจฟเจ–เจฃ เจฆเจพ เจตเจพเจ…เจฆเจพ เจ•เฉ€เจคเจพ เจธเฉ€ เจ‰เจน เจนเฉˆ เจ•เจฎเจพเจ‚เจกเจพเจ‚. เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจชเจนเจฟเจฒเจพเจ‚ เจฆเฉฑเจธเจฟเจ† เจ—เจฟเจ† เจนเฉˆ, เจซเฉŒเจธเจŸ เจตเจฟเฉฑเจš เจ•เจฎเจพเจ‚เจกเจพเจ‚ เจ•เจฒเจฟเฉฑเจ• เจฆเฉ‡ เจ†เจฒเฉ‡-เจฆเฉเจ†เจฒเฉ‡ เจ‡เฉฑเจ• เจฐเฉˆเจชเจฐ เจนเจจเฅค เจตเจพเจธเจคเจต เจตเจฟเฉฑเจš, เจซเฉŒเจธเจŸ เจธเจพเจกเฉ€ เจ•เจธเจŸเจฎ เจ•เจฎเจพเจ‚เจก เจจเฉ‚เฉฐ เจ‡เจธเจฆเฉ‡ เจ‡เฉฐเจŸเจฐเจซเฉ‡เจธ เจจเจพเจฒ เจœเฉ‹เฉœเจฆเจพ เจนเฉˆ เจœเจฆเฉ‹เจ‚ -A เจ•เฉเฉฐเจœเฉ€ เจจเฉ‚เฉฐ เจจเจฟเจฐเจงเจพเจฐเจค เจ•เฉ€เจคเจพ เจœเจพเจ‚เจฆเจพ เจนเฉˆ

เจตเจฟเจš เจเจฒเจพเจจเฉ‡ เจเจœเฉฐเจŸเจพเจ‚ เจคเฉ‹เจ‚ เจฌเจพเจ…เจฆ agents.py เจ‡เฉฑเจ• เจธเจœเจพเจตเจŸ เจจเจพเจฒ เจ‡เฉฑเจ• เจซเฉฐเจ•เจธเจผเจจ เจธเจผเจพเจฎเจฒ เจ•เจฐเฉ‹ app.commandเจขเฉฐเจ— เจจเฉ‚เฉฐ เจ•เจพเจฒ เจ•เจฐ เจฐเจฟเจนเจพ เจนเฉˆ เจธเฉเฉฑเจŸ เจฆเจฟเจ“ ัƒ เจ‡เจ•เฉฑเจ เจพ_เจธเฉเจฐเฉฑเจ–เจฟเจ†:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

เจ‡เจธ เจคเจฐเฉเจนเจพเจ‚, เจœเฉ‡เจ•เจฐ เจ…เจธเฉ€เจ‚ เจ•เจฎเจพเจ‚เจกเจพเจ‚ เจฆเฉ€ เจธเฉ‚เจšเฉ€ เจจเฉ‚เฉฐ เจ•เจพเจฒ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚, เจคเจพเจ‚ เจธเจพเจกเฉ€ เจจเจตเฉ€เจ‚ เจ•เจฎเจพเจ‚เจก เจ‡เจธ เจตเจฟเฉฑเจš เจนเฉ‹เจตเฉ‡เจ—เฉ€:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

เจ…เจธเฉ€เจ‚ เจ‡เจธเจจเฉ‚เฉฐ เจ•เจฟเจธเฉ‡ เจนเฉ‹เจฐ เจฆเฉ€ เจคเจฐเฉเจนเจพเจ‚ เจตเจฐเจค เจธเจ•เจฆเฉ‡ เจนเจพเจ‚, เจ‡เจธเจฒเจˆ เจ†เจ“ เจซเฉŒเจธเจŸ เจตเจฐเจ•เจฐ เจจเฉ‚เฉฐ เจฎเฉเฉœ เจšเจพเจฒเฉ‚ เจ•เจฐเฉ€เจ เจ…เจคเฉ‡ เจชเฉเจฐเจคเฉ€เจญเฉ‚เจคเฉ€เจ†เจ‚ เจฆเจพ เจ‡เฉฑเจ• เจชเฉ‚เจฐเจพ เจธเฉฐเจ—เฉเจฐเจนเจฟ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเฉ€เจ:

> faust -A horton.agents start-collect-securities

เจ…เฉฑเจ—เฉ‡ เจ•เฉ€ เจนเฉ‹เจตเฉ‡เจ—เจพ?

เจ…เจ—เจฒเฉ‡ เจนเจฟเฉฑเจธเฉ‡ เจตเจฟเฉฑเจš, เจ‡เฉฑเจ• เจ‰เจฆเจพเจนเจฐเจจ เจฆเฉ‡ เจคเฉŒเจฐ 'เจคเฉ‡ เจฌเจพเจ•เฉ€ เจฌเจšเฉ‡ เจเจœเฉฐเจŸเจพเจ‚ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเฉ‡ เจนเฉ‹เจ, เจ…เจธเฉ€เจ‚ เจธเจพเจฒ เจฒเจˆ เจตเจชเจพเจฐ เจฆเฉ€เจ†เจ‚ เจธเจฎเจพเจชเจคเฉ€ เจ•เฉ€เจฎเจคเจพเจ‚ เจ…เจคเฉ‡ เจเจœเฉฐเจŸเจพเจ‚ เจฆเฉ‡ เจ•เจฐเฉŒเจจ เจฒเจพเจ‚เจš เจตเจฟเฉฑเจš เจ…เจคเจฟเจ…เฉฐเจคเจคเจพ เจฆเฉ€ เจ–เฉ‹เจœ เจฒเจˆ เจธเจฟเฉฐเจ• เจตเจฟเจงเฉ€ เจจเฉ‚เฉฐ เจฆเฉ‡เจ–เจพเจ‚เจ—เฉ‡เฅค

เจ‡เจน เจธเจญ เจ…เฉฑเจœ เจฒเจˆ เจนเฉˆ! เจชเฉœเฉเจนเจจ เจฒเจˆ เจงเฉฐเจจเจตเจพเจฆ :)

เจ‡เจธ เจนเจฟเฉฑเจธเฉ‡ เจฒเจˆ เจ•เฉ‹เจก

เจซเฉŒเจธเจŸ, เจญเจพเจ— II 'เจคเฉ‡ เจชเจฟเจ›เฉ‹เจ•เฉœ เจ•เจพเจฐเจœ: เจเจœเฉฐเจŸ เจ…เจคเฉ‡ เจŸเฉ€เจฎเจพเจ‚

PS เจชเจฟเจ›เจฒเฉ‡ เจญเจพเจ— เจฆเฉ‡ เจคเจนเจฟเจค เจฎเฉˆเจจเฉ‚เฉฐ เจซเฉŒเจธเจŸ เจ…เจคเฉ‡ เจ•เฉฐเจซเจฒเฉ‚เจเจ‚เจŸ เจ•เจพเจซเจ•เจพ เจฌเจพเจฐเฉ‡ เจชเฉเฉฑเจ›เจฟเจ† เจ—เจฟเจ† เจธเฉ€ (เจธเฉฐเจ—เจฎ เจตเจฟเฉฑเจš เจ•เจฟเจนเฉœเฉ€เจ†เจ‚ เจตเจฟเจธเจผเฉ‡เจธเจผเจคเจพเจตเจพเจ‚ เจนเจจ?). เจ…เจœเจฟเจนเจพ เจฒเจ—เจฆเจพ เจนเฉˆ เจ•เจฟ เจ•เฉฐเจซเจฒเฉ‚เจเจ‚เจŸ เจฌเจนเฉเจค เจธเจพเจฐเฉ‡ เจคเจฐเฉ€เจ•เจฟเจ†เจ‚ เจจเจพเจฒ เจตเจงเฉ‡เจฐเฉ‡ เจ•เจพเจฐเจœเจธเจผเฉ€เจฒ เจนเฉˆ, เจชเจฐ เจคเฉฑเจฅ เจ‡เจน เจนเฉˆ เจ•เจฟ เจซเฉŒเจธเจŸ เจ•เฉ‹เจฒ เจ•เฉฐเจซเจฒเฉ‚เจเจ‚เจŸ เจฒเจˆ เจชเฉ‚เจฐเจพ เจ—เจพเจนเจ• เจธเจฎเจฐเจฅเจจ เจจเจนเฉ€เจ‚ เจนเฉˆ - เจ‡เจน เจ‡เจธ เจคเฉ‹เจ‚ เจฌเจพเจ…เจฆ เจนเฉˆ เจฆเจธเจคเจพเจตเฉ‡เจœเจผ เจตเจฟเฉฑเจš เจ•เจฒเจพเจ‡เฉฐเจŸ เจชเจพเจฌเฉฐเจฆเฉ€เจ†เจ‚ เจฆเจพ เจตเฉ‡เจฐเจตเจพ.

เจธเจฐเฉ‹เจค: www.habr.com

เจ‡เฉฑเจ• เจŸเจฟเฉฑเจชเจฃเฉ€ เจœเฉ‹เฉœเฉ‹