āĻĢāĻžāωāĻ¸ā§āĻŸā§‡āϰ āĻĒāϟāĻ­ā§‚āĻŽāĻŋāϰ āĻ•āĻžāϜ, āĻĒāĻžāĻ°ā§āϟ II: āĻāĻœā§‡āĻ¨ā§āϟ āĻāĻŦāĻ‚ āĻĻāϞ

āĻĢāĻžāωāĻ¸ā§āĻŸā§‡āϰ āĻĒāϟāĻ­ā§‚āĻŽāĻŋāϰ āĻ•āĻžāϜ, āĻĒāĻžāĻ°ā§āϟ II: āĻāĻœā§‡āĻ¨ā§āϟ āĻāĻŦāĻ‚ āĻĻāϞ

āĻŦāĻŋāώāϝāĻŧāĻŦāĻ¸ā§āϤ⧁ āϏ⧂āϚāĻŋ

  1. āĻĒāĻ°ā§āĻŦ I: āĻ­ā§‚āĻŽāĻŋāĻ•āĻž

  2. āĻĒāĻžāĻ°ā§āϟ II: āĻāĻœā§‡āĻ¨ā§āϟ āĻāĻŦāĻ‚ āĻĻāϞ

āφāĻŽāϰāĻž āĻāĻ–āĻžāύ⧇ āĻ•āĻŋ āĻ•āϰāĻ›āĻŋ?

āϏ⧁āϤāϰāĻžāĻ‚, āϤāĻžāχ, āĻĻā§āĻŦāĻŋāϤ⧀āϝāĻŧ āĻ…āĻ‚āĻļ. āϝ⧇āĻŽāύāϟāĻŋ āφāϗ⧇ āϞ⧇āĻ–āĻž āĻšāϝāĻŧ⧇āϛ⧇, āĻāϤ⧇ āφāĻŽāϰāĻž āύāĻŋāĻŽā§āύāϞāĻŋāĻ–āĻŋāϤāϗ⧁āϞāĻŋ āĻ•āϰāĻŦ:

  1. āφāĻŽāĻžāĻĻ⧇āϰ āĻĒā§āϰāϝāĻŧā§‹āϜāύ⧀āϝāĻŧ āĻļ⧇āώ āĻĒāϝāĻŧ⧇āĻ¨ā§āϟāϗ⧁āϞāĻŋāϰ āϜāĻ¨ā§āϝ āĻ…āύ⧁āϰ⧋āϧ āϏāĻš aiohttp-āĻ alphavantage-āĻāϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āϛ⧋āϟ āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āϟ āϞāĻŋāĻ–āĻŋāĨ¤

  2. āφāϏ⧁āύ āĻāĻ•āϟāĻŋ āĻāĻœā§‡āĻ¨ā§āϟ āϤ⧈āϰāĻŋ āĻ•āϰāĻŋ āϝāĻžāϰāĻž āϏāĻŋāĻ•āĻŋāωāϰāĻŋāϟāĻŋāϜ āĻāĻŦāĻ‚ āϤāĻžāĻĻ⧇āϰ āωāĻĒāϰ āĻŽā§‡āϟāĻž āϤāĻĨā§āϝ āϏāĻ‚āĻ—ā§āϰāĻš āĻ•āϰāĻŦ⧇āĨ¤

āĻ•āĻŋāĻ¨ā§āϤ⧁, āφāĻŽāϰāĻž āĻāχ āĻĒā§āϰāĻ•āĻ˛ā§āĻĒ⧇āϰ āϜāĻ¨ā§āϝāχ āĻāϟāĻŋ āĻ•āϰāĻŦ, āĻāĻŦāĻ‚ āĻĢāĻžāĻ¸ā§āϟ āĻ—āĻŦ⧇āώāĻŖāĻžāϰ āĻĒāϰāĻŋāĻĒā§āϰ⧇āĻ•ā§āώāĻŋāϤ⧇, āφāĻŽāϰāĻž āĻļāĻŋāĻ–āĻŦ āϕ⧀āĻ­āĻžāĻŦ⧇ āĻāĻœā§‡āĻ¨ā§āϟ āϞāĻŋāĻ–āϤ⧇ āĻšāϝāĻŧ āϝāĻž āĻ•āĻžāĻĢāĻ•āĻž āĻĨ⧇āϕ⧇ āχāϭ⧇āĻ¨ā§āϟāϗ⧁āϞāĻŋ āĻĒā§āϰāϏ⧇āϏ āĻ•āϰ⧇, āϏ⧇āχāϏāĻžāĻĨ⧇ āϕ⧀āĻ­āĻžāĻŦ⧇ āĻ•āĻŽāĻžāĻ¨ā§āĻĄ āϞāĻŋāĻ–āϤ⧇ āĻšāϝāĻŧ (āĻ•ā§āϞāĻŋāĻ• āĻ°ā§āϝāĻžāĻĒāĻžāϰ), āφāĻŽāĻžāĻĻ⧇āϰ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇ - āĻāĻœā§‡āĻ¨ā§āϟ āϝ⧇ āĻŦāĻŋāώāϝāĻŧ⧇ āύāϜāϰāĻĻāĻžāϰāĻŋ āĻ•āϰāϛ⧇ āϏ⧇āχ āĻŦāĻŋāώāϝāĻŧ⧇ āĻŽā§āϝāĻžāύ⧁āϝāĻŧāĻžāϞ āĻĒ⧁āĻļ āĻŦāĻžāĻ°ā§āϤāĻžāϰ āϜāĻ¨ā§āϝāĨ¤

āĻĒā§āϰāĻļāĻŋāĻ•ā§āώāĻŖ

āφāϞāĻĢāĻžāĻ­ā§āϝāĻžāĻ¨ā§āĻŸā§‡āϜ āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āϟ

āĻĒā§āϰāĻĨāĻŽā§‡, āĻ…ā§āϝāĻžāϞāĻĢāĻžāĻ­āĻžāĻ¨ā§āĻŸā§‡āĻœā§‡āϰ āĻ…āύ⧁āϰ⧋āϧ⧇āϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āϛ⧋āϟ aiohttp āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āϟ āϞāĻŋāĻ–āĻŋāĨ¤

alphavantage.py

āĻ­āĻ•ā§āώāĻ•

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

āĻĒā§āϰāĻ•ā§ƒāϤāĻĒāĻ•ā§āώ⧇, āĻāϟāĻŋ āĻĨ⧇āϕ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁ āĻĒāϰāĻŋāĻˇā§āĻ•āĻžāϰ:

  1. AlphaVantage API āĻŦ⧇āĻļ āϏāĻšāϜ āĻāĻŦāĻ‚ āϏ⧁āĻ¨ā§āĻĻāϰāĻ­āĻžāĻŦ⧇ āĻĄāĻŋāϜāĻžāχāύ āĻ•āϰāĻž āĻšāϝāĻŧ⧇āϛ⧇, āϤāĻžāχ āφāĻŽāĻŋ āĻĒāĻĻā§āϧāϤāĻŋāϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡ āϏāĻŽāĻ¸ā§āϤ āĻ…āύ⧁āϰ⧋āϧ āĻ•āϰāĻžāϰ āϏāĻŋāĻĻā§āϧāĻžāĻ¨ā§āϤ āύāĻŋāϝāĻŧ⧇āĻ›āĻŋ construct_query āϝ⧇āĻ–āĻžāύ⧇ āϘ⧁āϰ⧇ āĻāĻ•āϟāĻŋ http āĻ•āϞ āφāϛ⧇āĨ¤

  2. āφāĻŽāĻŋ āϏāĻŦ āĻ•ā§āώ⧇āĻ¤ā§āϰ āφāύāĻž snake_case āφāϰāĻžāĻŽā§‡āϰ āϜāĻ¨ā§āϝ

  3. āĻ­āĻžāϞ, āϏ⧁āĻ¨ā§āĻĻāϰ āĻāĻŦāĻ‚ āϤāĻĨā§āϝāĻĒā§‚āĻ°ā§āĻŖ āĻŸā§āϰ⧇āϏāĻŦā§āϝāĻžāĻ• āφāωāϟāĻĒ⧁āϟ āϜāĻ¨ā§āϝ logger.catch āϏāĻœā§āϜāĻž.

PS config.yml-āĻ āĻ¸ā§āĻĨāĻžāύ⧀āϝāĻŧāĻ­āĻžāĻŦ⧇ alphavantage āĻŸā§‹āϕ⧇āύ āϝ⧋āĻ— āĻ•āϰāϤ⧇ āϭ⧁āϞāĻŦ⧇āύ āύāĻž, āĻ…āĻĨāĻŦāĻž āĻāύāĻ­āĻžāϝāĻŧāϰāύāĻŽā§‡āĻ¨ā§āϟ āϭ⧇āϰāĻŋāϝāĻŧ⧇āĻŦāϞ āĻāĻ•ā§āϏāĻĒā§‹āĻ°ā§āϟ āĻ•āϰāϤ⧇ āϭ⧁āϞāĻŦ⧇āύ āύāĻž HORTON_SERVICE_APIKEY. āφāĻŽāϰāĻž āĻāĻ•āϟāĻŋ āĻŸā§‹āϕ⧇āύ āĻ—ā§āϰāĻšāĻŖ āĻ•āϰāĻŋ āĻāĻ–āĻžāύ⧇.

CRUD āĻ•ā§āϞāĻžāϏ

āϏāĻŋāĻ•āĻŋāωāϰāĻŋāϟāĻŋāϜ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āĻŽā§‡āϟāĻž āϤāĻĨā§āϝ āϏāĻ‚āϰāĻ•ā§āώāĻŖ āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ āφāĻŽāĻžāĻĻ⧇āϰ āĻāĻ•āϟāĻŋ āϏāĻŋāĻ•āĻŋāωāϰāĻŋāϟāĻŋāϜ āϏāĻ‚āĻ—ā§āϰāĻš āĻĨāĻžāĻ•āĻŦ⧇āĨ¤

database/security.py

āφāĻŽāĻžāϰ āĻŽāϤ⧇, āĻāĻ–āĻžāύ⧇ āĻ•āĻŋāϛ⧁ āĻŦā§āϝāĻžāĻ–ā§āϝāĻž āĻ•āϰāĻžāϰ āĻĒā§āϰāϝāĻŧā§‹āϜāύ āύ⧇āχ, āĻāĻŦāĻ‚ āĻŦ⧇āϏ āĻ•ā§āϞāĻžāϏ āύāĻŋāĻœā§‡āχ āĻŦ⧇āĻļ āϏāĻšāϜāĨ¤

āĻ…ā§āϝāĻžāĻĒ āϟāĻŋ āύāĻŋāύ()

āĻāĻ•āϟāĻŋ āĻ…ā§āϝāĻžāĻĒā§āϞāĻŋāϕ⧇āĻļāύ āĻŦāĻ¸ā§āϤ⧁ āϤ⧈āϰāĻŋ āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āĻĢāĻžāĻ‚āĻļāύ āϝ⧋āĻ— āĻ•āϰāĻž āϝāĻžāĻ• app.py

āĻ­āĻ•ā§āώāĻ•

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

āφāĻĒāĻžāϤāϤ āφāĻŽāĻžāĻĻ⧇āϰ āĻ•āĻžāϛ⧇ āϏāĻšāϜāϤāĻŽ āĻ…ā§āϝāĻžāĻĒā§āϞāĻŋāϕ⧇āĻļāύ āϤ⧈āϰāĻŋ āĻšāĻŦ⧇, āĻāĻ•āϟ⧁ āĻĒāϰ⧇ āφāĻŽāϰāĻž āĻāϟāĻŋāϕ⧇ āĻĒā§āϰāϏāĻžāϰāĻŋāϤ āĻ•āϰāĻŦ, āϤāĻŦ⧇, āφāĻĒāύāĻžāϕ⧇ āĻ…āĻĒ⧇āĻ•ā§āώāĻž āύāĻž āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ, āĻāĻ–āĻžāύ⧇ āϤāĻĨā§āϝāϏ⧂āĻ¤ā§āϰ āĻ…ā§āϝāĻžāĻĒ-āĻ•ā§āϞāĻžāϏ⧇āĨ¤ āφāĻŽāĻŋ āφāĻĒāύāĻžāϕ⧇ āϏ⧇āϟāĻŋāĻ‚āϏ āĻ•ā§āϞāĻžāϏāϟāĻŋ āĻāĻ•āĻŦāĻžāϰ āĻĻ⧇āĻ–āĻžāϰ āĻĒāϰāĻžāĻŽāĻ°ā§āĻļ āĻĻāĻŋāĻšā§āĻ›āĻŋ, āϝ⧇āĻšā§‡āϤ⧁ āĻāϟāĻŋ āĻŦ⧇āĻļāĻŋāϰāĻ­āĻžāĻ— āϏ⧇āϟāĻŋāĻ‚āϏ⧇āϰ āϜāĻ¨ā§āϝ āĻĻāĻžāϝāĻŧā§€āĨ¤

āĻĒā§āϰāϧāĻžāύ āĻ…āĻ‚āĻļ

āϏāĻŋāĻ•āĻŋāωāϰāĻŋāϟāĻŋāĻœā§‡āϰ āϤāĻžāϞāĻŋāĻ•āĻž āϏāĻ‚āĻ—ā§āϰāĻš āĻ“ āϰāĻ•ā§āώāĻŖāĻžāĻŦ⧇āĻ•ā§āώāϪ⧇āϰ āϜāĻ¨ā§āϝ āĻāĻœā§‡āĻ¨ā§āϟ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

āϏ⧁āϤāϰāĻžāĻ‚, āĻĒā§āϰāĻĨāĻŽā§‡ āφāĻŽāϰāĻž āĻĢāĻžāĻ¸ā§āϟ āĻ…ā§āϝāĻžāĻĒā§āϞāĻŋāϕ⧇āĻļāύ āĻ…āĻŦāĻœā§‡āĻ•ā§āϟāϟāĻŋ āĻĒāĻžāχ - āĻāϟāĻŋ āĻŦ⧇āĻļ āϏāĻšāϜāĨ¤ āĻāϰ āĻĒāϰ⧇, āφāĻŽāϰāĻž āĻ¸ā§āĻĒāĻˇā§āϟāĻ­āĻžāĻŦ⧇ āφāĻŽāĻžāĻĻ⧇āϰ āĻāĻœā§‡āĻ¨ā§āĻŸā§‡āϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āĻŦāĻŋāώāϝāĻŧ āĻ˜ā§‹āώāĻŖāĻž āĻ•āϰāĻŋ... āĻāĻ–āĻžāύ⧇ āĻāϟāĻŋ āϕ⧀, āĻ…āĻ­ā§āϝāĻ¨ā§āϤāϰ⧀āĻŖ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāϟāĻžāϰ āϕ⧀ āĻāĻŦāĻ‚ āĻāϟāĻŋāϕ⧇ āϕ⧀āĻ­āĻžāĻŦ⧇ āφāϞāĻžāĻĻāĻžāĻ­āĻžāĻŦ⧇ āϏāĻžāϜāĻžāύ⧋ āϝ⧇āϤ⧇ āĻĒāĻžāϰ⧇ āϤāĻž āωāĻ˛ā§āϞ⧇āĻ– āĻ•āϰāĻžāϰ āĻŽāϤ⧋āĨ¤

  1. āĻ•āĻžāĻĢāĻ•āĻžāϰ āϟāĻĒāĻŋāĻ•āϏ, āϝāĻĻāĻŋ āφāĻŽāϰāĻž āϏāĻ āĻŋāĻ• āϏāĻ‚āĻœā§āĻžāĻž āϜāĻžāύāϤ⧇ āϚāĻžāχ, āϤāĻžāĻšāϞ⧇ āĻĒāĻĄāĻŧāĻž āĻ­āĻžāϞ⧋ āĻŦāĻ¨ā§āϧ āύāĻĨāĻŋ, āĻ…āĻĨāĻŦāĻž āφāĻĒāύāĻŋ āĻĒāĻĄāĻŧāϤ⧇ āĻĒāĻžāϰ⧇āύ āĻŦāĻŋāĻŽā§‚āĻ°ā§āϤ āϰāĻžāĻļāĻŋāϝāĻŧāĻžāύ āĻ­āĻžāώāĻžāϝāĻŧ āĻšāĻžāĻŦā§āϰ⧇āϤ⧇, āϝ⧇āĻ–āĻžāύ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁ āĻŦ⧇āĻļ āϏāĻ āĻŋāĻ•āĻ­āĻžāĻŦ⧇ āĻĒā§āϰāϤāĻŋāĻĢāϞāĻŋāϤ āĻšāϝāĻŧ :)

  2. āĻ…āĻ­ā§āϝāĻ¨ā§āϤāϰ⧀āĻŖ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāϟāĻžāϰ, faust āĻĄāĻ•-āĻ āĻŦ⧇āĻļ āĻ­āĻžāϞāĻ­āĻžāĻŦ⧇ āĻŦāĻ°ā§āĻŖāύāĻž āĻ•āϰāĻž āĻšāϝāĻŧ⧇āϛ⧇, āφāĻŽāĻžāĻĻ⧇āϰāϕ⧇ āϕ⧋āĻĄā§‡ āϏāϰāĻžāϏāϰāĻŋ āĻŦāĻŋāώāϝāĻŧ āĻ•āύāĻĢāĻŋāĻ—āĻžāϰ āĻ•āϰāϤ⧇ āĻĻ⧇āϝāĻŧ, āĻ…āĻŦāĻļā§āϝāχ, āĻāϰ āĻ…āĻ°ā§āĻĨ āĻĢāĻžāĻ¸ā§āϟ āĻĄā§‡āϭ⧇āϞāĻĒāĻžāϰāĻĻ⧇āϰ āĻĻā§āĻŦāĻžāϰāĻž āĻĒā§āϰāĻĻāĻ¤ā§āϤ āĻĒāϰāĻžāĻŽāĻŋāϤāĻŋāϗ⧁āϞāĻŋ, āωāĻĻāĻžāĻšāϰāĻŖāĻ¸ā§āĻŦāϰ⧂āĻĒ: āϧāĻžāϰāĻŖ, āϧāĻžāϰāĻŖ āύ⧀āϤāĻŋ (āĻĄāĻŋāĻĢāĻ˛ā§āϟāϰ⧂āĻĒ⧇ āĻŽā§āϛ⧇ āĻĢ⧇āϞ⧁āύ, āϤāĻŦ⧇ āφāĻĒāύāĻŋ āϏ⧇āϟ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ āύāĻŋāĻšā§āĻ›āĻŋāĻĻā§āϰ), āĻŦāĻŋāώāϝāĻŧ āĻĒā§āϰāϤāĻŋ āĻĒāĻžāĻ°ā§āϟāĻŋāĻļāύ⧇āϰ āϏāĻ‚āĻ–ā§āϝāĻž (āĻĒāĻžāĻ°ā§āϟāĻŋāĻļāύāĻ•āϰāϤ⧇, āωāĻĻāĻžāĻšāϰāĻŖāĻ¸ā§āĻŦāϰ⧂āĻĒ, āĻāϰ āĻšā§‡āϝāĻŧ⧇ āĻ•āĻŽ āĻŦāĻŋāĻļā§āĻŦāĻŦā§āϝāĻžāĻĒā§€ āϤāĻžā§ŽāĻĒāĻ°ā§āϝ āĻ…ā§āϝāĻžāĻĒā§āϞāĻŋāϕ⧇āĻļāύ āĻĢāĻžāĻ¸ā§āϟ)āĨ¤

  3. āϏāĻžāϧāĻžāϰāĻŖāĻ­āĻžāĻŦ⧇, āĻāĻœā§‡āĻ¨ā§āϟ āĻŦāĻŋāĻļā§āĻŦāĻŦā§āϝāĻžāĻĒā§€ āĻŽāĻžāύ āϏāĻš āĻāĻ•āϟāĻŋ āĻĒāϰāĻŋāϚāĻžāϞāĻŋāϤ āĻŦāĻŋāώāϝāĻŧ āϤ⧈āϰāĻŋ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇, āϤāĻŦ⧇, āφāĻŽāĻŋ āĻ¸ā§āĻĒāĻˇā§āϟāĻ­āĻžāĻŦ⧇ āϏāĻŦāĻ•āĻŋāϛ⧁ āĻ˜ā§‹āώāĻŖāĻž āĻ•āϰāϤ⧇ āϚāĻžāχāĨ¤ āωāĻĒāϰāĻ¨ā§āϤ⧁, āĻāĻœā§‡āĻ¨ā§āϟ āĻŦāĻŋāĻœā§āĻžāĻžāĻĒāύ⧇ āĻŦāĻŋāώāϝāĻŧ⧇āϰ āĻ•āĻŋāϛ⧁ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāϟāĻžāϰ (āωāĻĻāĻžāĻšāϰāĻŖāĻ¸ā§āĻŦāϰ⧂āĻĒ, āĻĒāĻžāĻ°ā§āϟāĻŋāĻļāύ⧇āϰ āϏāĻ‚āĻ–ā§āϝāĻž āĻŦāĻž āϧāϰ⧇ āϰāĻžāĻ–āĻžāϰ āύ⧀āϤāĻŋ) āĻ•āύāĻĢāĻŋāĻ—āĻžāϰ āĻ•āϰāĻž āϝāĻžāĻŦ⧇ āύāĻžāĨ¤

    āĻŽā§āϝāĻžāύ⧁āϝāĻŧāĻžāϞāĻŋ āĻŦāĻŋāώāϝāĻŧāϟāĻŋ āϏāĻ‚āĻœā§āĻžāĻžāϝāĻŧāĻŋāϤ āύāĻž āĻ•āϰ⧇ āĻāϟāĻŋ āĻĻ⧇āĻ–āϤ⧇ āϕ⧇āĻŽāύ āĻšāϤ⧇ āĻĒāĻžāϰ⧇ āϤāĻž āĻāĻ–āĻžāύ⧇:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

āφāĻšā§āĻ›āĻž, āĻāĻ–āύ āφāĻŽāĻžāĻĻ⧇āϰ āĻāĻœā§‡āĻ¨ā§āϟ āϕ⧀ āĻ•āϰāĻŦ⧇ āϤāĻž āĻŦāĻ°ā§āĻŖāύāĻž āĻ•āϰāĻŋ :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

āϏ⧁āϤāϰāĻžāĻ‚, āĻāĻœā§‡āĻ¨ā§āĻŸā§‡āϰ āĻļ⧁āϰ⧁āϤ⧇, āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āĻŸā§‡āϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡ āĻ…āύ⧁āϰ⧋āϧ⧇āϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ aiohttp āϏ⧇āĻļāύ āϖ⧁āϞāĻŋāĨ¤ āĻāχāĻ­āĻžāĻŦ⧇, āĻāĻ•āϜāύ āĻ•āĻ°ā§āĻŽā§€ āĻļ⧁āϰ⧁ āĻ•āϰāĻžāϰ āϏāĻŽāϝāĻŧ, āϝāĻ–āύ āφāĻŽāĻžāĻĻ⧇āϰ āĻāĻœā§‡āĻ¨ā§āϟ āϚāĻžāϞ⧁ āĻ•āϰāĻž āĻšāϝāĻŧ, āϤāĻ–āύāχ āĻāĻ•āϟāĻŋ āϏ⧇āĻļāύ āĻ–ā§‹āϞāĻž āĻšāĻŦ⧇ - āĻāĻ•, āĻĒ⧁āϰ⧋ āϏāĻŽāϝāĻŧ āĻ•āĻ°ā§āĻŽā§€ āϚāϞāϛ⧇ (āĻ…āĻĨāĻŦāĻž āĻāĻ•āĻžāϧāĻŋāĻ•, āϝāĻĻāĻŋ āφāĻĒāύāĻŋ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāϟāĻžāϰ āĻĒāϰāĻŋāĻŦāĻ°ā§āϤāύ āĻ•āϰ⧇āύ) āϏāĻŽā§āĻĒāĻžāϤāĻŦāĻŋāĻ¨ā§āĻĻ⧁ āĻāĻ•āϟāĻŋ āĻĄāĻŋāĻĢāĻ˛ā§āϟ āχāωāύāĻŋāϟ āϏāĻš āĻāĻ•āϟāĻŋ āĻāĻœā§‡āĻ¨ā§āϟ āĻĨ⧇āϕ⧇)āĨ¤

āĻāϰāĻĒāϰ⧇, āφāĻŽāϰāĻž āĻ¸ā§āĻŸā§āϰ⧀āĻŽ āĻ…āύ⧁āϏāϰāĻŖ āĻ•āϰāĻŋ (āφāĻŽāϰāĻž āĻŦāĻžāĻ°ā§āϤāĻžāϟāĻŋ āϰāĻžāĻ–āĻŋ _, āϝ⧇āĻšā§‡āϤ⧁ āφāĻŽāϰāĻž, āĻāχ āĻāĻœā§‡āĻ¨ā§āĻŸā§‡, āφāĻŽāĻžāĻĻ⧇āϰ āĻŦāĻŋāώāϝāĻŧ āĻĨ⧇āϕ⧇ āĻŦāĻžāĻ°ā§āϤāĻžāϗ⧁āϞāĻŋāϰ āĻŦāĻŋāώāϝāĻŧāĻŦāĻ¸ā§āϤ⧁ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āϚāĻŋāĻ¨ā§āϤāĻž āĻ•āϰāĻŋ āύāĻž, āϝāĻĻāĻŋ āϏ⧇āϗ⧁āϞāĻŋ āĻŦāĻ°ā§āϤāĻŽāĻžāύ āĻ…āĻĢāϏ⧇āĻŸā§‡ āĻŦāĻŋāĻĻā§āϝāĻŽāĻžāύ āĻĨāĻžāϕ⧇, āĻ…āĻ¨ā§āϝāĻĨāĻžāϝāĻŧ āφāĻŽāĻžāĻĻ⧇āϰ āϚāĻ•ā§āϰ āϤāĻžāĻĻ⧇āϰ āφāĻ—āĻŽāύ⧇āϰ āϜāĻ¨ā§āϝ āĻ…āĻĒ⧇āĻ•ā§āώāĻž āĻ•āϰāĻŦ⧇āĨ¤ āĻ āĻŋāĻ• āφāϛ⧇, āφāĻŽāĻžāĻĻ⧇āϰ āϞ⧁āĻĒ⧇āϰ āĻ­āĻŋāϤāϰ⧇, āφāĻŽāϰāĻž āĻŦāĻžāĻ°ā§āϤāĻžāϰ āϰāϏāĻŋāĻĻ āϞāĻ— āĻ•āϰāĻŋ, āϏāĻ•ā§āϰāĻŋāϝāĻŧ⧇āϰ āĻāĻ•āϟāĻŋ āϤāĻžāϞāĻŋāĻ•āĻž āĻĒāĻžāχ (get_securities āĻļ⧁āϧ⧁āĻŽāĻžāĻ¤ā§āϰ āĻĄāĻŋāĻĢāĻ˛ā§āϟāϰ⧂āĻĒ⧇ āϏāĻ•ā§āϰāĻŋāϝāĻŧ āĻšāϝāĻŧ, āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āϟ āϕ⧋āĻĄ āĻĻ⧇āϖ⧁āύ) āϏāĻŋāĻ•āĻŋāωāϰāĻŋāϟāĻŋāϜ āĻāĻŦāĻ‚ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ⧇ āϏāĻ‚āϰāĻ•ā§āώāĻŖ āĻ•āϰāĻŋ, āĻāĻ•āχ āϟāĻŋāĻ•āĻžāϰ⧇āϰ āϏāĻžāĻĨ⧇ āĻāĻ•āϟāĻŋ āύāĻŋāϰāĻžāĻĒāĻ¤ā§āϤāĻž āφāϛ⧇ āĻ•āĻŋāύāĻž āϤāĻž āĻĒāϰ⧀āĻ•ā§āώāĻž āĻ•āϰ⧇ āĻāĻŦāĻ‚ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ⧇ āĻŦāĻŋāύāĻŋāĻŽāϝāĻŧ, āϝāĻĻāĻŋ āĻĨāĻžāϕ⧇, āϤāĻŦ⧇ āĻāϟāĻŋ (āĻ•āĻžāĻ—āϜ) āϕ⧇āĻŦāϞ āφāĻĒāĻĄā§‡āϟ āĻ•āϰāĻž āĻšāĻŦ⧇āĨ¤

āφāĻŽāĻžāĻĻ⧇āϰ āϏ⧃āĻˇā§āϟāĻŋ āϚāĻžāϞ⧁ āĻ•āϰāĻž āϝāĻžāĻ•!

> docker-compose up -d
... ЗаĐŋ҃ҁĐē ĐēĐžĐŊŅ‚ĐĩĐšĐŊĐĩŅ€ĐžĐ˛ ...
> faust -A horton.agents worker --without-web -l info

āĻĒāĻŋāĻāϏ āĻŦ⧈āĻļāĻŋāĻˇā§āĻŸā§āϝ āĻ“āϝāĻŧ⧇āĻŦ āωāĻĒāĻžāĻĻāĻžāύ āφāĻŽāĻŋ āύāĻŋāĻŦāĻ¨ā§āϧāϗ⧁āϞāĻŋāϤ⧇ āĻĢāĻžāĻ¸ā§āϟ āĻŦāĻŋāĻŦ⧇āϚāύāĻž āĻ•āϰāĻŦ āύāĻž, āϤāĻžāχ āφāĻŽāϰāĻž āωāĻĒāϝ⧁āĻ•ā§āϤ āĻĒāϤāĻžāĻ•āĻž āϏ⧇āϟ āĻ•āϰ⧇āĻ›āĻŋāĨ¤

āφāĻŽāĻžāĻĻ⧇āϰ āϞāĻžā§āϚ āĻ•āĻŽāĻžāĻ¨ā§āĻĄā§‡, āφāĻŽāϰāĻž āϤāĻĨā§āϝ āϞāĻ— āφāωāϟāĻĒ⧁āϟ āĻ¸ā§āϤāϰ⧇āϰ āϏāĻžāĻĨ⧇ āĻ…ā§āϝāĻžāĻĒā§āϞāĻŋāϕ⧇āĻļāύ āĻ…āĻŦāĻœā§‡āĻ•ā§āϟāϟāĻŋ āϕ⧋āĻĨāĻžāϝāĻŧ āϖ⧁āρāϜāϤ⧇ āĻšāĻŦ⧇ āĻāĻŦāĻ‚ āĻāϟāĻŋāϰ āϏāĻžāĻĨ⧇ āϕ⧀ āĻ•āϰāϤ⧇ āĻšāĻŦ⧇ (āĻāĻ•āϜāύ āĻ•āĻ°ā§āĻŽā§€ āϞāĻžā§āϚ āĻ•āϰ⧁āύ) āĻŦāϞ⧇āĻ›āĻŋāϞāĻžāĻŽāĨ¤ āφāĻŽāϰāĻž āύāĻŋāĻŽā§āύāϞāĻŋāĻ–āĻŋāϤ āφāωāϟāĻĒ⧁āϟ āĻĒ⧇āϤ⧇:

āĻ­āĻ•ā§āώāĻ•

ڟaÂĩS† v1.10.4â”Ŧ───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... ĐģĐžĐŗĐ¸, ĐģĐžĐŗĐ¸, ĐģĐžĐŗĐ¸ ...

┌Topic Partition Set─────────â”Ŧ────────────┐
│ topic                      │ partitions │
├────────────────────────────â”ŧ────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

āĻāϟāĻž āĻœā§€āĻŦāĻŋāϤ!!!

āφāϏ⧁āύ āĻĒāĻžāĻ°ā§āϟāĻŋāĻļāύ āϏ⧇āϟāϟāĻŋ āĻĻ⧇āĻ–āĻŋāĨ¤ āφāĻŽāϰāĻž āĻĻ⧇āĻ–āϤ⧇ āĻĒāĻžāĻšā§āĻ›āĻŋ, āĻāĻ•āϟāĻŋ āĻŦāĻŋāώāϝāĻŧ āϤ⧈āϰāĻŋ āĻ•āϰāĻž āĻšāϝāĻŧ⧇āĻ›āĻŋāϞ āϝ⧇ āύāĻžāĻŽāϟāĻŋ āφāĻŽāϰāĻž āϕ⧋āĻĄā§‡ āĻŽāύ⧋āύ⧀āϤ āĻ•āϰ⧇āĻ›āĻŋ, āĻĒāĻžāĻ°ā§āϟāĻŋāĻļāύ⧇āϰ āĻĄāĻŋāĻĢāĻ˛ā§āϟ āϏāĻ‚āĻ–ā§āϝāĻž (8, āĻĨ⧇āϕ⧇ āύ⧇āĻ“āϝāĻŧāĻž topic_partitions - āĻ…ā§āϝāĻžāĻĒā§āϞāĻŋāϕ⧇āĻļāύ āĻ…āĻŦāĻœā§‡āĻ•ā§āϟ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāϟāĻžāϰ), āϝ⧇āĻšā§‡āϤ⧁ āφāĻŽāϰāĻž āφāĻŽāĻžāĻĻ⧇āϰ āĻŦāĻŋāώāϝāĻŧ⧇āϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āĻĒ⧃āĻĨāĻ• āĻŽāĻžāύ āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āĻ•āϰāĻŋāύāĻŋ (āĻĒāĻžāĻ°ā§āϟāĻŋāĻļāύ⧇āϰ āĻŽāĻžāĻ§ā§āϝāĻŽā§‡)āĨ¤ āĻ“āϝāĻŧāĻžāĻ°ā§āĻ•āĻžāϰ⧇ āϞāĻžā§āϚ āĻ•āϰāĻž āĻāĻœā§‡āĻ¨ā§āϟāϕ⧇ āϏāĻŽāĻ¸ā§āϤ 8āϟāĻŋ āĻĒāĻžāĻ°ā§āϟāĻŋāĻļāύ āĻŦāϰāĻžāĻĻā§āĻĻ āĻ•āϰāĻž āĻšāϝāĻŧ⧇āϛ⧇, āϝ⧇āĻšā§‡āϤ⧁ āĻāϟāĻŋ āĻāĻ•āĻŽāĻžāĻ¤ā§āϰ, āϤāĻŦ⧇ āĻāϟāĻŋ āĻ•ā§āϞāĻžāĻ¸ā§āϟāĻžāϰāĻŋāĻ‚ āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āĻ…āĻ‚āĻļ⧇ āφāϰāĻ“ āĻŦāĻŋāĻļāĻĻ⧇ āφāϞ⧋āϚāύāĻž āĻ•āϰāĻž āĻšāĻŦ⧇āĨ¤

āĻ āĻŋāĻ• āφāϛ⧇, āĻāĻ–āύ āφāĻŽāϰāĻž āĻ…āĻ¨ā§āϝ āϟāĻžāĻ°ā§āĻŽāĻŋāύāĻžāϞ āωāχāĻ¨ā§āĻĄā§‹āϤ⧇ āϝ⧇āϤ⧇ āĻĒāĻžāϰāĻŋ āĻāĻŦāĻ‚ āφāĻŽāĻžāĻĻ⧇āϰ āĻŦāĻŋāώāϝāĻŧ⧇ āĻāĻ•āϟāĻŋ āĻ–āĻžāϞāĻŋ āĻŦāĻžāĻ°ā§āϤāĻž āĻĒāĻžāĻ āĻžāϤ⧇ āĻĒāĻžāϰāĻŋ:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

āĻĒāĻŋāĻāϏ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇ @ āφāĻŽāϰāĻž āĻĻ⧇āĻ–āĻžāχ āϝ⧇ āφāĻŽāϰāĻž "collect_securities" āύāĻžāĻŽā§‡āϰ āĻāĻ•āϟāĻŋ āĻŦāĻŋāώāϝāĻŧ⧇ āĻāĻ•āϟāĻŋ āĻŦāĻžāĻ°ā§āϤāĻž āĻĒāĻžāĻ āĻžāĻšā§āĻ›āĻŋāĨ¤

āĻāχ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇, āĻŦāĻžāĻ°ā§āϤāĻžāϟāĻŋ āĻĒāĻžāĻ°ā§āϟāĻŋāĻļāύ 6-āĻ āĻ—āĻŋāϝāĻŧ⧇āĻ›āĻŋāϞ - āφāĻĒāύāĻŋ āĻ•ā§āϝāĻžāĻĢāĻĄā§āϰāĻĒ āĻ…āύ-āĻ āĻ—āĻŋāϝāĻŧ⧇ āĻāϟāĻŋ āĻĒāϰ⧀āĻ•ā§āώāĻž āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ localhost:9000

āφāĻŽāĻžāĻĻ⧇āϰ āĻ•āĻ°ā§āĻŽā§€āϰ āϏāĻžāĻĨ⧇ āϟāĻžāĻ°ā§āĻŽāĻŋāύāĻžāϞ āωāχāĻ¨ā§āĻĄā§‹āϤ⧇ āĻ—āĻŋāϝāĻŧ⧇, āφāĻŽāϰāĻž loguru āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇ āĻĒāĻžāĻ āĻžāύ⧋ āĻāĻ•āϟāĻŋ āϖ⧁āĻļāĻŋāϰ āĻŦāĻžāĻ°ā§āϤāĻž āĻĻ⧇āĻ–āϤ⧇ āĻĒāĻžāĻŦ:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

āφāĻŽāϰāĻž āĻŽāĻ™ā§āĻ—ā§‹ (Robo3T āĻŦāĻž Studio3T āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇) āĻĻ⧇āĻ–āϤ⧇ āĻĒāĻžāϰāĻŋ āĻāĻŦāĻ‚ āĻĻ⧇āĻ–āϤ⧇ āĻĒāĻžāϰāĻŋ āϝ⧇ āϏāĻŋāĻ•āĻŋāωāϰāĻŋāϟāĻŋāϗ⧁āϞāĻŋ āĻĄāĻžāϟāĻžāĻŦ⧇āϏ⧇ āϰāϝāĻŧ⧇āϛ⧇:

āφāĻŽāĻŋ āĻŦāĻŋāϞāĻŋāϝāĻŧāύ⧇āϝāĻŧāĻžāϰ āύāχ, āĻāĻŦāĻ‚ āϤāĻžāχ āφāĻŽāϰāĻž āĻĒā§āϰāĻĨāĻŽ āĻĻ⧇āĻ–āĻžāϰ āĻŦāĻŋāĻ•āĻ˛ā§āĻĒ⧇ āϏāĻ¨ā§āϤ⧁āĻˇā§āϟāĨ¤

āĻĢāĻžāωāĻ¸ā§āĻŸā§‡āϰ āĻĒāϟāĻ­ā§‚āĻŽāĻŋāϰ āĻ•āĻžāϜ, āĻĒāĻžāĻ°ā§āϟ II: āĻāĻœā§‡āĻ¨ā§āϟ āĻāĻŦāĻ‚ āĻĻāϞāĻĢāĻžāωāĻ¸ā§āĻŸā§‡āϰ āĻĒāϟāĻ­ā§‚āĻŽāĻŋāϰ āĻ•āĻžāϜ, āĻĒāĻžāĻ°ā§āϟ II: āĻāĻœā§‡āĻ¨ā§āϟ āĻāĻŦāĻ‚ āĻĻāϞ

āϏ⧁āĻ– āĻāĻŦāĻ‚ āφāύāĻ¨ā§āĻĻ - āĻĒā§āϰāĻĨāĻŽ āĻāĻœā§‡āĻ¨ā§āϟ āĻĒā§āϰāĻ¸ā§āϤ⧁āϤ :)

āĻāĻœā§‡āĻ¨ā§āϟ āĻĒā§āϰāĻ¸ā§āϤ⧁āϤ, āύāϤ⧁āύ āĻāĻœā§‡āĻ¨ā§āϟ āĻĻā§€āĻ°ā§āϘāĻœā§€āĻŦā§€ āĻšā§‹āĻ•!

āĻšā§āϝāĻžāρ, āĻ­āĻĻā§āϰāϞ⧋āĻ•, āφāĻŽāϰāĻž āĻāχ āύāĻŋāĻŦāĻ¨ā§āϧāϟāĻŋ āĻĻā§āĻŦāĻžāϰāĻž āĻĒā§āϰāĻ¸ā§āϤ⧁āϤ āĻĒāĻĨ⧇āϰ āĻŽāĻžāĻ¤ā§āϰ 1/3 āĻ…āĻ‚āĻļ āĻ•āĻ­āĻžāϰ āĻ•āϰ⧇āĻ›āĻŋ, āϤāĻŦ⧇ āύāĻŋāĻ°ā§ā§ŽāϏāĻžāĻšāĻŋāϤ āĻšāĻŦ⧇āύ āύāĻž, āĻ•āĻžāϰāĻŖ āĻāĻ–āύ āĻāϟāĻŋ āφāϰāĻ“ āϏāĻšāϜ āĻšāĻŦ⧇āĨ¤

āϏ⧁āϤāϰāĻžāĻ‚ āĻāĻ–āύ āφāĻŽāĻžāĻĻ⧇āϰ āĻāĻŽāύ āĻāĻ•āϜāύ āĻāĻœā§‡āĻ¨ā§āĻŸā§‡āϰ āĻĒā§āϰāϝāĻŧā§‹āϜāύ āϝāĻž āĻŽā§‡āϟāĻž āϤāĻĨā§āϝ āϏāĻ‚āĻ—ā§āϰāĻš āĻ•āϰ⧇ āĻāĻŦāĻ‚ āĻāĻ•āϟāĻŋ āϏāĻ‚āĻ—ā§āϰāĻš āύāĻĨāĻŋāϤ⧇ āϰāĻžāϖ⧇:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

āϝ⧇āĻšā§‡āϤ⧁ āĻāχ āĻāĻœā§‡āĻ¨ā§āϟ āĻāĻ•āϟāĻŋ āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āύāĻŋāϰāĻžāĻĒāĻ¤ā§āϤāĻž āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āϤāĻĨā§āϝ āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻž āĻ•āϰāĻŦ⧇, āϤāĻžāχ āφāĻŽāĻžāĻĻ⧇āϰ āĻŦāĻžāĻ°ā§āϤāĻžāϝāĻŧ āĻāχ āύāĻŋāϰāĻžāĻĒāĻ¤ā§āϤāĻžāϰ āϟāĻŋāĻ•āĻžāϰ (āĻĒā§āϰāϤ⧀āĻ•) āύāĻŋāĻ°ā§āĻĻ⧇āĻļ āĻ•āϰāϤ⧇ āĻšāĻŦ⧇āĨ¤ āĻāχ āωāĻĻā§āĻĻ⧇āĻļā§āϝ⧇ āĻĢāĻžāωāĻ¸ā§āĻŸā§‡ āφāϛ⧇ āϰ⧇āĻ•āĻ°ā§āĻĄāϏ — āĻ•ā§āϞāĻžāϏ āϝ⧇āϗ⧁āϞāĻŋ āĻāĻœā§‡āĻ¨ā§āϟ āĻŦāĻŋāώāϝāĻŧ⧇ āĻŦāĻžāĻ°ā§āϤāĻž āĻ¸ā§āĻ•āĻŋāĻŽ āĻ˜ā§‹āώāĻŖāĻž āĻ•āϰ⧇āĨ¤

āĻāχ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇, āĻāϰ āϝāĻžāύ records.py āĻāĻŦāĻ‚ āĻāχ āĻŦāĻŋāώāϝāĻŧ⧇āϰ āĻŦāĻžāĻ°ā§āϤāĻžāϟāĻŋ āϕ⧇āĻŽāύ āĻšāĻ“āϝāĻŧāĻž āωāϚāĻŋāϤ āϤāĻž āĻŦāĻ°ā§āĻŖāύāĻž āĻ•āϰ⧁āύ:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

āφāĻĒāύāĻŋ āĻ…āύ⧁āĻŽāĻžāύ āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ, faust āĻŦāĻžāĻ°ā§āϤāĻž āĻ¸ā§āĻ•āĻŋāĻŽāĻž āĻŦāĻ°ā§āĻŖāύāĻž āĻ•āϰāϤ⧇ āĻĒāĻžāχāĻĨāύ āϟāĻžāχāĻĒ āĻŸā§€āĻ•āĻž āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇, āϝāĻžāϰ āĻ•āĻžāϰāϪ⧇ āϞāĻžāχāĻŦā§āϰ⧇āϰāĻŋ āĻĻā§āĻŦāĻžāϰāĻž āϏāĻŽāĻ°ā§āĻĨāĻŋāϤ āϏāĻ°ā§āĻŦāύāĻŋāĻŽā§āύ āϏāĻ‚āĻ¸ā§āĻ•āϰāĻŖ 3.6.

āφāϏ⧁āύ āĻāĻœā§‡āĻ¨ā§āĻŸā§‡ āĻĢāĻŋāϰ⧇ āϝāĻžāχ, āĻĒā§āϰāĻ•āĻžāϰāϗ⧁āϞāĻŋ āϏ⧇āϟ āĻ•āϰ⧁āύ āĻāĻŦāĻ‚ āĻāϟāĻŋ āϝ⧋āĻ— āĻ•āϰ⧁āύ:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

āφāĻĒāύāĻŋ āĻĻ⧇āĻ–āϤ⧇ āĻĒāĻžāĻšā§āϛ⧇āύ, āφāĻŽāϰāĻž āϟāĻĒāĻŋāĻ• āĻĒā§āϰāĻžāϰāĻŽā§āĻ­āĻŋāĻ• āĻĒāĻĻā§āϧāϤāĻŋāϤ⧇ āĻāĻ•āϟāĻŋ āĻ¸ā§āĻ•āĻŋāĻŽ āϏāĻš āĻāĻ•āϟāĻŋ āύāϤ⧁āύ āĻĒā§āϝāĻžāϰāĻžāĻŽāĻŋāϟāĻžāϰ āĻĒāĻžāϏ āĻ•āϰāĻŋ - value_typeāĨ¤ āφāϰāĻ“, āϏāĻŦāĻ•āĻŋāϛ⧁ āĻāĻ•āχ āĻ¸ā§āĻ•āĻŋāĻŽ āĻ…āύ⧁āϏāϰāĻŖ āĻ•āϰ⧇, āϤāĻžāχ āφāĻŽāĻŋ āĻ…āĻ¨ā§āϝ āĻ•āĻŋāϛ⧁āϤ⧇ āĻĨāĻžāĻ•āĻžāϰ āϕ⧋āύ āĻŦāĻŋāĻ¨ā§āĻĻ⧁ āĻĻ⧇āĻ–āϤ⧇ āĻĒāĻžāĻšā§āĻ›āĻŋ āύāĻžāĨ¤

āĻ āĻŋāĻ• āφāϛ⧇, āĻšā§‚āĻĄāĻŧāĻžāĻ¨ā§āϤ āĻ¸ā§āĻĒāĻ°ā§āĻļ āĻšāϞ āĻŽā§‡āϟāĻž āϤāĻĨā§āϝ āϏāĻ‚āĻ—ā§āϰāĻšāĻ•āĻžāϰ⧀ āĻāĻœā§‡āĻ¨ā§āϟāϕ⧇ collect_securitites āĻ•āϰāĻžāϰ āϜāĻ¨ā§āϝ āĻāĻ•āϟāĻŋ āĻ•āϞ āϝ⧋āĻ— āĻ•āϰāĻž:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

āφāĻŽāϰāĻž āĻŦāĻžāĻ°ā§āϤāĻžāϰ āϜāĻ¨ā§āϝ āĻĒā§‚āĻ°ā§āĻŦ⧇ āĻ˜ā§‹āώāĻŋāϤ āĻ¸ā§āĻ•āĻŋāĻŽ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāĻŋāĨ¤ āĻāχ āĻ•ā§āώ⧇āĻ¤ā§āϰ⧇, āφāĻŽāĻŋ .cast āĻĒāĻĻā§āϧāϤāĻŋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇āĻ›āĻŋ āϝ⧇āĻšā§‡āϤ⧁ āφāĻŽāĻžāĻĻ⧇āϰ āĻāĻœā§‡āĻ¨ā§āĻŸā§‡āϰ āĻĢāϞāĻžāĻĢāϞ⧇āϰ āϜāĻ¨ā§āϝ āĻ…āĻĒ⧇āĻ•ā§āώāĻž āĻ•āϰāϤ⧇ āĻšāĻŦ⧇ āύāĻž, āϤāĻŦ⧇ āĻāϟāĻŋ āωāĻ˛ā§āϞ⧇āĻ– āĻ•āϰāĻžāϰ āĻŽāϤ⧋ āωāĻĒāĻžāϝāĻŧ āĻŦāĻŋāώāϝāĻŧ⧇ āĻāĻ•āϟāĻŋ āĻŦāĻžāĻ°ā§āϤāĻž āĻĒāĻžāĻ āĻžāύ:

  1. cast - āĻŦā§āϞāĻ• āĻ•āϰ⧇ āύāĻž āĻ•āĻžāϰāĻŖ āĻāϟāĻŋ āĻĢāϞāĻžāĻĢāϞ āφāĻļāĻž āĻ•āϰ⧇ āύāĻžāĨ¤ āφāĻĒāύāĻŋ āĻāĻ•āϟāĻŋ āĻŦāĻžāĻ°ā§āϤāĻž āĻšāĻŋāϏāĻžāĻŦ⧇ āĻ…āĻ¨ā§āϝ āĻŦāĻŋāώāϝāĻŧ⧇ āĻĢāϞāĻžāĻĢāϞ āĻĒāĻžāĻ āĻžāϤ⧇ āĻĒāĻžāϰāĻŦ⧇āύ āύāĻž.

  2. send - āĻŦā§āϞāĻ• āĻ•āϰ⧇ āύāĻž āĻ•āĻžāϰāĻŖ āĻāϟāĻŋ āĻĢāϞāĻžāĻĢāϞ āφāĻļāĻž āĻ•āϰ⧇ āύāĻžāĨ¤ āφāĻĒāύāĻŋ āĻāĻ•āϟāĻŋ āĻāĻœā§‡āĻ¨ā§āϟ āωāĻ˛ā§āϞ⧇āĻ– āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ āϝ⧇ āĻŦāĻŋāώāϝāĻŧ⧇ āĻĢāϞāĻžāĻĢāϞ āϝāĻžāĻŦ⧇āĨ¤

  3. āϜāĻŋāĻœā§āĻžāĻžāϏāĻž āĻ•āϰ⧁āύ - āĻĢāϞāĻžāĻĢāϞ⧇āϰ āϜāĻ¨ā§āϝ āĻ…āĻĒ⧇āĻ•ā§āώāĻž āĻ•āϰ⧁āύāĨ¤ āφāĻĒāύāĻŋ āĻāĻ•āϟāĻŋ āĻāĻœā§‡āĻ¨ā§āϟ āωāĻ˛ā§āϞ⧇āĻ– āĻ•āϰāϤ⧇ āĻĒāĻžāϰ⧇āύ āϝ⧇ āĻŦāĻŋāώāϝāĻŧ⧇ āĻĢāϞāĻžāĻĢāϞ āϝāĻžāĻŦ⧇āĨ¤

āϏ⧁āϤāϰāĻžāĻ‚, āφāϜāϕ⧇āϰ āϜāĻ¨ā§āϝ āĻāĻœā§‡āĻ¨ā§āϟāĻĻ⧇āϰ āϏāĻžāĻĨ⧇ āϝ⧇ āϏāĻŦ!

āĻ¸ā§āĻŦāĻĒā§āύ⧇āϰ āĻĻāϞ

āĻļ⧇āώ āϜāĻŋāύāĻŋāϏāϟāĻŋ āφāĻŽāĻŋ āĻāχ āĻ…āĻ‚āĻļ⧇ āϞāĻŋāĻ–āϤ⧇ āĻĒā§āϰāϤāĻŋāĻļā§āϰ⧁āϤāĻŋ āĻĻāĻŋāϝāĻŧ⧇āĻ›āĻŋ āϤāĻž āĻšāϞ āĻ•āĻŽāĻžāĻ¨ā§āĻĄāĨ¤ āφāϗ⧇āχ āωāĻ˛ā§āϞ⧇āĻ– āĻ•āϰāĻž āĻšāϝāĻŧ⧇āϛ⧇, faust-āĻ āĻ•āĻŽāĻžāĻ¨ā§āĻĄāϗ⧁āϞāĻŋ āĻ•ā§āϞāĻŋāϕ⧇āϰ āϚāĻžāϰāĻĒāĻžāĻļ⧇ āĻāĻ•āϟāĻŋ āĻŽā§‹āĻĄāĻŧāĻ•āĨ¤ āφāϏāϞ⧇, faust -A āϕ⧀ āύāĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āϟ āĻ•āϰāĻžāϰ āϏāĻŽāϝāĻŧ āφāĻŽāĻžāĻĻ⧇āϰ āĻ•āĻžāĻ¸ā§āϟāĻŽ āĻ•āĻŽāĻžāĻ¨ā§āĻĄāϕ⧇ āĻāϰ āχāĻ¨ā§āϟāĻžāϰāĻĢ⧇āϏ⧇ āϏāĻ‚āϝ⧁āĻ•ā§āϤ āĻ•āϰ⧇

āĻ˜ā§‹āώāĻŋāϤ āĻāĻœā§‡āĻ¨ā§āϟāĻĻ⧇āϰ āĻĒāϰ⧇ agents.py āĻĄā§‡āϕ⧋āϰ⧇āϟāϰ⧇āϰ āϏāĻžāĻĨ⧇ āĻāĻ•āϟāĻŋ āĻĢāĻžāĻ‚āĻļāύ āϝ⧋āĻ— āĻ•āϰ⧁āύ app.commandāĻĒāĻĻā§āϧāϤāĻŋ āĻ•āϞāĻŋāĻ‚ āύāĻŋāĻ•ā§āώ⧇āĻĒ Ņƒ āϏāĻŋāĻ•āĻŋāωāϰāĻŋāϟāĻŋāϏ āϏāĻ‚āĻ—ā§āϰāĻš āĻ•āϰ⧁āύ:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

āϏ⧁āϤāϰāĻžāĻ‚, āϝāĻĻāĻŋ āφāĻŽāϰāĻž āĻ•āĻŽāĻžāĻ¨ā§āĻĄā§‡āϰ āϤāĻžāϞāĻŋāĻ•āĻž āĻ•āϞ āĻ•āϰāĻŋ, āφāĻŽāĻžāĻĻ⧇āϰ āύāϤ⧁āύ āĻ•āĻŽāĻžāĻ¨ā§āĻĄ āĻāϤ⧇ āĻĨāĻžāĻ•āĻŦ⧇:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

āφāĻŽāϰāĻž āĻāϟāĻŋāϕ⧇ āĻ…āĻ¨ā§āϝ āĻ•āĻžāϰāĻ“ āĻŽāϤ⧋ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰāϤ⧇ āĻĒāĻžāϰāĻŋ, āϤāĻžāχ āφāϏ⧁āύ āĻĢāĻžāĻ¸ā§āϟ āĻ“āϝāĻŧāĻžāĻ°ā§āĻ•āĻžāϰāϟāĻŋ āĻĒ⧁āύāϰāĻžāϝāĻŧ āϚāĻžāϞ⧁ āĻ•āϰāĻŋ āĻāĻŦāĻ‚ āϏāĻŋāĻ•āĻŋāωāϰāĻŋāϟāĻŋāĻœā§‡āϰ āĻāĻ•āϟāĻŋ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āϏāĻ‚āĻ—ā§āϰāĻš āĻļ⧁āϰ⧁ āĻ•āϰāĻŋ:

> faust -A horton.agents start-collect-securities

āĻĒāϰāĻŦāĻ°ā§āϤ⧀āϤ⧇ āϕ⧀ āĻšāĻŦ⧇?

āĻĒāϰāĻŦāĻ°ā§āϤ⧀ āĻ…āĻ‚āĻļ⧇, āĻāĻ•āϟāĻŋ āωāĻĻāĻžāĻšāϰāĻŖ āĻšāĻŋāϏāĻžāĻŦ⧇ āĻ…āĻŦāĻļāĻŋāĻˇā§āϟ āĻāĻœā§‡āĻ¨ā§āϟ āĻŦā§āϝāĻŦāĻšāĻžāϰ āĻ•āϰ⧇, āφāĻŽāϰāĻž āĻŦāĻ›āϰ⧇āϰ āĻŸā§āϰ⧇āĻĄāĻŋāĻ‚ āĻāĻŦāĻ‚ āĻ•ā§āϰāύ āϞāĻžā§āĻšā§‡āϰ āĻāĻœā§‡āĻ¨ā§āĻŸā§‡āϰ āĻ•ā§āϞ⧋āϜāĻŋāĻ‚ āĻĒā§āϰāĻžāχāϏ⧇āϰ āϚāϰāĻŽāϤāĻž āĻ…āύ⧁āϏāĻ¨ā§āϧāĻžāύ⧇āϰ āϜāĻ¨ā§āϝ āϏāĻŋāĻ™ā§āĻ• āĻĒā§āϰāĻ•ā§āϰāĻŋāϝāĻŧāĻž āĻŦāĻŋāĻŦ⧇āϚāύāĻž āĻ•āϰāĻŦāĨ¤

āφāϜ āϝ⧇ āϜāĻ¨ā§āϝ āϏāĻŦ! āĻĒāĻĄāĻŧāĻžāϰ āϜāĻ¨ā§āϝ āϧāĻ¨ā§āϝāĻŦāĻžāĻĻ :)

āĻāχ āĻ…āĻ‚āĻļ⧇āϰ āϜāĻ¨ā§āϝ āϕ⧋āĻĄ

āĻĢāĻžāωāĻ¸ā§āĻŸā§‡āϰ āĻĒāϟāĻ­ā§‚āĻŽāĻŋāϰ āĻ•āĻžāϜ, āĻĒāĻžāĻ°ā§āϟ II: āĻāĻœā§‡āĻ¨ā§āϟ āĻāĻŦāĻ‚ āĻĻāϞ

āĻĒāĻŋāĻāϏ āĻļ⧇āώ āĻ…āĻ‚āĻļ⧇āϰ āĻ…āϧ⧀āύ⧇ āφāĻŽāĻžāϕ⧇ āĻĢāĻžāĻ¸ā§āϟ āĻāĻŦāĻ‚ āϏāĻ™ā§āĻ—āĻŽ āĻ•āĻžāĻĢāĻ•āĻž āϏāĻŽā§āĻĒāĻ°ā§āϕ⧇ āϜāĻŋāĻœā§āĻžāĻžāϏāĻž āĻ•āϰāĻž āĻšāϝāĻŧ⧇āĻ›āĻŋāϞ (āϏāĻ™ā§āĻ—āĻŽā§‡ āĻ•āĻŋ āĻŦ⧈āĻļāĻŋāĻˇā§āĻŸā§āϝ āφāϛ⧇?) āĻĻ⧇āϖ⧇ āĻŽāύ⧇ āĻšāĻšā§āϛ⧇ āϏāĻ™ā§āĻ—āĻŽāϟāĻŋ āĻ…āύ⧇āĻ• āωāĻĒāĻžāϝāĻŧ⧇ āφāϰāĻ“ āĻ•āĻžāĻ°ā§āϝāĻ•āϰ⧀, āϤāĻŦ⧇ āϏāĻ¤ā§āϝāϟāĻŋ āĻšāϞ āϝ⧇ āĻĢāĻžāĻ¸ā§āĻŸā§‡āϰ āϏāĻ™ā§āĻ—āĻŽā§‡āϰ āϜāĻ¨ā§āϝ āϏāĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āϟ āϏāĻŽāĻ°ā§āĻĨāύ āύ⧇āχ - āĻāϟāĻŋ āĻ…āύ⧁āϏāϰāĻŖ āĻ•āϰ⧇ āύāĻĨāĻŋāϤ⧇ āĻ•ā§āϞāĻžāϝāĻŧ⧇āĻ¨ā§āϟ āϏ⧀āĻŽāĻžāĻŦāĻĻā§āϧāϤāĻžāϰ āĻŦāĻ°ā§āĻŖāύāĻž.

āωāĻ¤ā§āϏ: www.habr.com