рдлреЙрд╕реНрдЯ, рднрд╛рдЧ II рд╡рд░ рдкрд╛рд░реНрд╢реНрд╡рднреВрдореА рдХрд╛рд░реНрдпреЗ: рдПрдЬрдВрдЯ рдЖрдгрд┐ рд╕рдВрдШ

рдлреЙрд╕реНрдЯ, рднрд╛рдЧ II рд╡рд░ рдкрд╛рд░реНрд╢реНрд╡рднреВрдореА рдХрд╛рд░реНрдпреЗ: рдПрдЬрдВрдЯ рдЖрдгрд┐ рд╕рдВрдШ

рд╕рд╛рдордЧреНрд░реА рд╕рд╛рд░рдгреА

  1. рднрд╛рдЧ I: рдкрд░рд┐рдЪрдп

  2. рднрд╛рдЧ II: рдПрдЬрдВрдЯ рдЖрдгрд┐ рд╕рдВрдШ

рдЖрдкрдг рдЗрдереЗ рдХрд╛рдп рдХрд░рдд рдЖрд╣реЛрдд?

рддрд░, рдореНрд╣рдгреВрди, рджреБрд╕рд░рд╛ рднрд╛рдЧ. рдЖрдзреА рд▓рд┐рд╣рд┐рд▓реНрдпрд╛рдкреНрд░рдорд╛рдгреЗ, рддреНрдпрд╛рдд рдЖрдкрдг рдкреБрдвреАрд▓ рдЧреЛрд╖реНрдЯреА рдХрд░реВ:

  1. рдЖрдореНрд╣рд╛рд▓рд╛ рдЖрд╡рд╢реНрдпрдХ рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдПрдВрдбрдкреЙрдЗрдВрдЯреНрд╕рдЪреНрдпрд╛ рд╡рд┐рдирдВрддреАрд╕рд╣ aiohttp рд╡рд░ alphavantage рд╕рд╛рдареА рдПрдХ рд▓рд╣рд╛рди рдХреНрд▓рд╛рдпрдВрдЯ рд▓рд┐рд╣реВрдпрд╛.

  2. рдЪрд▓рд╛ рдПрдХ рдПрдЬрдВрдЯ рддрдпрд╛рд░ рдХрд░реВ рдЬреЛ рд╕рд┐рдХреНрдпреБрд░рд┐рдЯреАрдЬрд╡рд░реАрд▓ рдбреЗрдЯрд╛ рдЖрдгрд┐ рддреНрдпрд╛рд╡рд░реАрд▓ рдореЗрдЯрд╛ рдорд╛рд╣рд┐рддреА рдЧреЛрд│рд╛ рдХрд░реЗрд▓.

рдкрд░рдВрддреБ, рдЖрдореНрд╣реА рдкреНрд░рдХрд▓реНрдкрд╛рд╕рд╛рдареА рд╣реЗрдЪ рдХрд░рдгрд╛рд░ рдЖрд╣реЛрдд рдЖрдгрд┐ рдлрд╛рд╕реНрдЯ рд░рд┐рд╕рд░реНрдЪрдЪреНрдпрд╛ рд╕рдВрджрд░реНрднрд╛рдд, рдЖрдореНрд╣реА рдХрд╛рдлреНрдХрд╛рдордзреАрд▓ рдШрдЯрдирд╛рдВрд╡рд░ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХрд░рдгрд╛рд░реЗ рдПрдЬрдВрдЯ рдХрд╕реЗ рд▓рд┐рд╣рд╛рдпрдЪреЗ, рддрд╕реЗрдЪ рдЖрдордЪреНрдпрд╛ рдмрд╛рдмрддреАрдд рдХрдорд╛рдВрдбреНрд╕ (рдХреНрд▓рд┐рдХ рд░реЕрдкрд░) рдХрд╕реЗ рд▓рд┐рд╣рд╛рдпрдЪреЗ рддреЗ рд╢рд┐рдХреВ - рдПрдЬрдВрдЯ рдирд┐рд░реАрдХреНрд╖рдг рдХрд░рдд рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рд╡рд┐рд╖рдпрд╛рд╡рд░ рдореЕрдиреНрдпреБрдЕрд▓ рдкреБрд╢ рд╕рдВрджреЗрд╢рд╛рдВрд╕рд╛рдареА.

рдкреНрд░рд╢рд┐рдХреНрд╖рдг

AlphaVantage рдХреНрд▓рд╛рдпрдВрдЯ

рдкреНрд░рдердо, рдЕрд▓реНрдлрд╛рд╡реНрд╣рдВрдЯреЗрдЬрдЪреНрдпрд╛ рд╡рд┐рдирдВрддреАрд╕рд╛рдареА рдПрдХ рд▓рд╣рд╛рди aiohttp рдХреНрд▓рд╛рдпрдВрдЯ рд▓рд┐рд╣реВ.

alphavantage.py

рд╕реНрдкрд╛рдпрд▓реЗрдЯрд░

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

рдкреНрд░рддреНрдпрдХреНрд╖рд╛рдд, рддреНрдпрд╛рддреВрди рд╕рд░реНрд╡ рдХрд╛рд╣реА рд╕реНрдкрд╖реНрдЯ рдЖрд╣реЗ:

  1. AlphaVantage API рдЕрдЧрджреА рд╕реЛрдкреНрдпрд╛ рдЖрдгрд┐ рд╕реБрдВрджрд░рдкрдгреЗ рдбрд┐рдЭрд╛рдЗрди рдХреЗрд▓реЗрд▓реЗ рдЖрд╣реЗ, рдореНрд╣рдгреВрди рдореА рд╕рд░реНрд╡ рд╡рд┐рдирдВрддреНрдпрд╛ рдкрджреНрдзрддреАрджреНрд╡рд╛рд░реЗ рдХрд░рдгреНрдпрд╛рдЪреЗ рдард░рд╡рд▓реЗ construct_query рдЬрд┐рдереЗ рдЙрд▓рдЯ рдПрдХ http рдХреЙрд▓ рдЖрд╣реЗ.

  2. рдореА рд╕рд░реНрд╡ рд╢реЗрддрд╛рдд рдЖрдгрддреЛ snake_case рдЖрд░рд╛рдорд╛рд╕рд╛рдареА.

  3. рдмрд░рдВ, рд╕реБрдВрджрд░ рдЖрдгрд┐ рдорд╛рд╣рд┐рддреАрдкреВрд░реНрдг рдЯреНрд░реЗрд╕рдмреЕрдХ рдЖрдЙрдЯрдкреБрдЯрд╕рд╛рдареА logger.catch рд╕рдЬрд╛рд╡рдЯ.

P.S. config.yml рд╡рд░ рд╕реНрдерд╛рдирд┐рдХрд░рд┐рддреНрдпрд╛ рдЕрд▓реНрдлрд╛рд╡реНрд╣рдВрдЯреЗрдЬ рдЯреЛрдХрди рдЬреЛрдбрдгреНрдпрд╛рд╕ рд╡рд┐рд╕рд░реВ рдирдХрд╛ рдХрд┐рдВрд╡рд╛ рдкрд░реНрдпрд╛рд╡рд░рдг рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓ рдирд┐рд░реНрдпрд╛рдд рдХрд░реВ рдирдХрд╛ HORTON_SERVICE_APIKEY. рдЖрдореНрд╣рд╛рд▓рд╛ рдПрдХ рдЯреЛрдХрди рдорд┐рд│рддреЗ рдпреЗрдереЗ.

CRUD рд╡рд░реНрдЧ

рд╕рд┐рдХреНрдпреБрд░рд┐рдЯреАрдЬрдмрджреНрджрд▓ рдореЗрдЯрд╛ рдорд╛рд╣рд┐рддреА рд╕рд╛рдард╡рдгреНрдпрд╛рд╕рд╛рдареА рдЖрдордЪреНрдпрд╛рдХрдбреЗ рд╕рд┐рдХреНрдпреБрд░рд┐рдЯреАрдЬ рд╕рдВрдЧреНрд░рд╣ рдЕрд╕реЗрд▓.

database/security.py

рдорд╛рдЭреНрдпрд╛ рдорддреЗ, рдпреЗрдереЗ рдХрд╛рд╣реАрд╣реА рд╕реНрдкрд╖реНрдЯ рдХрд░рдгреНрдпрд╛рдЪреА рдЧрд░рдЬ рдирд╛рд╣реА рдЖрдгрд┐ рдмреЗрд╕ рдХреНрд▓рд╛рд╕ рд╕реНрд╡рддрдГрдЪ рдЕрдЧрджреА рд╕реЛрдкрд╛ рдЖрд╣реЗ.

get_app()

рдордзреНрдпреЗ рдНрдкреНрд▓рд┐рдХреЗрд╢рди рдСрдмреНрдЬреЗрдХреНрдЯ рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдлрдВрдХреНрд╢рди рдЬреЛрдбреВ app.py

рд╕реНрдкрд╛рдпрд▓реЗрдЯрд░

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

рдЖрддреНрддрд╛рд╕рд╛рдареА рдЖрдордЪреНрдпрд╛рдХрдбреЗ рд╕рд░реНрд╡рд╛рдд рд╕реЛрдкреА рдНрдкреНрд▓рд┐рдХреЗрд╢рди рдирд┐рд░реНрдорд┐рддреА рдЕрд╕реЗрд▓, рдереЛрдбреНрдпрд╛ рд╡реЗрд│рд╛рдиреЗ рдЖрдореНрд╣реА рддреНрдпрд╛рдЪрд╛ рд╡рд┐рд╕реНрддрд╛рд░ рдХрд░реВ, рддрдерд╛рдкрд┐, рддреБрдореНрд╣рд╛рд▓рд╛ рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░реВ рдирдпреЗ рдореНрд╣рдгреВрди, рдпреЗрдереЗ рд╕рдВрджрд░реНрдн рдЕреЕрдк-рдХреНрд▓рд╛рд╕рдордзреНрдпреЗ. рдореА рддреБрдореНрд╣рд╛рд▓рд╛ рд╕реЗрдЯрд┐рдВрдЧреНрдЬ рдХреНрд▓рд╛рд╕рд╡рд░ рдПрдХ рдирдЬрд░ рдЯрд╛рдХрдгреНрдпрд╛рдЪрд╛ рд╕рд▓реНрд▓рд╛ рджреЗрддреЛ, рдХрд╛рд░рдг рддреЗ рдмрд╣реБрддреЗрдХ рд╕реЗрдЯрд┐рдВрдЧреНрдЬрд╕рд╛рдареА рдЬрдмрд╛рдмрджрд╛рд░ рдЖрд╣реЗ.

рдореБрдЦреНрдп рд╢рд░реАрд░

рд╕рд┐рдХреНрдпреБрд░рд┐рдЯреАрдЬрдЪреА рдпрд╛рджреА рдЧреЛрд│рд╛ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЖрдгрд┐ рджреЗрдЦрд░реЗрдЦ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдПрдЬрдВрдЯ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

рддрд░, рдкреНрд░рдердо рдЖрдореНрд╣рд╛рд▓рд╛ рдлреЙрд╕реНрдЯ рдНрдкреНрд▓рд┐рдХреЗрд╢рди рдСрдмреНрдЬреЗрдХреНрдЯ рдорд┐рд│реЗрд▓ - рд╣реЗ рдЕрдЧрджреА рд╕реЛрдкреЗ рдЖрд╣реЗ. рдкреБрдвреЗ, рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рдПрдЬрдВрдЯрд╕рд╛рдареА рдПрдХ рд╡рд┐рд╖рдп рд╕реНрдкрд╖реНрдЯрдкрдгреЗ рдШреЛрд╖рд┐рдд рдХрд░рддреЛ... рддреЗ рдХрд╛рдп рдЖрд╣реЗ, рдЕрдВрддрд░реНрдЧрдд рдкреЕрд░рд╛рдореАрдЯрд░ рдХрд╛рдп рдЖрд╣реЗ рдЖрдгрд┐ рд╣реЗ рд╡реЗрдЧрд│реНрдпрд╛ рдкрджреНрдзрддреАрдиреЗ рдХрд╕реЗ рдорд╛рдВрдбрд▓реЗ рдЬрд╛рдК рд╢рдХрддреЗ рд╣реЗ рдпреЗрдереЗ рдирдореВрдж рдХрд░рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.

  1. рдХрд╛рдлреНрдХрд╛рдордзреАрд▓ рд╡рд┐рд╖рдп, рдЬрд░ рдЖрдкрд▓реНрдпрд╛рд▓рд╛ рдЕрдЪреВрдХ рд╡реНрдпрд╛рдЦреНрдпрд╛ рдЬрд╛рдгреВрди рдШреНрдпрд╛рдпрдЪреА рдЕрд╕реЗрд▓ рддрд░ рддреЗ рд╡рд╛рдЪрдгреЗ рдЪрд╛рдВрдЧрд▓реЗ рдЖрд╣реЗ рдмрдВрдж. рджрд╕реНрддрдРрд╡рдЬ, рдХрд┐рдВрд╡рд╛ рддреБрдореНрд╣реА рд╡рд╛рдЪреВ рд╢рдХрддрд╛ рд╕рдВрдХрд▓рди рд░рд╢рд┐рдпрди рднрд╛рд╖реЗрдд Habr├й рд╡рд░, рдЬрд┐рдереЗ рд╕рд░реНрд╡рдХрд╛рд╣реА рдЕрдЧрджреА рдЕрдЪреВрдХрдкрдгреЗ рдкреНрд░рддрд┐рдмрд┐рдВрдмрд┐рдд рд╣реЛрддреЗ :)

  2. рдЕрдВрддрд░реНрдЧрдд рдкреЕрд░рд╛рдореАрдЯрд░, рдлреЙрд╕реНрдЯ рдбреЙрдХрдордзреНрдпреЗ рдЪрд╛рдВрдЧрд▓реЗ рд╡рд░реНрдгрди рдХреЗрд▓реЗ рдЖрд╣реЗ, рдЖрдореНрд╣рд╛рд▓рд╛ рдереЗрдЯ рдХреЛрдбрдордзреНрдпреЗ рд╡рд┐рд╖рдп рдХреЙрдиреНрдлрд┐рдЧрд░ рдХрд░рдгреНрдпрд╛рдЪреА рдкрд░рд╡рд╛рдирдЧреА рджреЗрддреЗ, рдЕрд░реНрдерд╛рддрдЪ, рдпрд╛рдЪрд╛ рдЕрд░реНрде рдлреЙрд╕реНрдЯ рдбреЗрд╡реНрд╣рд▓рдкрд░рджреНрд╡рд╛рд░реЗ рдкреНрд░рджрд╛рди рдХреЗрд▓реЗрд▓реЗ рдкреЕрд░рд╛рдореАрдЯрд░реНрд╕, рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде: рдзрд╛рд░рдгрд╛, рдзрд╛рд░рдгрд╛ рдзреЛрд░рдг (рдбреАрдлреЙрд▓реНрдЯрдиреБрд╕рд╛рд░ рд╣рдЯрд╡рд╛, рдкрд░рдВрддреБ рдЖрдкрдг рд╕реЗрдЯ рдХрд░реВ рд╢рдХрддрд╛ рд╕рдВрдХреНрд╖рд┐рдкреНрдд), рдкреНрд░рддрд┐ рд╡рд┐рд╖рдп рд╡рд┐рднрд╛рдЬрдирд╛рдВрдЪреА рд╕рдВрдЦреНрдпрд╛ (рд╡рд┐рднрд╛рдЬрдиреЗрдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рдкреЗрдХреНрд╖рд╛ рдХрдореА рдЬрд╛рдЧрддрд┐рдХ рдорд╣рддреНрддреНрд╡ рдЕрдиреБрдкреНрд░рдпреЛрдЧ рдлреЙрд╕реНрдЯ).

  3. рд╕рд░реНрд╡рд╕рд╛рдзрд╛рд░рдгрдкрдгреЗ, рдПрдЬрдВрдЯ рдЬрд╛рдЧрддрд┐рдХ рдореВрд▓реНрдпрд╛рдВрд╕рд╣ рд╡реНрдпрд╡рд╕реНрдерд╛рдкрд┐рдд рд╡рд┐рд╖рдп рддрдпрд╛рд░ рдХрд░реВ рд╢рдХрддреЛ, рддрдерд╛рдкрд┐, рдорд▓рд╛ рд╕рд░реНрд╡рдХрд╛рд╣реА рд╕реНрдкрд╖реНрдЯрдкрдгреЗ рдШреЛрд╖рд┐рдд рдХрд░рдгреЗ рдЖрд╡рдбрддреЗ. рдпрд╛рд╡реНрдпрддрд┐рд░рд┐рдХреНрдд, рдПрдЬрдВрдЯ рдЬрд╛рд╣рд┐рд░рд╛рддреАрдордзреАрд▓ рд╡рд┐рд╖рдпрд╛рдЪреЗ рдХрд╛рд╣реА рдкреЕрд░рд╛рдореАрдЯрд░реНрд╕ (рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рд╡рд┐рднрд╛рдЬрдирд╛рдВрдЪреА рд╕рдВрдЦреНрдпрд╛ рдХрд┐рдВрд╡рд╛ рдзрд╛рд░рдгрд╛ рдзреЛрд░рдг) рдХреЙрдиреНрдлрд┐рдЧрд░ рдХреЗрд▓реЗ рдЬрд╛рдК рд╢рдХрдд рдирд╛рд╣реАрдд.

    рд╡рд┐рд╖рдп рд╡реНрдпрдХреНрддрд┐рдЪрд▓рд┐рддрдкрдгреЗ рдкрд░рд┐рднрд╛рд╖рд┐рдд рдХреЗрд▓реНрдпрд╛рд╢рд┐рд╡рд╛рдп рддреЛ рдХрд╕рд╛ рджрд┐рд╕рддреЛ рддреЗ рдпреЗрдереЗ рдЖрд╣реЗ:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

рдмрд░рдВ, рдЖрддрд╛ рдЖрдордЪрд╛ рдПрдЬрдВрдЯ рдХрд╛рдп рдХрд░реЗрд▓ рдпрд╛рдЪреЗ рд╡рд░реНрдгрди рдХрд░реВрдпрд╛ :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

рдореНрд╣рдгреВрди, рдПрдЬрдВрдЯрдЪреНрдпрд╛ рд╕реБрд░реБрд╡рд╛рддреАрд▓рд╛, рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рдХреНрд▓рд╛рдпрдВрдЯрджреНрд╡рд╛рд░реЗ рд╡рд┐рдирдВрддреНрдпрд╛рдВрд╕рд╛рдареА aiohttp рд╕рддреНрд░ рдЙрдШрдбрддреЛ. рдЕрд╢рд╛ рдкреНрд░рдХрд╛рд░реЗ, рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ рд╕реБрд░реВ рдХрд░рддрд╛рдирд╛, рдЬреЗрд╡реНрд╣рд╛ рдЖрдордЪрд╛ рдПрдЬрдВрдЯ рд▓реЙрдиреНрдЪ рдХреЗрд▓рд╛ рдЬрд╛рдИрд▓, рддреЗрд╡реНрд╣рд╛ рд▓рдЧреЗрдЪ рдПрдХ рд╕рддреНрд░ рдЙрдШрдбрд▓реЗ рдЬрд╛рдИрд▓ - рдПрдХ, рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ рд╕рдВрдкреВрд░реНрдг рд╡реЗрд│ рдЪрд╛рд▓реВ рдЕрд╕реЗрд▓ (рдХрд┐рдВрд╡рд╛ рдЕрдиреЗрдХ, рдЬрд░ рддреБрдореНрд╣реА рдкреЕрд░рд╛рдореАрдЯрд░ рдмрджрд▓рд▓рд╛ рдЕрд╕реЗрд▓ рддрд░ рд╕рд╣рдорддреА рдбреАрдлреЙрд▓реНрдЯ рдпреБрдирд┐рдЯ рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдПрдЬрдВрдЯрдХрдбреВрди).

рдкреБрдвреЗ, рдЖрдореНрд╣реА рдкреНрд░рд╡рд╛рд╣рд╛рдЪреЗ рдЕрдиреБрд╕рд░рдг рдХрд░рддреЛ (рдЖрдореНрд╣реА рд╕рдВрджреЗрд╢ рддреНрдпрд╛рдд рдареЗрд╡рддреЛ _, рдЖрдореНрд╣реА, рдпрд╛ рдПрдЬрдВрдЯрдордзреНрдпреЗ, рдЖрдордЪреНрдпрд╛ рд╡рд┐рд╖рдпрд╛рд╡рд░реАрд▓ рд╕рдВрджреЗрд╢рд╛рдВрдЪреНрдпрд╛ рд╕рд╛рдордЧреНрд░реАрдЪреА рдХрд╛рд│рдЬреА рдШреЗрдд рдирд╛рд╣реА), рдЬрд░ рддреЗ рд╕рдзреНрдпрд╛рдЪреНрдпрд╛ рдСрдлрд╕реЗрдЯрдордзреНрдпреЗ рдЕрд╕реНрддрд┐рддреНрд╡рд╛рдд рдЕрд╕рддреАрд▓, рдЕрдиреНрдпрдерд╛ рдЖрдордЪреЗ рдЪрдХреНрд░ рддреНрдпрд╛рдВрдЪреНрдпрд╛ рдЖрдЧрдордирд╛рдЪреА рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░реЗрд▓. рдмрд░рдВ, рдЖрдордЪреНрдпрд╛ рд▓реВрдкрдордзреНрдпреЗ, рдЖрдореНрд╣реА рд╕рдВрджреЗрд╢рд╛рдЪреА рдкрд╛рд╡рддреА рд▓реЙрдЧ рдХрд░рддреЛ, рд╕рдХреНрд░рд┐рдп (get_securities рд░рд┐рдЯрд░реНрди рдХреЗрд╡рд│ рдбреАрдлреЙрд▓реНрдЯрдиреБрд╕рд╛рд░ рд╕рдХреНрд░рд┐рдп рдХрд░рддреЛ, рдХреНрд▓рд╛рдпрдВрдЯ рдХреЛрдб рдкрд╣рд╛) рд╕рд┐рдХреНрдпреБрд░рд┐рдЯреАрдЬрдЪреА рдпрд╛рджреА рдорд┐рд│рд╡рддреЛ рдЖрдгрд┐ рдбреЗрдЯрд╛рдмреЗрд╕рдордзреНрдпреЗ рд╕реЗрд╡реНрд╣ рдХрд░рддреЛ, рддреНрдпрд╛рдЪ рдЯрд┐рдХрд░рд╕рд╣ рд╕реБрд░рдХреНрд╖рд╛ рдЖрд╣реЗ рдХрд╛ рддреЗ рддрдкрд╛рд╕рддреЛ рдЖрдгрд┐ рдбреЗрдЯрд╛рдмреЗрд╕рдордзреНрдпреЗ рдПрдХреНрд╕рдЪреЗрдВрдЬ , рдЬрд░ рддреЗрдереЗ рдЕрд╕реЗрд▓ рддрд░ рддреЗ (рдкреЗрдкрд░) рдлрдХреНрдд рдЕрджреНрдпрддрдирд┐рдд рдХреЗрд▓реЗ рдЬрд╛рдИрд▓.

рдЪрд▓рд╛ рдЖрдордЪреА рдирд┐рд░реНрдорд┐рддреА рд╕реБрд░реВ рдХрд░реВрдпрд╛!

> docker-compose up -d
... ╨Ч╨░╨┐╤Г╤Б╨║ ╨║╨╛╨╜╤В╨╡╨╣╨╜╨╡╤А╨╛╨▓ ...
> faust -A horton.agents worker --without-web -l info

P.S. рд╢рдХреНрдпрддрд╛ рд╡реЗрдм рдШрдЯрдХ рдореА рд▓реЗрдЦрд╛рдВрдордзреНрдпреЗ рдлреЙрд╕реНрдЯрдЪрд╛ рд╡рд┐рдЪрд╛рд░ рдХрд░рдгрд╛рд░ рдирд╛рд╣реА, рдореНрд╣рдгреВрди рдЖрдореНрд╣реА рдпреЛрдЧреНрдп рдзреНрд╡рдЬ рд╕реЗрдЯ рдХреЗрд▓рд╛.

рдЖрдордЪреНрдпрд╛ рд▓реЙрдиреНрдЪ рдХрдорд╛рдВрдбрдордзреНрдпреЗ, рдЖрдореНрд╣реА рдорд╛рд╣рд┐рддреА рд▓реЙрдЧ рдЖрдЙрдЯрдкреБрдЯ рд▓реЗрд╡реНрд╣рд▓рд╕рд╣ рдНрдкреНрд▓рд┐рдХреЗрд╢рди рдСрдмреНрдЬреЗрдХреНрдЯ рдХреБрдареЗ рд╢реЛрдзрд╛рдпрдЪреЗ рдЖрдгрд┐ рддреНрдпрд╛рдЪреЗ рдХрд╛рдп рдХрд░рд╛рдпрдЪреЗ (рд╡рд░реНрдХрд░ рд▓рд╛рдБрдЪ рдХрд░рд╛) рд╕рд╛рдВрдЧрд┐рддрд▓реЗ. рдЖрдореНрд╣рд╛рд▓рд╛ рдЦрд╛рд▓реАрд▓ рдЖрдЙрдЯрдкреБрдЯ рдорд┐рд│рддреЗ:

рд╕реНрдкрд╛рдпрд▓реЗрдЯрд░

тФМ╞Тa┬╡SтАа v1.10.4тФмтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФР
тФВ id          тФВ horton                                            тФВ
тФВ transport   тФВ [URL('kafka://localhost:9092')]                   тФВ
тФВ store       тФВ memory:                                           тФВ
тФВ log         тФВ -stderr- (info)                                   тФВ
тФВ pid         тФВ 1271262                                           тФВ
тФВ hostname    тФВ host-name                                         тФВ
тФВ platform    тФВ CPython 3.8.2 (Linux x86_64)                      тФВ
тФВ drivers     тФВ                                                   тФВ
тФВ   transport тФВ aiokafka=1.1.6                                    тФВ
тФВ   web       тФВ aiohttp=3.6.2                                     тФВ
тФВ datadir     тФВ /path/to/project/horton-data                      тФВ
тФВ appdir      тФВ /path/to/project/horton-data/v1                   тФВ
тФФтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФ┤тФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФШ
... ╨╗╨╛╨│╨╕, ╨╗╨╛╨│╨╕, ╨╗╨╛╨│╨╕ ...

тФМTopic Partition SetтФАтФАтФАтФАтФАтФАтФАтФАтФАтФмтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФР
тФВ topic                      тФВ partitions тФВ
тФЬтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФ╝тФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФд
тФВ collect_securities         тФВ {0-7}      тФВ
тФВ horton-__assignor-__leader тФВ {0}        тФВ
тФФтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФ┤тФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФШ 

рддреЗ рдЬрд┐рд╡рдВрдд рдЖрд╣реЗ!!!

рдЪрд▓рд╛ рд╡рд┐рднрд╛рдЬрди рд╕рдВрдЪ рдкрд╛рд╣реВ. рдЬрд╕реЗ рдЖрдкрдг рдкрд╛рд╣реВ рд╢рдХрддреЛ, рдЖрдореНрд╣реА рдХреЛрдбрдордзреНрдпреЗ рдирд┐рдпреБрдХреНрдд рдХреЗрд▓реЗрд▓реНрдпрд╛ рдирд╛рд╡рд╛рд╕рд╣ рдПрдХ рд╡рд┐рд╖рдп рддрдпрд╛рд░ рдХреЗрд▓рд╛ рдЧреЗрд▓рд╛ рдЖрд╣реЗ, рд╡рд┐рднрд╛рдЬрдирд╛рдВрдЪреА рдбреАрдлреЙрд▓реНрдЯ рд╕рдВрдЦреНрдпрд╛ (8, рд╡рд░реВрди рдШреЗрддрд▓реЗрд▓реА topic_partitions - рдНрдкреНрд▓рд┐рдХреЗрд╢рди рдСрдмреНрдЬреЗрдХреНрдЯ рдкреЕрд░рд╛рдореАрдЯрд░), рдХрд╛рд░рдг рдЖрдореНрд╣реА рдЖрдордЪреНрдпрд╛ рд╡рд┐рд╖рдпрд╛рд╕рд╛рдареА (рд╡рд┐рднрд╛рдЬрдирд╛рдВрджреНрд╡рд╛рд░реЗ) рд╡реИрдпрдХреНрддрд┐рдХ рдореВрд▓реНрдп рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХреЗрд▓реЗ рдирд╛рд╣реА. рд╡рд░реНрдХрд░рдордзреНрдпреЗ рд▓реЙрдиреНрдЪ рдХреЗрд▓реЗрд▓реНрдпрд╛ рдПрдЬрдВрдЯрд▓рд╛ рд╕рд░реНрд╡ 8 рд╡рд┐рднрд╛рдЬрдиреЗ рдирд┐рдпреБрдХреНрдд рдХреЗрд▓реА рдЧреЗрд▓реА рдЖрд╣реЗрдд, рдХрд╛рд░рдг рддреА рдПрдХрдЪ рдЖрд╣реЗ, рдкрд░рдВрддреБ рдХреНрд▓рд╕реНрдЯрд░рд┐рдВрдЧрдЪреНрдпрд╛ рднрд╛рдЧрд╛рдордзреНрдпреЗ рдпрд╛рдмрджреНрджрд▓ рдЕрдзрд┐рдХ рддрдкрд╢реАрд▓рд╡рд╛рд░ рдЪрд░реНрдЪрд╛ рдХреЗрд▓реА рдЬрд╛рдИрд▓.

рдмрд░рдВ, рдЖрддрд╛ рдЖрдкрдг рджреБрд╕рд░реНтАНрдпрд╛ рдЯрд░реНрдорд┐рдирд▓ рд╡рд┐рдВрдбреЛрд╡рд░ рдЬрд╛рдК рд╢рдХрддреЛ рдЖрдгрд┐ рдЖрдордЪреНрдпрд╛ рд╡рд┐рд╖рдпрд╛рд╡рд░ рд░рд┐рдХрд╛рдорд╛ рд╕рдВрджреЗрд╢ рдкрд╛рдард╡реВ рд╢рдХрддреЛ:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. рд╡рд╛рдкрд░реВрди @ рдЖрдореНрд╣реА рджрд╛рдЦрд╡рддреЛ рдХреА рдЖрдореНрд╣реА тАЬcollect_securitiesтАЭ рдирд╛рд╡рд╛рдЪреНрдпрд╛ рд╡рд┐рд╖рдпрд╛рд▓рд╛ рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рдд рдЖрд╣реЛрдд.

рдпрд╛ рдкреНрд░рдХрд░рдгрд╛рдд, рд╕рдВрджреЗрд╢ рд╡рд┐рднрд╛рдЬрди 6 рд╡рд░ рдЧреЗрд▓рд╛ - рддреБрдореНрд╣реА kafdrop on рд╡рд░ рдЬрд╛рдКрди рд╣реЗ рддрдкрд╛рд╕реВ рд╢рдХрддрд╛ localhost:9000

рдЖрдордЪреНрдпрд╛ рдХрд╛рд░реНрдпрдХрд░реНрддреНрдпрд╛рд╕рд╣ рдЯрд░реНрдорд┐рдирд▓ рд╡рд┐рдВрдбреЛрд╡рд░ рдЧреЗрд▓реНрдпрд╛рд╡рд░, рдЖрдореНрд╣рд╛рд▓рд╛ loguru рд╡рд╛рдкрд░реВрди рдкрд╛рдард╡рд▓реЗрд▓рд╛ рдЖрдирдВрджрд╛рдЪрд╛ рд╕рдВрджреЗрд╢ рджрд┐рд╕реЗрд▓:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

рдЖрдореНрд╣реА рдореЛрдВрдЧреЛ (Robo3T рдХрд┐рдВрд╡рд╛ Studio3T рд╡рд╛рдкрд░реВрди) рдордзреНрдпреЗ рджреЗрдЦреАрд▓ рдкрд╛рд╣реВ рд╢рдХрддреЛ рдЖрдгрд┐ рд╕рд┐рдХреНрдпреБрд░рд┐рдЯреАрдЬ рдбреЗрдЯрд╛рдмреЗрд╕рдордзреНрдпреЗ рдЕрд╕рд▓реНрдпрд╛рдЪреЗ рдкрд╛рд╣реВ рд╢рдХрддреЛ:

рдореА рдЕрдмреНрдЬрд╛рдзреАрд╢ рдирд╛рд╣реА рдЖрдгрд┐ рдореНрд╣рдгреВрди рдЖрдореНрд╣реА рдкрд╣рд┐рд▓реНрдпрд╛ рдкрд╛рд╣рдгреНрдпрд╛рдЪреНрдпрд╛ рдкрд░реНрдпрд╛рдпрд╛рд╡рд░ рд╕рдорд╛рдзрд╛рдиреА рдЖрд╣реЛрдд.

рдлреЙрд╕реНрдЯ, рднрд╛рдЧ II рд╡рд░ рдкрд╛рд░реНрд╢реНрд╡рднреВрдореА рдХрд╛рд░реНрдпреЗ: рдПрдЬрдВрдЯ рдЖрдгрд┐ рд╕рдВрдШрдлреЙрд╕реНрдЯ, рднрд╛рдЧ II рд╡рд░ рдкрд╛рд░реНрд╢реНрд╡рднреВрдореА рдХрд╛рд░реНрдпреЗ: рдПрдЬрдВрдЯ рдЖрдгрд┐ рд╕рдВрдШ

рдЖрдирдВрдж рдЖрдгрд┐ рдЖрдирдВрдж - рдкрд╣рд┐рд▓рд╛ рдПрдЬрдВрдЯ рддрдпрд╛рд░ рдЖрд╣реЗ :)

рдПрдЬрдВрдЯ рддрдпрд╛рд░, рдирд╡реАрди рдПрдЬрдВрдЯ рдЪрд┐рд░рдВрдЬреАрд╡!

рд╣реЛрдп, рд╕рдЬреНрдЬрдирд╛рдВрдиреЛ, рдЖрдореНрд╣реА рдпрд╛ рд▓реЗрдЦрд╛рджреНрд╡рд╛рд░реЗ рддрдпрд╛рд░ рдХреЗрд▓реЗрд▓реНрдпрд╛ рдорд╛рд░реНрдЧрд╛рдЪрд╛ рдлрдХреНрдд 1/3 рдХрд╡реНрд╣рд░ рдХреЗрд▓рд╛ рдЖрд╣реЗ, рдкрд░рдВрддреБ рдирд┐рд░рд╛рд╢ рд╣реЛрдК рдирдХрд╛, рдХрд╛рд░рдг рдЖрддрд╛ рддреЗ рд╕реЛрдкреЗ рд╣реЛрдИрд▓.

рддреНрдпрд╛рдореБрд│реЗ рдЖрддрд╛ рдЖрдореНрд╣рд╛рд▓рд╛ рдПрдХрд╛ рдПрдЬрдВрдЯрдЪреА рдЧрд░рдЬ рдЖрд╣реЗ рдЬреЛ рдореЗрдЯрд╛ рдорд╛рд╣рд┐рддреА рд╕рдВрдХрд▓рд┐рдд рдХрд░рддреЛ рдЖрдгрд┐ рд╕рдВрдЧреНрд░рд╣ рджрд╕реНрддрдРрд╡рдЬрд╛рдд рдареЗрд╡рддреЛ:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

рд╣рд╛ рдПрдЬрдВрдЯ рд╡рд┐рд╢рд┐рд╖реНрдЯ рд╕реБрд░рдХреНрд╖рд┐рддрддреЗрдмрджреНрджрд▓ рдорд╛рд╣рд┐рддреАрд╡рд░ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХрд░рдгрд╛рд░ рдЕрд╕рд▓реНрдпрд╛рдиреЗ, рдЖрдореНрд╣рд╛рд▓рд╛ рд╕рдВрджреЗрд╢рд╛рдордзреНрдпреЗ рдпрд╛ рд╕реБрд░рдХреНрд╖рд┐рддрддреЗрдЪреЗ рдЯрд┐рдХрд░ (рдЪрд┐рдиреНрд╣) рд╕реВрдЪрд┐рдд рдХрд░рд╛рд╡реЗ рд▓рд╛рдЧреЗрд▓. рдлреЙрд╕реНрдЯ рдордзреНрдпреЗ рдпрд╛ рдЙрджреНрджреЗрд╢рд╛рд╕рд╛рдареА рдЖрд╣реЗрдд рд░реЗрдХреЙрд░реНрдб тАФ рд╡рд░реНрдЧ рдЬреЗ рдПрдЬрдВрдЯ рд╡рд┐рд╖рдпрд╛рдордзреНрдпреЗ рд╕рдВрджреЗрд╢ рдпреЛрдЬрдирд╛ рдШреЛрд╖рд┐рдд рдХрд░рддрд╛рдд.

рдпрд╛ рдкреНрд░рдХрд░рдгрд╛рдд, рдЪрд▓рд╛ рдЬрд╛рдКрдпрд╛ records.py рдЖрдгрд┐ рдпрд╛ рд╡рд┐рд╖рдпрд╛рд╕рд╛рдареА рд╕рдВрджреЗрд╢ рдХрд╕рд╛ рдЕрд╕рд╛рд╡рд╛ рдпрд╛рдЪреЗ рд╡рд░реНрдгрди рдХрд░рд╛:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

рддреБрдореНрд╣реА рдЕрдВрджрд╛рдЬ рдХреЗрд▓рд╛ рдЕрд╕реЗрд▓, рдлреЙрд╕реНрдЯ рдореЗрд╕реЗрдЬ рд╕реНрдХреАрдорд╛рдЪреЗ рд╡рд░реНрдгрди рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдкрд╛рдпрдерди рдкреНрд░рдХрд╛рд░ рднрд╛рд╖реНрдп рд╡рд╛рдкрд░рддреЗ, рдореНрд╣рдгреВрдирдЪ рд▓рд╛рдпрдмреНрд░рд░реАрджреНрд╡рд╛рд░реЗ рд╕рдорд░реНрдерд┐рдд рдХрд┐рдорд╛рди рдЖрд╡реГрддреНрддреА рдЖрд╣реЗ 3.6.

рдЪрд▓рд╛ рдПрдЬрдВрдЯрдХрдбреЗ рдкрд░рдд рдпрд╛, рдкреНрд░рдХрд╛рд░ рд╕реЗрдЯ рдХрд░рд╛ рдЖрдгрд┐ рддреЗ рдЬреЛрдбрд╛:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

рддреБрдореНрд╣реА рдмрдШреВ рд╢рдХрддрд╛, рдЖрдореНрд╣реА рд╡рд┐рд╖рдп рдЖрд░рдВрдн рдХрд░рдгреНрдпрд╛рдЪреНрдпрд╛ рдкрджреНрдзрддреАрд╕рд╛рдареА рд╕реНрдХреАрдорд╕рд╣ рдПрдХ рдирд╡реАрди рдкреЕрд░рд╛рдореАрдЯрд░ рдкрд╛рд╕ рдХрд░рддреЛ - value_type. рдкреБрдвреЗ, рд╕рд░реНрд╡ рдХрд╛рд╣реА рд╕рдорд╛рди рдпреЛрдЬрдиреЗрдЪреЗ рдЕрдиреБрд╕рд░рдг рдХрд░рддреЗ, рдореНрд╣рдгреВрди рдорд▓рд╛ рдЗрддрд░ рдХрд╢рд╛рд╡рд░рд╣реА рд╡рд┐рдЪрд╛рд░ рдХрд░рдгреНрдпрд╛рдд рдХрд╛рд╣реА рдЕрд░реНрде рджрд┐рд╕рдд рдирд╛рд╣реА.

рдмрд░рдВ, рдЕрдВрддрд┐рдо рд╕реНрдкрд░реНрд╢ рдореНрд╣рдгрдЬреЗ рдореЗрдЯрд╛ рдорд╛рд╣рд┐рддреА рд╕рдВрдХрд▓рди рдПрдЬрдВрдЯрд▓рд╛ collect_securitites рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдХреЙрд▓ рдЬреЛрдбрдгреЗ:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

рдЖрдореНрд╣реА рд╕рдВрджреЗрд╢рд╛рд╕рд╛рдареА рдкреВрд░реНрд╡реА рдШреЛрд╖рд┐рдд рдХреЗрд▓реЗрд▓реА рдпреЛрдЬрдирд╛ рд╡рд╛рдкрд░рддреЛ. рдпрд╛ рдкреНрд░рдХрд░рдгрд╛рдд, рдореА .cast рдкрджреНрдзрдд рд╡рд╛рдкрд░рд▓реА рдХрд╛рд░рдг рдЖрдореНрд╣рд╛рд▓рд╛ рдПрдЬрдВрдЯрдЪреНрдпрд╛ рдирд┐рдХрд╛рд▓рд╛рдЪреА рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░рдгреНрдпрд╛рдЪреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдирд╛рд╣реА, рдкрд░рдВрддреБ рд╣реЗ рд▓рдХреНрд╖рд╛рдд рдШреЗрдгреНрдпрд╛рд╕рд╛рд░рдЦреЗ рдЖрд╣реЗ рдорд╛рд░реНрдЧ рд╡рд┐рд╖рдпрд╛рд╡рд░ рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рд╛:

  1. рдХрд╛рд╕реНрдЯ - рдЕрд╡рд░реЛрдзрд┐рдд рдХрд░рдд рдирд╛рд╣реА рдХрд╛рд░рдг рддреЗ рдкрд░рд┐рдгрд╛рдорд╛рдЪреА рдЕрдкреЗрдХреНрд╖рд╛ рдХрд░рдд рдирд╛рд╣реА. рддреБрдореНрд╣реА рдирд┐рдХрд╛рд▓ рджреБрд╕рд▒реНрдпрд╛ рд╡рд┐рд╖рдпрд╛рд╡рд░ рд╕рдВрджреЗрд╢ рдореНрд╣рдгреВрди рдкрд╛рдард╡реВ рд╢рдХрдд рдирд╛рд╣реА.

  2. рдкрд╛рдард╡рд╛ - рдЕрд╡рд░реЛрдзрд┐рдд рдХрд░рдд рдирд╛рд╣реА рдХрд╛рд░рдг рддреЗ рдкрд░рд┐рдгрд╛рдорд╛рдЪреА рдЕрдкреЗрдХреНрд╖рд╛ рдХрд░рдд рдирд╛рд╣реА. рдирд┐рдХрд╛рд▓ рдЬреНрдпрд╛ рд╡рд┐рд╖рдпрд╛рд╡рд░ рдЬрд╛рдИрд▓ рддреНрдпрд╛ рд╡рд┐рд╖рдпрд╛рдордзреНрдпреЗ рддреБрдореНрд╣реА рдПрдЬрдВрдЯ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░реВ рд╢рдХрддрд╛.

  3. рд╡рд┐рдЪрд╛рд░рд╛ - рдирд┐рдХрд╛рд▓рд╛рдЪреА рд╡рд╛рдЯ рдкрд╣рд╛. рдирд┐рдХрд╛рд▓ рдЬреНрдпрд╛ рд╡рд┐рд╖рдпрд╛рд╡рд░ рдЬрд╛рдИрд▓ рддреНрдпрд╛ рд╡рд┐рд╖рдпрд╛рдордзреНрдпреЗ рддреБрдореНрд╣реА рдПрдЬрдВрдЯ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░реВ рд╢рдХрддрд╛.

рддрд░, рдЖрдЬ рдПрдЬрдВрдЯреНрд╕рд╕рд╛рдареА рдПрд╡рдвреЗрдЪ!

рдбреНрд░реАрдо рдЯреАрдо

рдореА рдпрд╛ рднрд╛рдЧрд╛рдд рд▓рд┐рд╣рд┐рдгреНрдпрд╛рдЪреЗ рд╡рдЪрди рджрд┐рд▓реЗрд▓реА рд╢реЗрд╡рдЯрдЪреА рдЧреЛрд╖реНрдЯ рдореНрд╣рдгрдЬреЗ рдЖрдЬреНрдЮрд╛. рдЖрдзреА рд╕рд╛рдВрдЧрд┐рддрд▓реНрдпрд╛рдкреНрд░рдорд╛рдгреЗ, рдлреЙрд╕реНрдЯ рдордзреАрд▓ рдХрдорд╛рдВрдб рдХреНрд▓рд┐рдХрдЪреНрдпрд╛ рдЖрд╕рдкрд╛рд╕ рд░реЕрдкрд░ рдЕрд╕рддрд╛рдд. рдЦрд░рдВ рддрд░, -A рдХреА рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рддрд╛рдирд╛ рдлреЙрд╕реНрдЯ рдЖрдордЪреА рдХрд╕реНрдЯрдо рдХрдорд╛рдВрдб рддреНрдпрд╛рдЪреНрдпрд╛ рдЗрдВрдЯрд░рдлреЗрд╕рд▓рд╛ рдЬреЛрдбрддреЗ

рдордзреНрдпреЗ рдШреЛрд╖рд┐рдд рдПрдЬрдВрдЯреНрд╕ рдирдВрддрд░ agents.py рдбреЗрдХреЛрд░реЗрдЯрд░рд╕рд╣ рдлрдВрдХреНрд╢рди рдЬреЛрдбрд╛ app.commandрдкрджреНрдзрдд рдХреЙрд▓ рдХрд░рдгреЗ рдЯрд╛рдХрд▓реЗ ╤Г рдЬрдорд╛_рд╕реБрд░рдХреНрд╖рд╛:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

рдЕрд╢рд╛ рдкреНрд░рдХрд╛рд░реЗ, рдЬрд░ рдЖрдкрдг рдЖрдЬреНрдЮрд╛рдВрдЪреА рдпрд╛рджреА рдХреЙрд▓ рдХреЗрд▓реА рддрд░ рдЖрдордЪреА рдирд╡реАрди рдХрдорд╛рдВрдб рддреНрдпрд╛рдд рдЕрд╕реЗрд▓:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

рдЖрдореНрд╣реА рддреЗ рдЗрддрд░рд╛рдВрдкреНрд░рдорд╛рдгреЗ рд╡рд╛рдкрд░реВ рд╢рдХрддреЛ, рдореНрд╣рдгреВрди рдЪрд▓рд╛ рдлреЙрд╕реНрдЯ рд╡рд░реНрдХрд░ рд░реАрд╕реНрдЯрд╛рд░реНрдЯ рдХрд░реВ рдЖрдгрд┐ рд╕рд┐рдХреНрдпреБрд░рд┐рдЯреАрдЬрдЪрд╛ рдкреВрд░реНрдг рд╕рдВрдЧреНрд░рд╣ рд╕реБрд░реВ рдХрд░реВ:

> faust -A horton.agents start-collect-securities

рдкреБрдвреЗ рдХрд╛рдп рд╣реЛрдгрд╛рд░?

рдкреБрдвреАрд▓ рднрд╛рдЧрд╛рдд, рдЙрд░реНрд╡рд░рд┐рдд рдПрдЬрдВрдЯреНрд╕рдЪрд╛ рдЙрджрд╛рд╣рд░рдг рдореНрд╣рдгреВрди рд╡рд╛рдкрд░ рдХрд░реВрди, рдЖрдореНрд╣реА рд╡рд░реНрд╖рднрд░рд╛рддреАрд▓ рдЯреНрд░реЗрдбрд┐рдВрдЧрдЪреНрдпрд╛ рдмрдВрдж рдХрд┐рдВрдорддреА рдЖрдгрд┐ рдПрдЬрдВрдЯреНрд╕рдЪреНрдпрд╛ рдХреНрд░реЙрди рд▓реЙрдиреНрдЪрдордзреНрдпреЗ рдЯреЛрдХрд╛рдЪрд╛ рд╢реЛрдз рдШреЗрдгреНрдпрд╛рд╕рд╛рдареА рд╕рд┐рдВрдХ рдпрдВрддреНрд░рдгрд╛ рд╡рд┐рдЪрд╛рд░рд╛рдд рдШреЗрдК.

рдЖрдЬрд╕рд╛рдареА рдПрд╡рдвреЗрдЪ! рд╡рд╛рдЪрд▓реНрдпрд╛рдмрджреНрджрд▓ рдзрдиреНрдпрд╡рд╛рдж :)

рдпрд╛ рднрд╛рдЧрд╛рд╕рд╛рдареА рдХреЛрдб

рдлреЙрд╕реНрдЯ, рднрд╛рдЧ II рд╡рд░ рдкрд╛рд░реНрд╢реНрд╡рднреВрдореА рдХрд╛рд░реНрдпреЗ: рдПрдЬрдВрдЯ рдЖрдгрд┐ рд╕рдВрдШ

P.S. рд╢реЗрд╡рдЯрдЪреНрдпрд╛ рднрд╛рдЧрд╛рдЦрд╛рд▓реА рдорд▓рд╛ рдлрд╛рд╕реНрдЯ рдЖрдгрд┐ рдХрдВрдлреНрд▓реБрдПрдВрдЯ рдХрд╛рдлреНрдХрд╛ (рд╕рдВрдЧрдорд╛рдд рдХреЛрдгрддреА рд╡реИрд╢рд┐рд╖реНрдЯреНрдпреЗ рдЖрд╣реЗрдд?). рдЕрд╕реЗ рджрд┐рд╕рддреЗ рдХреА confluent рдЕрдиреЗрдХ рдорд╛рд░реНрдЧрд╛рдВрдиреА рдЕрдзрд┐рдХ рдХрд╛рд░реНрдпрд╢реАрд▓ рдЖрд╣реЗ, рдкрд░рдВрддреБ рд╡рд╕реНрддреБрд╕реНрдерд┐рддреА рдЕрд╢реА рдЖрд╣реЗ рдХреА рдлреЙрд╕реНрдЯрд▓рд╛ confluent рд╕рд╛рдареА рдкреВрд░реНрдг рдХреНрд▓рд╛рдпрдВрдЯ рд╕рдорд░реНрдерди рдирд╛рд╣реА - рд╣реЗ рдЦрд╛рд▓реАрд▓ рдкреНрд░рдорд╛рдгреЗ рдЖрд╣реЗ рджрд╕реНрддрдРрд╡рдЬрд╛рддреАрд▓ рдХреНрд▓рд╛рдпрдВрдЯ рдирд┐рд░реНрдмрдВрдзрд╛рдВрдЪреЗ рд╡рд░реНрдгрди.

рд╕реНрддреНрд░реЛрдд: www.habr.com

рдПрдХ рдЯрд┐рдкреНрдкрдгреА рдЬреЛрдбрд╛