เบงเบฝเบเบ‡เบฒเบ™เบžเบทเป‰เบ™เบ–เบฒเบ™เบเปˆเบฝเบงเบเบฑเบš Faust, เบžเบฒเบเบ—เบต II: เบ•เบปเบงเปเบ—เบ™ เปเบฅเบฐเบ—เบตเบกเบ‡เบฒเบ™

เบงเบฝเบเบ‡เบฒเบ™เบžเบทเป‰เบ™เบ–เบฒเบ™เบเปˆเบฝเบงเบเบฑเบš Faust, เบžเบฒเบเบ—เบต II: เบ•เบปเบงเปเบ—เบ™ เปเบฅเบฐเบ—เบตเบกเบ‡เบฒเบ™

เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เป€เบ™เบทเป‰เบญเบซเบฒ

  1. เบžเบฒเบเบ—เบต I: เปเบ™เบฐเบ™เปเบฒ

  2. เบžเบฒเบเบ—เบต II: เบ•เบปเบงเปเบ—เบ™เปเบฅเบฐเบ—เบตเบกเบ‡เบฒเบ™

เบžเบงเบเป€เบฎเบปเบฒเป€เบฎเบฑเบ”เบซเบเบฑเบ‡เบขเบนเปˆเบ™เบตเป‰?

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบชเปˆเบงเบ™เบ—เบตเบชเบญเบ‡. เบ”เบฑเปˆเบ‡เบ—เบตเปˆเบ‚เบฝเบ™เป„เบงเป‰เบเปˆเบญเบ™เบซเบ™เป‰เบฒเบ™เบตเป‰, เปƒเบ™เบกเบฑเบ™เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบฎเบฑเบ”เบ”เบฑเปˆเบ‡เบ•เปเปˆเป„เบ›เบ™เบตเป‰:

  1. เปƒเบซเป‰เบ‚เบฝเบ™เบฅเบนเบเบ„เป‰เบฒเบ‚เบฐเบซเบ™เบฒเบ”เบ™เป‰เบญเบเบชเปเบฒเบฅเบฑเบš alphavantage เปƒเบ™ aiohttp เบ”เป‰เบงเบเบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเบชเปเบฒเบฅเบฑเบšเบˆเบธเบ”เบชเบดเป‰เบ™เบชเบธเบ”เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™.

  2. เปƒเบซเป‰เบชเป‰เบฒเบ‡เบ•เบปเบงเปเบ—เบ™เบœเบนเป‰เบ—เบตเปˆเบˆเบฐเป€เบเบฑเบšเบเปเบฒเบ‚เปเป‰เบกเบนเบ™เบซเบผเบฑเบเบŠเบฑเบšเปเบฅเบฐเบ‚เปเป‰เบกเบนเบ™ meta เบเปˆเบฝเบงเบเบฑเบšเบžเบงเบเป€เบ‚เบปเบฒ.

เปเบ•เปˆ, เบ™เบตเป‰เปเบกเปˆเบ™เบชเบดเปˆเบ‡เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบฎเบฑเบ”เบชเปเบฒเบฅเบฑเบšเป‚เบ„เบ‡เบเบฒเบ™เบ•เบปเบงเบกเบฑเบ™เป€เบญเบ‡, เปเบฅเบฐเปƒเบ™เปเบ‡เปˆเบ‚เบญเบ‡เบเบฒเบ™เบ„เบปเป‰เบ™เบ„เบงเป‰เบฒ faust, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบฎเบฝเบ™เบฎเบนเป‰เบงเบดเบ—เบตเบเบฒเบ™เบ‚เบฝเบ™เบ•เบปเบงเปเบ—เบ™เบ—เบตเปˆเบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เป€เบซเบ”เบเบฒเบ™เบˆเบฒเบ kafka, เป€เบŠเบฑเปˆเบ™เบ”เบฝเบงเบเบฑเบ™เบเบฑเบšเบงเบดเบ—เบตเบเบฒเบ™เบ‚เบฝเบ™เบ„เปเบฒเบชเบฑเปˆเบ‡ (click wrapper), เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ - เบชเปเบฒเบฅเบฑเบšเบ‚เปเป‰เบ„เบงเบฒเบกเบŠเบธเบเบเบนเป‰เบ„เบนเปˆเบกเบทเป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบ•เบปเบงเปเบ—เบ™เบเปเบฒเบฅเบฑเบ‡เบ•เบดเบ”เบ•เบฒเบก.

เบเบฒเบ™โ€‹เบเบถเบโ€‹เบญเบปเบšโ€‹เบฎเบปเบก

เบฅเบนเบเบ„เป‰เบฒ AlphaVantage

เบ—เปเบฒเบญเบดเบ”, เปƒเบซเป‰เบ‚เบฝเบ™เบฅเบนเบเบ„เป‰เบฒ aiohttp เบ‚เบฐเบซเบ™เบฒเบ”เบ™เป‰เบญเบเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเปƒเบซเป‰ alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

เปƒเบ™เบ„เบงเบฒเบกเป€เบ›เบฑเบ™เบˆเบดเบ‡, เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เปเบกเปˆเบ™เบˆเบฐเปเบˆเป‰เบ‡เบˆเบฒเบเบกเบฑเบ™:

  1. AlphaVantage API เปเบกเปˆเบ™เบ‚เป‰เบญเบ™เบ‚เป‰เบฒเบ‡เบ‡เปˆเบฒเบเบ”เบฒเบเปเบฅเบฐเบญเบญเบเปเบšเบšเบ—เบตเปˆเบชเบงเบเบ‡เบฒเบก, เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบ‚เป‰เบญเบเบˆเบถเปˆเบ‡เบ•เบฑเบ”เบชเบดเบ™เปƒเบˆเป€เบฎเบฑเบ”เบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเบ—เบฑเบ‡เบซเบกเบปเบ”เป‚เบ”เบเบœเปˆเบฒเบ™เบงเบดเบ—เบตเบเบฒเบ™ construct_query เบšเปˆเบญเบ™เบ—เบตเปˆเบกเบตเบเบฒเบ™เป‚เบ— http.

  2. เบ‚เป‰เบญเบเป€เบญเบปเบฒเบ—เบปเปˆเบ‡เบ™เบฒเบ—เบฑเบ‡เปเบปเบ”เบกเบฒเปƒเบซเป‰ snake_case เป€เบžเบทเปˆเบญเบ„เบงเบฒเบกเบชเบฐเบ”เบงเบ.

  3. เบ”เบต, เบเบฒเบ™เบ•เบปเบšเปเบ•เปˆเบ‡ logger.catch เบชเปเบฒเบฅเบฑเบšเบœเบปเบ™เบœเบฐเบฅเบดเบ” traceback เบ—เบตเปˆเบชเบงเบเบ‡เบฒเบกเปเบฅเบฐเปƒเบซเป‰เบ‚เปเป‰เบกเบนเบ™.

PS เบขเปˆเบฒเบฅเบทเบกเป€เบžเบตเปˆเบก token alphavantage เบขเบนเปˆเปƒเบ™เบ—เป‰เบญเบ‡เบ–เบดเปˆเบ™เป€เบžเบทเปˆเบญ config.yml, เบซเบผเบทเบชเบปเปˆเบ‡เบญเบญเบเบ•เบปเบงเปเบ›เบชเบฐเบžเบฒเบšเปเบงเบ”เบฅเป‰เบญเบก HORTON_SERVICE_APIKEY. เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบš token เบ—เบตเปˆเบ™เบตเป‰.

เบซเป‰เบญเบ‡เบฎเบฝเบ™ CRUD

เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบกเบตเบเบฒเบ™เป€เบเบฑเบšเบเปเบฒเบซเบผเบฑเบเบŠเบฑเบšเป€เบžเบทเปˆเบญเป€เบเบฑเบšเบ‚เปเป‰เบกเบนเบ™ meta เบเปˆเบฝเบงเบเบฑเบšเบซเบผเบฑเบเบŠเบฑเบš.

เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™/security.py

เปƒเบ™เบ„เบงเบฒเบกเบ„เบดเบ”เป€เบซเบฑเบ™เบ‚เบญเบ‡เบ‚เป‰เบญเบ, เบšเปเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบญเบฐเบ—เบดเบšเบฒเบเบซเบเบฑเบ‡เบขเบนเปˆเบ—เบตเปˆเบ™เบตเป‰, เปเบฅเบฐเบŠเบฑเป‰เบ™เบžเบทเป‰เบ™เบ–เบฒเบ™เบ‚เบญเบ‡เบกเบฑเบ™เป€เบญเบ‡เปเบกเปˆเบ™เบ‚เป‰เบญเบ™เบ‚เป‰เบฒเบ‡เบ‡เปˆเบฒเบเบ”เบฒเบ.

get_app()

เปƒเบซเป‰เป€เบžเบตเปˆเบกเบŸเบฑเบ‡เบŠเบฑเบ™เบชเป‰เบฒเบ‡เบงเบฑเบ”เบ–เบธเปเบญเบฑเบšเบžเบฅเบดเป€เบ„เบŠเบฑเบ™เปƒเบ™ app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

เบชเปเบฒเบฅเบฑเบšเปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบกเบตเบเบฒเบ™เบชเป‰เบฒเบ‡เบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบเบ—เบตเปˆเบ‡เปˆเบฒเบเบ”เบฒเบเบ—เบตเปˆเบชเบธเบ”, เป€เบฅเบฑเบเบ™เป‰เบญเบเบ•เปเปˆเบกเบฒเบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบ‚เบฐเบซเบเบฒเบเบกเบฑเบ™, เบขเปˆเบฒเบ‡เปƒเบ”เบเปเบ•เบฒเบก, เป€เบžเบทเปˆเบญเบšเปเปˆเปƒเบซเป‰เบ—เปˆเบฒเบ™เบฅเปเบ–เป‰เบฒ, เบ—เบตเปˆเบ™เบตเป‰. เบญเป‰เบฒเบ‡เบญเบตเบ‡ เบเบฑเบš App-class. เบ‚เป‰เบฒโ€‹เบžเบฐโ€‹เป€เบˆเบปเป‰เบฒโ€‹เบเบฑเบ‡โ€‹เปเบ™เบฐโ€‹เบ™เปเบฒโ€‹เปƒเบซเป‰โ€‹เบ—เปˆเบฒเบ™โ€‹เป€เบšเบดเปˆเบ‡โ€‹เปƒเบ™โ€‹เบซเป‰เบญเบ‡โ€‹เบเบฒเบ™โ€‹เบ›เบฑเบšโ€‹เบ„เปˆเบฒโ€‹, เป€เบ™เบทเปˆเบญเบ‡โ€‹เบˆเบฒเบโ€‹เบงเปˆเบฒโ€‹เบกเบฑเบ™โ€‹เบกเบตโ€‹เบ„เบงเบฒเบกโ€‹เบฎเบฑเบšโ€‹เบœเบดเบ”โ€‹เบŠเบญเบšโ€‹เบชเปเบฒโ€‹เบฅเบฑเบšโ€‹เบเบฒเบ™โ€‹เบ•เบฑเป‰เบ‡โ€‹เบ„เปˆเบฒโ€‹เบซเบผเบฒเบโ€‹เบ—เบตเปˆโ€‹เบชเบธเบ”โ€‹.

เบชเปˆเบงเบ™เปƒเบซเบเปˆเปเบกเปˆเบ™

เบ•เบปเบงเปเบ—เบ™เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบเบฑเบšเปเบฅเบฐเบฎเบฑเบเบชเบฒเบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ‚เบญเบ‡เบซเบผเบฑเบเบŠเบฑเบš

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบ—เปเบฒเบญเบดเบ”เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบšเบงเบฑเบ”เบ–เบธเบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบ faust - เบกเบฑเบ™เบ‡เปˆเบฒเบเบ”เบฒเบเบ”เบต. เบ•เปเปˆเป„เบ›, เบžเบงเบเป€เบฎเบปเบฒเบ›เบฐเบเบฒเบ”เบซเบปเบงเบ‚เปเป‰เบชเปเบฒเบฅเบฑเบšเบ•เบปเบงเปเบ—เบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบขเปˆเบฒเบ‡เบˆเบฐเปเบˆเป‰เบ‡ ... เปƒเบ™เบ—เบตเปˆเบ™เบตเป‰เบกเบฑเบ™เบ„เบงเบ™เบˆเบฐเป€เบ›เบฑเบ™เบเบฒเบ™เบเปˆเบฒเบงเป€เบ–เบดเบ‡เบงเปˆเบฒเบกเบฑเบ™เปเบกเปˆเบ™เบซเบเบฑเบ‡, เบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบžเบฒเบเปƒเบ™เปเบกเปˆเบ™เบซเบเบฑเบ‡เปเบฅเบฐเบงเบดเบ—เบตเบเบฒเบ™เบ™เบตเป‰เบชเบฒเบกเบฒเบ”เป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบˆเบฑเบ”เบฅเบฝเบ‡เปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™.

  1. เบซเบปเบงเบ‚เปเป‰เปƒเบ™ kafka, เบ–เป‰เบฒเบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™เบฎเบนเป‰เบ„เปเบฒเบ™เบดเบเบฒเบกเบ—เบตเปˆเปเบ™เปˆเบ™เบญเบ™, เบกเบฑเบ™เบ”เบตเบเบงเปˆเบฒเบ—เบตเปˆเบˆเบฐเบญเปˆเบฒเบ™ เบ›เบดเบ”. เป€เบญเบเบฐเบชเบฒเบ™, เบซเบผเบทเบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบญเปˆเบฒเบ™ เบšเบปเบ”เบชเบฐเบซเบผเบธเบš on Habre เปƒเบ™โ€‹เบžเบฒโ€‹เบชเบฒโ€‹เบฅเบฑเบ”โ€‹เป€เบŠเบโ€‹, เบšเปˆเบญเบ™โ€‹เบ—เบตเปˆโ€‹เบ—เบธเบโ€‹เบชเบดเปˆเบ‡โ€‹เบ—เบธเบโ€‹เบขเปˆเบฒเบ‡โ€‹เปเบกเปˆเบ™โ€‹เบชเบฐโ€‹เบ—เป‰เบญเบ™โ€‹เปƒเบซเป‰โ€‹เป€เบซเบฑเบ™โ€‹เบขเปˆเบฒเบ‡โ€‹เบ–เบทเบโ€‹เบ•เป‰เบญเบ‡ :)

  2. เบžเบฒเบฃเบฒเบกเบดเป€เบ•เบตเบžเบฒเบเปƒเบ™, เบญเบฐเบ—เบดเบšเบฒเบเป„เบ”เป‰เบ”เบตเปƒเบ™ faust doc, เบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเบเปเบฒเบ™เบปเบ”เบซเบปเบงเบ‚เปเป‰เป‚เบ”เบเบเบปเบ‡เปƒเบ™เบฅเบฐเบซเบฑเบ”, เปเบ™เปˆเบ™เบญเบ™, เบ™เบตเป‰เบซเบกเบฒเบเบ„เบงเบฒเบกเบงเปˆเบฒเบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบชเบฐเบซเบ™เบญเบ‡เปƒเบซเป‰เป‚เบ”เบเบ™เบฑเบเบžเบฑเบ”เบ—เบฐเบ™เบฒ faust, เบ•เบปเบงเบขเปˆเบฒเบ‡: เบเบฒเบ™เบฎเบฑเบเบชเบฒเป„เบงเป‰, เบ™เบฐเป‚เบเบšเบฒเบเบเบฒเบ™เป€เบเบฑเบšเบฎเบฑเบเบชเบฒ (เป‚เบ”เบเบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบฅเบถเบš, เปเบ•เปˆเบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบเปเบฒเบ™เบปเบ”. เบซเบ™เบฒเปเบซเบ™เป‰เบ™), เบˆเปเบฒโ€‹เบ™เบงเบ™โ€‹เบ‚เบญเบ‡โ€‹เบเบฒเบ™โ€‹เปเบšเปˆเบ‡โ€‹เบ›เบฑเบ™โ€‹เบ•เปเปˆโ€‹เบซเบปเบงโ€‹เบ‚เปเป‰ (เบ„เบฐเปเบ™เบ™เป€เบžเบทเปˆเบญเป€เบฎเบฑเบ”, เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบเบปเบเบ•เบปเบงเบขเปˆเบฒเบ‡, เบซเบ™เป‰เบญเบเบเบงเปˆเบฒ เบ„เบงเบฒเบกเบชเปเบฒเบ„เบฑเบ™เบ—เบปเปˆเบงเป‚เบฅเบ faust เบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบ).

  3. เป‚เบ”เบเบ—เบปเปˆเบงเป„เบ›, เบ•เบปเบงเปเบ—เบ™เบชเบฒเบกเบฒเบ”เบชเป‰เบฒเบ‡เบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบกเบตเบเบฒเบ™เบ„เบธเป‰เบกเบ„เบญเบ‡เบ—เบตเปˆเบกเบตเบ„เปˆเบฒเบ—เบปเปˆเบงเป‚เบฅเบ, เปเบ™เบงเปƒเบ”เบเปเปˆเบ•เบฒเบก, เบ‚เป‰เบญเบเบขเบฒเบเบ›เบฐเบเบฒเบ”เบ—เบธเบเบขเปˆเบฒเบ‡เบขเปˆเบฒเบ‡เบˆเบฐเปเบˆเป‰เบ‡. เบ™เบญเบเบˆเบฒเบเบ™เบฑเป‰เบ™, เบšเบฒเบ‡เบžเบฒเบฅเบฒเบกเบดเป€เบ•เบต (เบ•เบปเบงเบขเปˆเบฒเบ‡, เบˆเปเบฒเบ™เบงเบ™เบ‚เบญเบ‡เบเบฒเบ™เปเบšเปˆเบ‡เบชเปˆเบงเบ™เบซเบผเบทเบ™เบฐเป‚เบเบšเบฒเบเบเบฒเบ™เบฎเบฑเบเบชเบฒเป„เบงเป‰) เบ‚เบญเบ‡เบซเบปเบงเบ‚เปเป‰เปƒเบ™เบเบฒเบ™เป‚เบ„เบชเบฐเบ™เบฒเบ•เบปเบงเปเบ—เบ™เบšเปเปˆเบชเบฒเบกเบฒเบ”เบ–เบทเบเบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป„เบ”เป‰.

    เบ™เบตเป‰เปเบกเปˆเบ™เบชเบดเปˆเบ‡เบ—เบตเปˆเบกเบฑเบ™เบญเบฒเบ”เบˆเบฐเป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเป‚เบ”เบเบšเปเปˆเบกเบตเบเบฒเบ™เบเปเบฒเบ™เบปเบ”เบซเบปเบงเบ‚เปเป‰เบ”เป‰เบงเบเบ•เบปเบ™เป€เบญเบ‡:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

เปเบฅเป‰เบง, เบ•เบญเบ™เบ™เบตเป‰เปƒเบซเป‰เบญเบฐเบ—เบดเบšเบฒเบเบชเบดเปˆเบ‡เบ—เบตเปˆเบ•เบปเบงเปเบ—เบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบฎเบฑเบ” :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เปƒเบ™เบ•เบญเบ™เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ‚เบญเบ‡เบ•เบปเบงเปเบ—เบ™, เบžเบงเบเป€เบฎเบปเบฒเป€เบ›เบตเบ”เบเบญเบ‡เบ›เบฐเบŠเบธเบก aiohttp เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเบœเปˆเบฒเบ™เบฅเบนเบเบ„เป‰เบฒเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ. เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เป€เบกเบทเปˆเบญเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบžเบฐเบ™เบฑเบเบ‡เบฒเบ™, เป€เบกเบทเปˆเบญเบ•เบปเบงเปเบ—เบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบ–เบทเบเป€เบ›เบตเบ”เบ•เบปเบง, เบเบญเบ‡เบ›เบฐเบŠเบธเบกเบˆเบฐเป€เบ›เบตเบ”เบ—เบฑเบ™เบ—เบต - เบซเบ™เบถเปˆเบ‡, เบชเปเบฒเบฅเบฑเบšเป€เบงเบฅเบฒเบ—เบตเปˆเบžเบฐเบ™เบฑเบเบ‡เบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบ—เบฑเบ‡เบซเบกเบปเบ” (เบซเบผเบทเบซเบผเบฒเบเป†เบ„เบปเบ™, เบ–เป‰เบฒเบ—เปˆเบฒเบ™เบ›เปˆเบฝเบ™เบžเบฒเบฅเบฒเบกเบดเป€เบ•เบต. เบžเป‰เบญเบกเบเบฑเบ™ เบˆเบฒเบเบ•เบปเบงเปเบ—เบ™เบ—เบตเปˆเบกเบตเบซเบ™เปˆเบงเบเบ‡เบฒเบ™เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™).

เบ•เปเปˆเป„เบ›, เบžเบงเบเป€เบฎเบปเบฒเบ›เบฐเบ•เบดเบšเบฑเบ”เบ•เบฒเบกเบเบฐเปเบช (เบžเบงเบเป€เบฎเบปเบฒเบงเบฒเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเปƒเบ™ _, เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒ, เปƒเบ™เบ•เบปเบงเปเบ—เบ™เบ™เบตเป‰, เบšเปเปˆเบชเบปเบ™เปƒเบˆเป€เบ™เบทเป‰เบญเบซเบฒ) เบ‚เบญเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเบˆเบฒเบเบซเบปเบงเบ‚เปเป‰เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เบ–เป‰เบฒเบžเบงเบเป€เบ‚เบปเบฒเบกเบตเบขเบนเปˆเปƒเบ™เบเบฒเบ™เบŠเบปเบ”เป€เบŠเบตเบเปƒเบ™เบ›เบฐเบˆเบธเบšเบฑเบ™, เบ–เป‰เบฒเบšเปเปˆเบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบงเบปเบ‡เบˆเบญเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบฅเปเบ–เป‰เบฒเบเบฒเบ™เบกเบฒเบฎเบญเบ”เบ‚เบญเบ‡เบžเบงเบเป€เบ‚เบปเบฒ. เบ”เบต, เบžเบฒเบเปƒเบ™ loop เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เบžเบงเบเป€เบฎเบปเบฒเป€เบ‚เบปเป‰เบฒเบชเบนเปˆเบฅเบฐเบšเบปเบšเบเบฒเบ™เบฎเบฑเบšเบ‚เปเป‰เบ„เบงเบฒเบก, เป„เบ”เป‰เบฎเบฑเบšเบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ‚เบญเบ‡เบเบฒเบ™เป€เบ„เบทเปˆเบญเบ™เป„เบซเบง (get_securities เบเบฑเบšเบ„เบทเบ™เบกเบฒเบžเบฝเบ‡เปเบ•เปˆเบเบฒเบ™เป€เบ„เบทเปˆเบญเบ™เป„เบซเบงเป‚เบ”เบเบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™, เป€เบšเบดเปˆเบ‡เบฅเบฐเบซเบฑเบ”เบฅเบนเบเบ„เป‰เบฒ) เบซเบผเบฑเบเบŠเบฑเบšเปเบฅเบฐเบšเบฑเบ™เบ—เบถเบเบกเบฑเบ™เป„เบงเป‰เปƒเบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™, เบเบงเบ”เป€เบšเบดเปˆเบ‡เบงเปˆเบฒเบกเบตเบ„เบงเบฒเบกเบ›เบญเบ”เป„เบžเบ—เบตเปˆเบกเบต ticker เบ”เบฝเบงเบเบฑเบ™เปเบฅเบฐ. เบเบฒเบ™เปเบฅเบเบ›เปˆเบฝเบ™เปƒเบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™, เบ–เป‰เบฒเบซเบฒเบเบงเปˆเบฒเบกเบต, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบกเบฑเบ™ (เป€เบˆเป‰เบ) เบˆเบฐเป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบ›เบฑเบšเบ›เบธเบ‡เบžเบฝเบ‡เปเบ•เปˆ.

เป€เบ›เบตเบ”เบ•เบปเบงเบเบฒเบ™เบชเป‰เบฒเบ‡เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ!

> docker-compose up -d
... ะ—ะฐะฟัƒัะบ ะบะพะฝั‚ะตะนะฝะตั€ะพะฒ ...
> faust -A horton.agents worker --without-web -l info

เบ„เบธเบ™เบ™เบฐเบชเบปเบกเบšเบฑเบ” PS เบญเบปเบ‡เบ›เบฐเบเบญเบšเป€เบงเบฑเบš เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบˆเบฐเบšเปเปˆเบžเบดเบˆเบฒเบฅเบฐเบ™เบฒ faust เปƒเบ™เบšเบปเบ”เบ„เบงเบฒเบก, เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเบเปเบฒเบ™เบปเบ”เบ—เบธเบ‡เบ—เบตเปˆเป€เบซเบกเบฒเบฐเบชเบปเบก.

เปƒเบ™เบ„เปเบฒเบชเบฑเปˆเบ‡เป€เบ›เบตเบ”เบ•เบปเบงเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบšเบญเบ faust เบšเปˆเบญเบ™เบ—เบตเปˆเบˆเบฐเบŠเบญเบเบซเบฒเบˆเบธเบ”เบ›เบฐเบชเบปเบ‡เบ‚เบญเบ‡เปเบญเบฑเบšเบžเบฅเบดเป€เบ„เบŠเบฑเบ™เปเบฅเบฐเบชเบดเปˆเบ‡เบ—เบตเปˆเบ•เป‰เบญเบ‡เป€เบฎเบฑเบ”เบเบฑเบšเบกเบฑเบ™ (เป€เบ›เบตเบ”เบ•เบปเบงเบžเบฐเบ™เบฑเบเบ‡เบฒเบ™) เบเบฑเบšเบฅเบฐเบ”เบฑเบšเบœเบปเบ™เบœเบฐเบฅเบดเบ”เบšเบฑเบ™เบ—เบถเบเบ‚เปเป‰เบกเบนเบ™. เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบšเบœเบปเบ™เบœเบฐเบฅเบดเบ”เบ”เบฑเปˆเบ‡เบ•เปเปˆเป„เบ›เบ™เบตเป‰:

Spoiler

โ”Œฦ’aยตSโ€  v1.10.4โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ id          โ”‚ horton                                            โ”‚
โ”‚ transport   โ”‚ [URL('kafka://localhost:9092')]                   โ”‚
โ”‚ store       โ”‚ memory:                                           โ”‚
โ”‚ log         โ”‚ -stderr- (info)                                   โ”‚
โ”‚ pid         โ”‚ 1271262                                           โ”‚
โ”‚ hostname    โ”‚ host-name                                         โ”‚
โ”‚ platform    โ”‚ CPython 3.8.2 (Linux x86_64)                      โ”‚
โ”‚ drivers     โ”‚                                                   โ”‚
โ”‚   transport โ”‚ aiokafka=1.1.6                                    โ”‚
โ”‚   web       โ”‚ aiohttp=3.6.2                                     โ”‚
โ”‚ datadir     โ”‚ /path/to/project/horton-data                      โ”‚
โ”‚ appdir      โ”‚ /path/to/project/horton-data/v1                   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
... ะปะพะณะธ, ะปะพะณะธ, ะปะพะณะธ ...

โ”ŒTopic Partition Setโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ topic                      โ”‚ partitions โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ collect_securities         โ”‚ {0-7}      โ”‚
โ”‚ horton-__assignor-__leader โ”‚ {0}        โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ 

เบกเบฑเบ™เบกเบตเบŠเบตเบงเบดเบ”เบขเบนเปˆ!!!

เปƒเบซเป‰โ€‹เป€เบฎเบปเบฒโ€‹เป€เบšเบดเปˆเบ‡โ€‹เบ—เบตเปˆโ€‹เบเปเบฒโ€‹เบ™เบปเบ”โ€‹เป„เบงเป‰โ€‹เปเบšเปˆเบ‡โ€‹เบ›เบฑเบ™โ€‹. เบ”เบฑเปˆเบ‡เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”เป€เบซเบฑเบ™เป„เบ”เป‰, เบซเบปเบงเบ‚เปเป‰เบ–เบทเบเบชเป‰เบฒเบ‡เบ‚เบทเป‰เบ™เบ”เป‰เบงเบเบŠเบทเปˆเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบเปเบฒเบ™เบปเบ”เป„เบงเป‰เปƒเบ™เบฅเบฐเบซเบฑเบ”, เบˆเปเบฒเบ™เบงเบ™เบžเบฒเบ—เบดเบŠเบฑเบ™เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™ (8, เป€เบญเบปเบฒเบกเบฒเบˆเบฒเบ. topic_partitions - เบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบงเบฑเบ”เบ–เบธเบ‚เบญเบ‡เบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบ), เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเบšเปเปˆเป„เบ”เป‰เบฅเบฐเบšเบธเบ„เปˆเบฒเบชเปˆเบงเบ™เบšเบธเบเบ„เบปเบ™เบชเปเบฒเบฅเบฑเบšเบซเบปเบงเบ‚เปเป‰เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ (เบœเปˆเบฒเบ™เบžเบฒเบ—เบดเบŠเบฑเบ™). เบ•เบปเบงเปเบ—เบ™เบ—เบตเปˆเป€เบ›เบตเบ”เบ•เบปเบงเปƒเบ™เบœเบนเป‰เบญเบญเบเปเบฎเบ‡เบ‡เบฒเบ™เปเบกเปˆเบ™เป„เบ”เป‰เบ–เบทเบเบกเบญเบšเบซเบกเบฒเบเปƒเบซเป‰เบ—เบฑเบ‡เบซเบกเบปเบ” 8 เบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™, เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเบกเบฑเบ™เป€เบ›เบฑเบ™เบžเบฝเบ‡เปเบ•เปˆเบซเบ™เบถเปˆเบ‡, เปเบ•เปˆเบ™เบตเป‰เบˆเบฐเบ–เบทเบเบ›เบถเบเบชเบฒเบซเบฒเบฅเบทเปƒเบ™เบฅเบฒเบเบฅเบฐเบญเบฝเบ”เป€เบžเบตเปˆเบกเป€เบ•เบตเบกเปƒเบ™เบชเปˆเบงเบ™เบเปˆเบฝเบงเบเบฑเบšเบเบฒเบ™เบˆเบฑเบ”เบเบธเปˆเบก.

เปเบฅเป‰เบง, เบ•เบญเบ™เบ™เบตเป‰เบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”เป„เบ›เบซเบฒเบ›เปˆเบญเบ‡เบขเป‰เบฝเบกเบ›เบฒเบเบ—เบฒเบ‡เบญเบทเปˆเบ™เปเบฅเบฐเบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเบซเบงเปˆเบฒเบ‡เป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS เปƒเบŠเป‰ @ เบžเบงเบเป€เบฎเบปเบฒเบชเบฐเปเบ”เบ‡เปƒเบซเป‰เป€เบซเบฑเบ™เบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเบเปเบฒเบฅเบฑเบ‡เบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบกเบตเบŠเบทเปˆเบงเปˆเบฒ "collect_securities".

เปƒเบ™โ€‹เบเปโ€‹เบฅเบฐโ€‹เบ™เบตโ€‹เบ™เบตเป‰โ€‹, เบ‚เปเป‰โ€‹เบ„เบงเบฒเบกโ€‹เป„เบ”เป‰โ€‹เป„เบ›โ€‹เบซเบฒโ€‹เบžเบฒโ€‹เบ—เบดโ€‹เบŠเบฑเบ™ 6 - เบ—เปˆเบฒเบ™โ€‹เบชเบฒโ€‹เบกเบฒเบ”โ€‹เบเบงเบ”โ€‹เบชเบญเบšโ€‹เบเบฒเบ™โ€‹เบ™เบตเป‰โ€‹เป‚เบ”เบโ€‹เบเบฒเบ™โ€‹เป„เบ› kafdrop onโ€‹ localhost:9000

เป„เบ›เบซเบฒเบ›เปˆเบญเบ‡เบขเป‰เบฝเบกเบขเบนเปˆเบ›เบฒเบเบเบญเบ”เบเบฑเบšเบžเบฐเบ™เบฑเบเบ‡เบฒเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบซเบฑเบ™เบ‚เปเป‰เบ„เบงเบฒเบกเบ—เบตเปˆเบกเบตเบ„เบงเบฒเบกเบชเบธเบเบ—เบตเปˆเบ–เบทเบเบชเบปเปˆเบ‡เป„เบ›เป‚เบ”เบเปƒเบŠเป‰ loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

เบžเบงเบเป€เบฎเบปเบฒเบเบฑเบ‡เบชเบฒเบกเบฒเบ”เป€เบšเบดเปˆเบ‡เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™ mongo (เปƒเบŠเป‰ Robo3T เบซเบผเบท Studio3T) เปเบฅเบฐเป€เบšเบดเปˆเบ‡เบงเปˆเบฒเบซเบผเบฑเบเบŠเบฑเบšเปเบกเปˆเบ™เบขเบนเปˆเปƒเบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™:

เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบšเปเปˆเปเบกเปˆเบ™เบกเบฐเบซเบฒเป€เบชเบ”เบ–เบต, เปเบฅเบฐเบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเบกเบตเบ„เบงเบฒเบกเบžเปเปƒเบˆเบเบฑเบšเบ•เบปเบงเป€เบฅเบทเบญเบเบเบฒเบ™เป€เบšเบดเปˆเบ‡เบ—เปเบฒเบญเบดเบ”.

เบงเบฝเบเบ‡เบฒเบ™เบžเบทเป‰เบ™เบ–เบฒเบ™เบเปˆเบฝเบงเบเบฑเบš Faust, เบžเบฒเบเบ—เบต II: เบ•เบปเบงเปเบ—เบ™ เปเบฅเบฐเบ—เบตเบกเบ‡เบฒเบ™เบงเบฝเบเบ‡เบฒเบ™เบžเบทเป‰เบ™เบ–เบฒเบ™เบเปˆเบฝเบงเบเบฑเบš Faust, เบžเบฒเบเบ—เบต II: เบ•เบปเบงเปเบ—เบ™ เปเบฅเบฐเบ—เบตเบกเบ‡เบฒเบ™

เบ„เบงเบฒเบกเบชเบธเบเปเบฅเบฐเบ„เบงเบฒเบกเบชเบธเบ - เบ•เบปเบงเปเบ—เบ™เบ—เปเบฒเบญเบดเบ”เปเบกเปˆเบ™เบเบฝเบกเบžเป‰เบญเบก :)

เบ•เบปเบงเปเบ—เบ™เบžเป‰เบญเบกเปเบฅเป‰เบง เบ•เบปเบงเปเบ—เบ™เปƒเปเปˆ เบญเบฒเบเบธเบเบทเบ™!

เปเบกเปˆเบ™เปเบฅเป‰เบง, เบœเบนเป‰เบŠเบฒเบ, เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบเบงเบกเป€เบญเบปเบฒเบžเบฝเบ‡เปเบ•เปˆ 1/3 เบ‚เบญเบ‡เป€เบชเบฑเป‰เบ™เบ—เบฒเบ‡เบ—เบตเปˆเบเบฐเบเบฝเบกเป‚เบ”เบเบšเบปเบ”เบ„เบงเบฒเบกเบ™เบตเป‰, เปเบ•เปˆเบขเปˆเบฒเบ—เปเป‰เบ–เบญเบเปƒเบˆ, เป€เบžเบฒเบฐเบงเปˆเบฒเปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบกเบฑเบ™เบˆเบฐเบ‡เปˆเบฒเบเบ‚เบถเป‰เบ™.

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™เบ•เบปเบงเปเบ—เบ™เบ—เบตเปˆเป€เบเบฑเบšเบเปเบฒเบ‚เปเป‰เบกเบนเบ™ meta เปเบฅเบฐเป€เบฎเบฑเบ”เปƒเบซเป‰เบกเบฑเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เป€เบญเบเบฐเบชเบฒเบ™เบเบฒเบ™เบฅเบงเบšเบฅเบงเบก:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบ•เบปเบงเปเบ—เบ™เบ™เบตเป‰เบˆเบฐเบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™เบเปˆเบฝเบงเบเบฑเบšเบ„เบงเบฒเบกเบ›เบญเบ”เป„เบžเบชเบฐเป€เบžเบฒเบฐ, เบžเบงเบเป€เบฎเบปเบฒเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบŠเบตเป‰เบšเบญเบ ticker (เบชเบฑเบ™เบเบฒเบฅเบฑเบ) เบ‚เบญเบ‡เบ„เบงเบฒเบกเบ›เบญเบ”เป„เบžเบ™เบตเป‰เปƒเบ™เบ‚เปเป‰เบ„เบงเบฒเบก. เบชเปเบฒเบฅเบฑเบšเบˆเบธเบ”เบ›เบฐเบชเบปเบ‡เบ™เบตเป‰เปƒเบ™ faust เบกเบต เบเบฒเบ™เบšเบฑเบ™เบ—เบถเบ โ€” เบซเป‰เบญเบ‡โ€‹เบฎเบฝเบ™โ€‹เบ—เบตเปˆโ€‹เบ›เบฐโ€‹เบเบฒเบ”โ€‹เป‚เบ„เบ‡โ€‹เบเบฒเบ™โ€‹เบ‚เปเป‰โ€‹เบ„เบงเบฒเบกโ€‹เปƒเบ™โ€‹เบซเบปเบงโ€‹เบ‚เปเป‰โ€‹เบ•เบปเบงโ€‹เปเบ—เบ™โ€‹.

เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ™เบตเป‰, เปƒเบซเป‰เป„เบ› records.py เปเบฅเบฐเบญเบฐเบ—เบดเบšเบฒเบเบงเปˆเบฒเบ‚เปเป‰เบ„เบงเบฒเบกเบ‚เบญเบ‡เบซเบปเบงเบ‚เปเป‰เบ™เบตเป‰เบ„เบงเบ™เบˆเบฐเป€เบ›เบฑเบ™เปเบ™เบงเปƒเบ”:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

เบ”เบฑเปˆเบ‡เบ—เบตเปˆเบ—เปˆเบฒเบ™เบญเบฒเบ”เบˆเบฐเป„เบ”เป‰เบ„เบฒเบ”เป€เบ”เบปเบฒ, faust เปƒเบŠเป‰เบ„เปเบฒเบšเบฑเบ™เบเบฒเบเบ›เบฐเป€เบžเบ” python เป€เบžเบทเปˆเบญเบญเบฐเบ—เบดเบšเบฒเบเบฎเบนเบšเปเบšเบšเบ‚เปเป‰เบ„เบงเบฒเบก, เป€เบŠเบดเปˆเบ‡เปเบกเปˆเบ™เป€เบซเบ”เบœเบปเบ™เบ—เบตเปˆเบงเปˆเบฒเบชเบฐเบšเบฑเบšเบ•เปเบฒเปˆเบชเบธเบ”เบ—เบตเปˆเบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบธเบ™เป‚เบ”เบเบซเป‰เบญเบ‡เบชเบฐเบซเบกเบธเบ”เปเบกเปˆเบ™. 3.6.

เปƒเบซเป‰เบเบฑเบšเบ„เบทเบ™เป„เบ›เบซเบฒเบ•เบปเบงเปเบ—เบ™, เบเปเบฒเบ™เบปเบ”เบ›เบฐเป€เบžเบ”เปเบฅเบฐเป€เบžเบตเปˆเบกเบกเบฑเบ™:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

เบ”เบฑเปˆเบ‡เบ—เบตเปˆเป€เบˆเบปเป‰เบฒเบชเบฒเบกเบฒเบ”เป€เบซเบฑเบ™เป„เบ”เป‰, เบžเบงเบเป€เบฎเบปเบฒเบœเปˆเบฒเบ™เบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เปƒเบซเบกเปˆเบ—เบตเปˆเบกเบตเป‚เบ„เบ‡เบเบฒเบ™เบเบฑเบšเบงเบดเบ—เบตเบเบฒเบ™เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบซเบปเบงเบ‚เปเป‰ - value_type . เบ™เบญเบเบˆเบฒเบเบ™เบฑเป‰เบ™, เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เบ›เบฐเบ•เบดเบšเบฑเบ”เบ•เบฒเบกเป‚เบ„เบ‡เบเบฒเบ™เบ”เบฝเบงเบเบฑเบ™, เบชเบฐเบ™เบฑเป‰เบ™เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบšเปเปˆเป€เบซเบฑเบ™เบˆเบธเบ”เปƒเบ”เบ—เบตเปˆเบˆเบฐเบขเบนเปˆเบเบฑเบšเบชเบดเปˆเบ‡เบญเบทเปˆเบ™.

เบ”เบต, เบเบฒเบ™เบชเปเบฒเบžเบฑเบ”เบชเบธเบ”เบ—เป‰เบฒเบเปเบกเปˆเบ™เบเบฒเบ™เป€เบžเบตเปˆเบกเบเบฒเบ™เป‚เบ—เบซเบฒเบ•เบปเบงเปเบ—เบ™เป€เบเบฑเบšเบเปเบฒเบ‚เปเป‰เบกเบนเบ™ meta เป€เบžเบทเปˆเบญ collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

เบžเบงเบเป€เบฎเบปเบฒเบ™เปเบฒเปƒเบŠเป‰เป‚เบ„เบ‡เบเบฒเบ™เบ—เบตเปˆเบ›เบฐเบเบฒเบ”เบเปˆเบญเบ™เบซเบ™เป‰เบฒเบ™เบตเป‰เบชเปเบฒเบฅเบฑเบšเบ‚เปเป‰เบ„เบงเบฒเบก. เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ™เบตเป‰, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเป„เบ”เป‰เปƒเบŠเป‰เบงเบดเบ—เบตเบเบฒเบ™ .cast เบ™เบฑเบšเบ•เบฑเป‰เบ‡เปเบ•เปˆเบžเบงเบเป€เบฎเบปเบฒเบšเปเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบฅเปเบ–เป‰เบฒเบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเบˆเบฒเบเบ•เบปเบงเปเบ—เบ™, เปเบ•เปˆเบกเบฑเบ™เบชเบปเบกเบ„เบงเบ™เบ—เบตเปˆเบˆเบฐเบเปˆเบฒเบงเป€เบ–เบดเบ‡เบงเปˆเบฒ. เบ—เบฒเบ‡ เบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰:

  1. cast - เบšเปเปˆเบ•เบฑเบ™เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเบกเบฑเบ™เบšเปเปˆเป„เบ”เป‰เบ„เบฒเบ”เบซเบงเบฑเบ‡เบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบš. เบ—เปˆเบฒเบ™เบšเปเปˆเบชเบฒเบกเบฒเบ”เบชเบปเปˆเบ‡เบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰เบญเบทเปˆเบ™เป€เบ›เบฑเบ™เบ‚เปเป‰เบ„เบงเบฒเบก.

  2. send - เบšเปเปˆเบ•เบฑเบ™เป€เบžเบฒเบฐเบงเปˆเบฒเบกเบฑเบ™เบšเปเปˆเป„เบ”เป‰เบ„เบฒเบ”เบซเบงเบฑเบ‡เบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบš. เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบฅเบฐเบšเบธเบ•เบปเบงเปเบ—เบ™เปƒเบ™เบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเบˆเบฐเป„เบ›.

  3. เบ–เบฒเบก - เบฅเปเบ–เป‰เบฒเบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบš. เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบฅเบฐเบšเบธเบ•เบปเบงเปเบ—เบ™เปƒเบ™เบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเบˆเบฐเป„เบ›.

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบ—เบฑเบ‡เบซเบกเบปเบ”เบ—เบตเปˆเบกเบตเบ•เบปเบงเปเบ—เบ™เบชเปเบฒเบฅเบฑเบšเบกเบทเป‰เบ™เบตเป‰!

เบ—เบตเบกเบ‡เบฒเบ™เบเบฑเบ™

เบชเบดเปˆเบ‡เบชเบธเบ”เบ—เป‰เบฒเบเบ—เบตเปˆเบ‚เป‰เบญเบเบชเบฑเบ™เบเบฒเบงเปˆเบฒเบˆเบฐเบ‚เบฝเบ™เปƒเบ™เบชเปˆเบงเบ™เบ™เบตเป‰เปเบกเปˆเบ™เบ„เปเบฒเบชเบฑเปˆเบ‡. เบ”เบฑเปˆเบ‡เบ—เบตเปˆเป„เบ”เป‰เบเปˆเบฒเบงเบเปˆเบญเบ™เบซเบ™เป‰เบฒเบ™เบตเป‰, เบ„เปเบฒเบชเบฑเปˆเบ‡เปƒเบ™ faust เปเบกเปˆเบ™ wrapper เบ›เบฐเบกเบฒเบ™เบ„เบฅเบดเบ. เปƒเบ™เบ„เบงเบฒเบกเป€เบ›เบฑเบ™เบˆเบดเบ‡, faust เบžเบฝเบ‡เปเบ•เปˆเป€เบญเบปเบฒเบ„เปเบฒเบชเบฑเปˆเบ‡เบ—เบตเปˆเบเปเบฒเบซเบ™เบปเบ”เป€เบญเบ‡เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเปƒเบชเปˆเปƒเบ™เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเบ‚เบญเบ‡เบกเบฑเบ™เปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบเปเบฒเบ™เบปเบ”เบฅเบฐเบซเบฑเบ” -A

เบซเบผเบฑเบ‡เบˆเบฒเบเบ•เบปเบงเปเบ—เบ™เบ›เบฐเบเบฒเบ”เปƒเบ™ agents.py เป€เบžเบตเปˆเบกเบŸเบฑเบ‡เบŠเบฑเบ™เบ—เบตเปˆเบกเบตเป€เบ„เบทเปˆเบญเบ‡เบ•เบปเบšเปเบ•เปˆเบ‡ app.commandเป‚เบ—เบซเบฒเบงเบดเบ—เบตเบเบฒเบ™ เปเบกเปˆเบžเบดเบกเบชเปเบฒเบฅเบฑเบš ัƒ collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบ–เป‰เบฒเบžเบงเบเป€เบฎเบปเบฒเป‚เบ—เบซเบฒเบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ‚เบญเบ‡เบ„เปเบฒเบชเบฑเปˆเบ‡, เบ„เปเบฒเบชเบฑเปˆเบ‡เปƒเบซเบกเปˆเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบขเบนเปˆเปƒเบ™เบกเบฑเบ™:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

เบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”เปƒเบŠเป‰เบกเบฑเบ™เบ„เบทเบเบฑเบšเบœเบนเป‰เบญเบทเปˆเบ™, เบชเบฐเบ™เบฑเป‰เบ™เปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเป€เบฅเบตเปˆเบกเบžเบฐเบ™เบฑเบเบ‡เบฒเบ™ faust เปƒเปเปˆ เปเบฅเบฐเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบเบฒเบ™เป€เบเบฑเบšเบซเบผเบฑเบเบŠเบฑเบšเบ—เบตเปˆเป€เบ•เบฑเบกเบฎเบนเบšเปเบšเบš:

> faust -A horton.agents start-collect-securities

เบˆเบฐเบกเบตเบซเบเบฑเบ‡เป€เบเบตเบ”เบ‚เบทเป‰เบ™เบ•เปเปˆเป„เบ›?

เปƒเบ™เบชเปˆเบงเบ™เบ•เปเปˆเป„เบ›, เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบ•เบปเบงเปเบ—เบ™เบ—เบตเปˆเบเบฑเบ‡เป€เบซเบผเบทเบญเป€เบ›เบฑเบ™เบ•เบปเบงเบขเปˆเบฒเบ‡, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบžเบดเบˆเบฒเบฅเบฐเบ™เบฒเบเบปเบ™เป„เบเบเบฒเบ™เบซเบฅเบปเป‰เบกเบˆเบปเบกเปƒเบ™เบเบฒเบ™เบ„เบปเป‰เบ™เบซเบฒเบ—เบตเปˆเบชเบธเบ”เปƒเบ™เบฅเบฒเบ„เบฒเบ›เบดเบ”เบ‚เบญเบ‡เบเบฒเบ™เบŠเบทเป‰เบ‚เบฒเบเบชเปเบฒเบฅเบฑเบšเบ›เบตเปเบฅเบฐเบเบฒเบ™เป€เบ›เบตเบ”เบ•เบปเบง cron เบ‚เบญเบ‡เบ•เบปเบงเปเบ—เบ™.

เบ™เบฑเป‰เบ™เปเบกเปˆเบ™เบ—เบฑเบ‡เปเบปเบ”เบชเบณเบฅเบฑเบšเบกเบทเป‰เบ™เบตเป‰! เบ‚เบญเบšเปƒเบˆเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบญเปˆเบฒเบ™ :)

เบฅเบฐเบซเบฑเบ”เบชเปเบฒเบฅเบฑเบšเบชเปˆเบงเบ™เบ™เบตเป‰

เบงเบฝเบเบ‡เบฒเบ™เบžเบทเป‰เบ™เบ–เบฒเบ™เบเปˆเบฝเบงเบเบฑเบš Faust, เบžเบฒเบเบ—เบต II: เบ•เบปเบงเปเบ—เบ™ เปเบฅเบฐเบ—เบตเบกเบ‡เบฒเบ™

PS เบžเบฒเบโ€‹เปƒเบ•เป‰โ€‹เบžเบฒเบโ€‹เบชเปˆเบงเบ™โ€‹เบชเบธเบ”โ€‹เบ—เป‰เบฒเบโ€‹เบ‚เป‰เบฒโ€‹เบžเบฐโ€‹เป€เบˆเบปเป‰เบฒโ€‹เป„เบ”เป‰โ€‹เบ–เบทเบโ€‹เบ–เบฒเบกโ€‹เบเปˆเบฝเบงโ€‹เบเบฑเบš faust เปเบฅเบฐ confluent kafka (confluent เบกเบตเบ„เบธเบ™เบชเบปเบกเบšเบฑเบ”เปเบ™เบงเปƒเบ”?). เบกเบฑเบ™เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒ confluent เบกเบตเบ›เบฐเป‚เบซเบเบ”เบซเบผเบฒเบเปƒเบ™เบซเบผเบฒเบเบงเบดเบ—เบต, เปเบ•เปˆเบ„เบงเบฒเบกเบˆเบดเบ‡เปเบฅเป‰เบงเปเบกเปˆเบ™เบงเปˆเบฒ faust เบšเปเปˆเป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบนเบ™เบฅเบนเบเบ„เป‰เบฒเบขเปˆเบฒเบ‡เป€เบ•เบฑเบกเบ—เบตเปˆเบชเปเบฒเบฅเบฑเบš confluent - เบ™เบตเป‰เปเบกเปˆเบ™เบกเบฒเบˆเบฒเบ. เบ„เปเบฒเบญเบฐเบ—เบดเบšเบฒเบเบเปˆเบฝเบงเบเบฑเบšเบ‚เปเป‰เบˆเปเบฒเบเบฑเบ”เบ‚เบญเบ‡เบฅเบนเบเบ„เป‰เบฒเปƒเบ™เป€เบญเบเบฐเบชเบฒเบ™.

เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™: www.habr.com

เป€เบžเบตเปˆเบกเบ„เบงเบฒเบกเบ„เบดเบ”เป€เบซเบฑเบ™