Faust рдорд╛ рдкреГрд╖реНрдарднреВрдорд┐ рдХрд╛рд░реНрдпрд╣рд░реВ, рднрд╛рдЧ II: рдПрдЬреЗрдиреНрдЯ рд░ рдЯреЛрд▓реАрд╣рд░реВ

Faust рдорд╛ рдкреГрд╖реНрдарднреВрдорд┐ рдХрд╛рд░реНрдпрд╣рд░реВ, рднрд╛рдЧ II: рдПрдЬреЗрдиреНрдЯ рд░ рдЯреЛрд▓реАрд╣рд░реВ

рд╕рд╛рдордЧреНрд░реАрдХреЛ рддрд╛рд▓рд┐рдХрд╛

  1. рднрд╛рдЧ I: рдкрд░рд┐рдЪрдп

  2. рднрд╛рдЧ II: рдПрдЬреЗрдиреНрдЯ рд░ рдЯреЛрд▓реАрд╣рд░реВ

рд╣рд╛рдореА рдпрд╣рд╛рдБ рдХреЗ рдЧрд░реНрджреИрдЫреМрдВ?

рддреНрдпрд╕реИрд▓реЗ, рджреЛрд╕реНрд░реЛ рднрд╛рдЧред рдкрд╣рд┐рд▓реЗ рд▓реЗрдЦрд┐рдП рдЬрд╕реНрддреИ, рдпрд╕рдорд╛ рд╣рд╛рдореА рдирд┐рдореНрди рдЧрд░реНрдиреЗрдЫреМрдВ:

  1. рд╣рд╛рдореАрд▓рд╛рдИ рдЪрд╛рд╣рд┐рдиреЗ рдЕрдиреНрддрд┐рдо рдмрд┐рдиреНрджреБрд╣рд░реВрдХрд╛ рд▓рд╛рдЧрд┐ рдЕрдиреБрд░реЛрдзрд╣рд░реВрдХреЛ рд╕рд╛рде aiohttp рдорд╛ alphavantage рдХреЛ рд▓рд╛рдЧрд┐ рдПрдЙрдЯрд╛ рд╕рд╛рдиреЛ рдЧреНрд░рд╛рд╣рдХ рд▓реЗрдЦреМрдВред

  2. рдПрдХ рдПрдЬреЗрдиреНрдЯ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реМрдВ рдЬрд╕рд▓реЗ рдзрд┐рддреЛрдорд╛ рдбрд╛рдЯрд╛ рд░ рддрд┐рдиреАрд╣рд░реВрдорд╛ рдореЗрдЯрд╛ рдЬрд╛рдирдХрд╛рд░реА рд╕рдЩреНрдХрд▓рди рдЧрд░реНрдиреЗрдЫред

рддрд░, рдпреЛ рд╣рд╛рдореА рдкрд░рд┐рдпреЛрдЬрдирд╛рдХреЛ рд▓рд╛рдЧрд┐ рдЖрдлреИрдВ рдЧрд░реНрдиреЗрдЫреМрдВ, рд░ рдлрд╕реНрдЯ рдЕрдиреБрд╕рдиреНрдзрд╛рдирдХреЛ рд╕рдиреНрджрд░реНрднрдорд╛, рд╣рд╛рдореА рдХрд╛рдлреНрдХрд╛рдмрд╛рдЯ рдШрдЯрдирд╛рд╣рд░реВ рд╕реНрдЯреНрд░рд┐рдо рдЧрд░реНрдиреЗ рдПрдЬреЗрдиреНрдЯрд╣рд░реВ рдХрд╕рд░реА рд▓реЗрдЦреНрдиреЗ, рд╕рд╛рдереИ рдХрд╕рд░реА рдЖрджреЗрд╢рд╣рд░реВ рд▓реЗрдЦреНрдиреЗ (рдХреНрд▓рд┐рдХ рд░реИрдкрд░), рд╣рд╛рдореНрд░реЛ рдорд╛рдорд▓рд╛рдорд╛ - рдПрдЬреЗрдиреНрдЯрд▓реЗ рдирд┐рдЧрд░рд╛рдиреА рдЧрд░рд┐рд░рд╣реЗрдХреЛ рд╡рд┐рд╖рдпрдорд╛ рдореНрдпрд╛рдиреБрдЕрд▓ рдкреБрд╢ рд╕рдиреНрджреЗрд╢рд╣рд░реВрдХрд╛ рд▓рд╛рдЧрд┐ред

рдкреНрд░рд╢рд┐рдХреНрд╖рдг

AlphaVantage рдЧреНрд░рд╛рд╣рдХ

рдкрд╣рд┐рд▓реЗ, alphavantage рдорд╛ рдЕрдиреБрд░реЛрдзрд╣рд░реВрдХреЛ рд▓рд╛рдЧрд┐ рдПрдЙрдЯрд╛ рд╕рд╛рдиреЛ aiohttp рдХреНрд▓рд╛рдЗрдиреНрдЯ рд▓реЗрдЦреМрдВред

alphavantage.py

рдмрд┐рдШреНрдирдХрд░реНрддрд╛

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

рд╡рд╛рд╕реНрддрд╡рдорд╛, рдпрд╕рдмрд╛рдЯ рд╕рдмреИ рдХреБрд░рд╛ рд╕реНрдкрд╖реНрдЯ рдЫ:

  1. AlphaVantage API рдПрдХрджрдо рд╕рд░рд▓ рд░ рд╕реБрдиреНрджрд░ рдврдВрдЧрд▓реЗ рдбрд┐рдЬрд╛рдЗрди рдЧрд░рд┐рдПрдХреЛ рдЫ, рддреНрдпрд╕реИрд▓реЗ рдореИрд▓реЗ рд╡рд┐рдзрд┐ рдорд╛рд░реНрдлрдд рд╕рдмреИ рдЕрдиреБрд░реЛрдзрд╣рд░реВ рдЧрд░реНрдиреЗ рдирд┐рд░реНрдгрдп рдЧрд░реЗрдВ construct_query рдЬрд╣рд╛рдБ рдмрд╛рд░реАрдорд╛ рдПрдХ http рдХрд▓ рдЫред

  2. рдо рд╕рдмреИ рдХреНрд╖реЗрддреНрд░рд╣рд░реВ рд▓реНрдпрд╛рдЙрдБрдЫреБ snake_case рд╕реБрд╡рд┐рдзрд╛ рдХреЛ рд▓рд╛рдЧреАред

  3. рдЦреИрд░, рд╕реБрдиреНрджрд░ рд░ рдЬрд╛рдирдХрд╛рд░реАрдореВрд▓рдХ рдЯреНрд░реЗрд╕рдмреНрдпрд╛рдХ рдЖрдЙрдЯрдкреБрдЯрдХреЛ рд▓рд╛рдЧрд┐ logger.catch рд╕рдЬрд╛рд╡рдЯред

PS рд╕реНрдерд╛рдиреАрдп рд░реВрдкрдорд╛ config.yml рдорд╛ alphavantage рдЯреЛрдХрди рдердкреНрди рдирдмрд┐рд░реНрд╕рдиреБрд╣реЛрд╕реН, рд╡рд╛ рд╡рд╛рддрд╛рд╡рд░рдг рдЪрд░ рдирд┐рд░реНрдпрд╛рдд рдЧрд░реНрдиреБрд╣реЛрд╕реНред HORTON_SERVICE_APIKEYред рд╣рд╛рдореА рдПрдЙрдЯрд╛ рдЯреЛрдХрди рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрдЫреМрдВ рдпрд╣рд╛рдБ.

CRUD рд╡рд░реНрдЧ

рд╣рд╛рдореАрд╕рдБрдЧ рдзрд┐рддреЛрдкрддреНрд░рд╣рд░реВрдХреЛ рдмрд╛рд░реЗрдорд╛ рдореЗрдЯрд╛ рдЬрд╛рдирдХрд╛рд░реА рднрдгреНрдбрд╛рд░рдг рдЧрд░реНрди рдзрд┐рддреЛрд╣рд░реВ рд╕рдВрдЧреНрд░рд╣ рд╣реБрдиреЗрдЫред

database/security.py

рдореЗрд░реЛ рд╡рд┐рдЪрд╛рд░рдорд╛, рдпрд╣рд╛рдБ рдХреЗрд╣рд┐ рд╡реНрдпрд╛рдЦреНрдпрд╛ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЫреИрди, рд░ рдЖрдзрд╛рд░ рд╡рд░реНрдЧ рдЖрдлреИрдВрдорд╛ рдПрдХрджрдо рд╕рд░рд▓ рдЫред

get_app()

рдПрдЙрдЯрд╛ рдПрдкреНрд▓рд┐рдХреЗрд╕рди рд╡рд╕реНрддреБ рдмрдирд╛рдЙрдирдХреЛ рд▓рд╛рдЧрд┐ рдПрдЙрдЯрд╛ рдкреНрд░рдХрд╛рд░реНрдп рдердкреМрдВ app.py

рдмрд┐рдШреНрдирдХрд░реНрддрд╛

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

рдЕрд╣рд┐рд▓реЗрдХреЛ рд▓рд╛рдЧрд┐ рд╣рд╛рдореАрд╕рдБрдЧ рд╕рдмреИрднрдиреНрджрд╛ рд╕рд░рд▓ рдПрдкреНрд▓рд┐рдХреЗрд╕рди рд╕рд┐рд░реНрдЬрдирд╛ рд╣реБрдиреЗрдЫ, рдХреЗрд╣рд┐ рд╕рдордп рдкрдЫрд┐ рд╣рд╛рдореА рдпрд╕рд▓рд╛рдИ рд╡рд┐рд╕реНрддрд╛рд░ рдЧрд░реНрдиреЗрдЫреМрдВ, рдпрджреНрдпрдкрд┐, рддрдкрд╛рдИрд▓рд╛рдИ рдкрд░реНрдЦрд┐рди рдирджрд┐рди, рдпрд╣рд╛рдБ рд╕рдиреНрджрд░реНрднрд╣рд░реВ рдПрдк рдХреНрд▓рд╛рд╕рдорд╛ред рдо рддрдкрд╛рдЗрдБрд▓рд╛рдИ рд╕реЗрдЯрд┐рдЩ рд╡рд░реНрдЧрдорд╛ рдПрдХ рдирдЬрд░ рд▓рд┐рди рд╕рд▓реНрд▓рд╛рд╣ рджрд┐рдиреНрдЫреБ, рдХрд┐рдирдХрд┐ рдпреЛ рдЕрдзрд┐рдХрд╛рдВрд╢ рд╕реЗрдЯрд┐рдЩрд╣рд░реВрдХреЛ рд▓рд╛рдЧрд┐ рдЬрд┐рдореНрдореЗрд╡рд╛рд░ рдЫред

рдореБрдЦреНрдп рднрд╛рдЧ

рдзрд┐рддреЛрдкрддреНрд░рд╣рд░реВрдХреЛ рд╕реВрдЪреА рд╕рдЩреНрдХрд▓рди рд░ рдорд░реНрдорддрдХрд╛ рд▓рд╛рдЧрд┐ рдПрдЬреЗрдиреНрдЯ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

рддреНрдпрд╕реЛрднрдП, рдкрд╣рд┐рд▓реЗ рд╣рд╛рдореАрд▓реЗ рдлрд╕реНрдЯ рдПрдкреНрд▓рд┐рдХреЗрд╕рди рд╡рд╕реНрддреБ рдкрд╛рдЙрдБрдЫреМрдВ - рдпреЛ рдПрдХрджрдо рд╕рд░рд▓ рдЫред рдЕрд░реНрдХреЛ, рд╣рд╛рдореА рд╕реНрдкрд╖реНрдЯ рд░реВрдкрдорд╛ рд╣рд╛рдореНрд░реЛ рдПрдЬреЗрдиреНрдЯрдХреЛ рд▓рд╛рдЧрд┐ рдПрдЙрдЯрд╛ рд╡рд┐рд╖рдп рдШреЛрд╖рдгрд╛ рдЧрд░реНрдЫреМрдВ... рдпрд╣рд╛рдБ рдпреЛ рдХреЗ рд╣реЛ, рдЖрдиреНрддрд░рд┐рдХ рдорд╛рдкрджрдгреНрдб рдХреЗ рд╣реЛ рд░ рдпрд╕рд▓рд╛рдИ рдХрд╕рд░реА рдлрд░рдХ рддрд░рд┐рдХрд╛рд▓реЗ рд╡реНрдпрд╡рд╕реНрдерд┐рдд рдЧрд░реНрди рд╕рдХрд┐рдиреНрдЫ рднрдиреНрдиреЗ рдХреБрд░рд╛ рдЙрд▓реНрд▓реЗрдЦ рдЧрд░реНрди рд▓рд╛рдпрдХ рдЫред

  1. рдХрд╛рдлреНрдХрд╛рдорд╛ рд╡рд┐рд╖рдпрд╣рд░реВ, рдпрджрд┐ рд╣рд╛рдореА рд╕рд╣реА рдкрд░рд┐рднрд╛рд╖рд╛ рдЬрд╛рдиреНрди рдЪрд╛рд╣рдиреНрдЫреМрдВ рднрдиреЗ, рдпреЛ рдкрдвреНрди рд░рд╛рдореНрд░реЛ рдЫ рдмрдиреНрдж рдХрд╛рдЧрдЬрд╛рддрд╡рд╛ рддрдкрд╛рдИрдВ рдкрдвреНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ рдХрдореНрдкреЗрдиреНрдбрд┐рдпрдо рд░реВрд╕реАрдорд╛ Habr├й рдорд╛, рдЬрд╣рд╛рдБ рд╕рдмреИ рдХреБрд░рд╛ рдкрдирд┐ рдПрдХрджрдо рд╕рд╣реА рд░реВрдкрдорд╛ рдкреНрд░рддрд┐рдмрд┐рдореНрдмрд┐рдд рд╣реБрдиреНрдЫ :)

  2. рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░ рдЖрдиреНрддрд░рд┐рдХ, faust рдХрд╛рдЧрдЬрд╛рддрдорд╛ рд░рд╛рдореНрд░реЛрд╕рдБрдЧ рд╡рд░реНрдгрди рдЧрд░рд┐рдПрдХреЛ рдЫ, рд╣рд╛рдореАрд▓рд╛рдИ рдХреЛрдбрдорд╛ рд╕реАрдзрд╛ рд╡рд┐рд╖рдп рдХрдиреНрдлрд┐рдЧрд░ рдЧрд░реНрди рдЕрдиреБрдорддрд┐ рджрд┐рдиреНрдЫ, рдЕрд╡рд╢реНрдп рдкрдирд┐, рдпрд╕рдХреЛ рдорддрд▓рдм рдлрд╛рд╕реНрдЯ рд╡рд┐рдХрд╛рд╕рдХрд░реНрддрд╛рд╣рд░реВ рджреНрд╡рд╛рд░рд╛ рдкреНрд░рджрд╛рди рдЧрд░рд┐рдПрдХреЛ рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░рд╣рд░реВ, рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐: рдкреНрд░рддрд┐рдзрд╛рд░рдг, рдЕрд╡рдзрд╛рд░рдг рдиреАрддрд┐ (рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рд░реВрдкрдорд╛ рдореЗрдЯрд╛рдЙрдиреБрд╣реЛрд╕реН, рддрд░ рддрдкрд╛рдЗрдБ рд╕реЗрдЯ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫред рд╕рдВрдХреБрдЪрд┐рддрдкреНрд░рддрд┐ рд╡рд┐рд╖рдп рд╡рд┐рднрд╛рдЬрдирдХреЛ рд╕рдВрдЦреНрдпрд╛ (рд╡рд┐рднрд╛рдЬрдирд╣рд░реВрдЧрд░реНрди, рдЙрджрд╛рд╣рд░рдг рдХреЛ рд▓рд╛рдЧреА, рднрдиреНрджрд╛ рдХрдо рд╡рд┐рд╢реНрд╡рд╡реНрдпрд╛рдкреА рдорд╣рддреНрд╡ рдЕрдиреБрдкреНрд░рдпреЛрдЧрд╣рд░реВ рдлрд╛рд╕реНрдЯ)ред

  3. рд╕рд╛рдорд╛рдиреНрдпрддрдпрд╛, рдПрдЬреЗрдиреНрдЯрд▓реЗ рд╡рд┐рд╢реНрд╡рд╡реНрдпрд╛рдкреА рдорд╛рдирд╣рд░реВрд╕рдБрдЧ рд╡реНрдпрд╡рд╕реНрдерд┐рдд рд╡рд┐рд╖рдп рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрди рд╕рдХреНрдЫ, рддрдерд╛рдкрд┐, рдо рд╕рдмреИ рдХреБрд░рд╛ рд╕реНрдкрд╖реНрдЯ рд░реВрдкрдорд╛ рдШреЛрд╖рдгрд╛ рдЧрд░реНрди рдЪрд╛рд╣рдиреНрдЫреБред рдердк рд░реВрдкрдорд╛, рдПрдЬреЗрдиреНрдЯ рд╡рд┐рдЬреНрдЮрд╛рдкрдирдорд╛ рд╡рд┐рд╖рдпрдХреЛ рдХреЗрд╣реА рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░рд╣рд░реВ (рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐, рд╡рд┐рднрд╛рдЬрдирд╣рд░реВрдХреЛ рд╕рдВрдЦреНрдпрд╛ рд╡рд╛ рдЕрд╡рдзрд╛рд░рдг рдиреАрддрд┐) рдХрдиреНрдлрд┐рдЧрд░ рдЧрд░реНрди рд╕рдХрд┐рдБрджреИрдиред

    рдореНрдпрд╛рдиреБрдЕрд▓ рд░реВрдкрдорд╛ рд╡рд┐рд╖рдп рдкрд░рд┐рднрд╛рд╖рд┐рдд рдирдЧрд░реА рдпреЛ рдХрд╕реНрддреЛ рджреЗрдЦрд┐рди рд╕рдХреНрдЫ рдпрд╣рд╛рдБ рдЫ:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

рдареАрдХ рдЫ, рдЕрдм рд╣рд╛рдореНрд░реЛ рдПрдЬреЗрдиреНрдЯрд▓реЗ рдХреЗ рдЧрд░реНрдЫ рднрдиреЗрд░ рд╡рд░реНрдгрди рдЧрд░реМрдВ :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

рддреНрдпрд╕реЛрднрдП, рдПрдЬреЗрдиреНрдЯрдХреЛ рд╕реБрд░реБрдорд╛, рд╣рд╛рдореА рд╣рд╛рдореНрд░реЛ рдЧреНрд░рд╛рд╣рдХ рдорд╛рд░реНрдлрдд рдЕрдиреБрд░реЛрдзрд╣рд░реВрдХреЛ рд▓рд╛рдЧрд┐ aiohttp рд╕рддреНрд░ рдЦреЛрд▓реНрдЫреМрдВред рдпрд╕рд░реА, рдХрд╛рдорджрд╛рд░ рд╕реБрд░реБ рдЧрд░реНрджрд╛, рдЬрдм рд╣рд╛рдореНрд░реЛ рдПрдЬреЗрдиреНрдЯ рд╕реБрд░реБ рд╣реБрдиреНрдЫ, рдПрдХ рд╕рддреНрд░ рддреБрд░реБрдиреНрддреИ рдЦреЛрд▓рд┐рдиреЗрдЫ - рдПрдХ, рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ рдЪрд▓рд┐рд░рд╣реЗрдХреЛ рд╕рдореНрдкреВрд░реНрдг рд╕рдордпрдХреЛ рд▓рд╛рдЧрд┐ (рд╡рд╛ рдзреЗрд░реИ, рдпрджрд┐ рддрдкрд╛рдИрдВрд▓реЗ рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░ рдкрд░рд┐рд╡рд░реНрддрди рдЧрд░реНрдиреБрднрдпреЛ рднрдиреЗред рд╕рд╣рдорддрд┐ рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдПрдХрд╛рдЗ рднрдПрдХреЛ рдПрдЬреЗрдиреНрдЯрдмрд╛рдЯ)ред

рдЕрд░реНрдХреЛ, рд╣рд╛рдореА рд╕реНрдЯреНрд░рд┐рдо рдкрдЫреНрдпрд╛рдЙрдБрдЫреМрдВ (рд╣рд╛рдореА рд╕рдиреНрджреЗрд╢ рд░рд╛рдЦреНрдЫреМрдВ _, рдХрд┐рдирдХрд┐ рд╣рд╛рдореА, рдпрд╕ рдПрдЬреЗрдиреНрдЯрдорд╛, рд╣рд╛рдореНрд░реЛ рд╡рд┐рд╖рдпрдмрд╛рдЯ рд╕рдиреНрджреЗрд╢рд╣рд░реВрдХреЛ рд╕рд╛рдордЧреНрд░реАрдХреЛ рд╡рд╛рд╕реНрддрд╛ рдЧрд░реНрджреИрдиреМрдВ, рдпрджрд┐ рддрд┐рдиреАрд╣рд░реВ рд╣рд╛рд▓рдХреЛ рдЕрдлрд╕реЗрдЯрдорд╛ рдЕрд╡рд╕реНрдерд┐рдд рдЫрдиреН рднрдиреЗ, рдЕрдиреНрдпрдерд╛ рд╣рд╛рдореНрд░реЛ рдЪрдХреНрд░ рддрд┐рдиреАрд╣рд░реВрдХреЛ рдЖрдЧрдордирдХреЛ рд▓рд╛рдЧрд┐ рдкрд░реНрдЦрдиреЗрдЫред рдЦреИрд░, рд╣рд╛рдореНрд░реЛ рд▓реБрдк рднрд┐рддреНрд░, рд╣рд╛рдореА рд╕рдиреНрджреЗрд╢рдХреЛ рд░рд╕рд┐рдж рд▓рдЧ рдЧрд░реНрдЫреМрдВ, рд╕рдХреНрд░рд┐рдп (get_securities рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рд░реВрдкрдорд╛ рдорд╛рддреНрд░ рд╕рдХреНрд░рд┐рдп рдлрд░реНрдХрд╛рдЙрдБрдЫ, рдЧреНрд░рд╛рд╣рдХ рдХреЛрдб рд╣реЗрд░реНрдиреБрд╣реЛрд╕реН) рд╕рд┐рдХреНрдпреЛрд░рд┐рдЯреАрд╣рд░реВрдХреЛ рд╕реВрдЪреА рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрдЫреМрдВ рд░ рдбрд╛рдЯрд╛рдмреЗрд╕рдорд╛ рдмрдЪрдд рдЧрд░реНрдЫреМрдВ, рдПрдЙрдЯреИ рдЯрд┐рдХрд░ рд░ рддреНрдпрд╣рд╛рдБ рд╕реБрд░рдХреНрд╖рд╛ рдЫ рдХрд┐ рдЫреИрди рднрдиреЗрд░ рдЬрд╛рдБрдЪ рдЧрд░реНрджреИред рдбрд╛рдЯрд╛рдмреЗрд╕рдорд╛ рд╡рд┐рдирд┐рдордп, рдпрджрд┐ рддреНрдпрд╣рд╛рдБ рдЫ рднрдиреЗ, рддреНрдпрд╕рдкрдЫрд┐ рдпреЛ (рдХрд╛рдЧрдЬ) рдорд╛рддреНрд░ рдЕрджреНрдпрд╛рд╡рдзрд┐рдХ рд╣реБрдиреЗрдЫред

рд╣рд╛рдореНрд░реЛ рд╕рд┐рд░реНрдЬрдирд╛ рд╕реБрд░реБ рдЧрд░реМрдВ!

> docker-compose up -d
... ╨Ч╨░╨┐╤Г╤Б╨║ ╨║╨╛╨╜╤В╨╡╨╣╨╜╨╡╤А╨╛╨▓ ...
> faust -A horton.agents worker --without-web -l info

PS рд╕реБрд╡рд┐рдзрд╛рд╣рд░реВ рд╡реЗрдм рдХрдореНрдкреЛрдиреЗрдиреНрдЯ рдо рд▓реЗрдЦрд╣рд░реВрдорд╛ рдлрд╛рд╕реНрдЯрд▓рд╛рдИ рд╡рд┐рдЪрд╛рд░ рдЧрд░реНрджрд┐рди, рддреНрдпрд╕реИрд▓реЗ рд╣рд╛рдореАрд▓реЗ рдЙрдкрдпреБрдХреНрдд рдЭрдгреНрдбрд╛ рд╕реЗрдЯ рдЧрд░реНрдпреМрдВред

рд╣рд╛рдореНрд░реЛ рд▓рдиреНрдЪ рдХрдорд╛рдгреНрдбрдорд╛, рд╣рд╛рдореАрд▓реЗ рдлрд╛рд╕реНрдЯрд▓рд╛рдИ рдЬрд╛рдирдХрд╛рд░реА рд▓рдЧ рдЖрдЙрдЯрдкреБрдЯ рд╕реНрддрд░рдХреЛ рд╕рд╛рде рдПрдкреНрд▓рд┐рдХреЗрд╕рди рд╡рд╕реНрддреБ рдХрд╣рд╛рдБ рдЦреЛрдЬреНрдиреЗ рд░ рдпрд╕рдХреЛ рд╕рд╛рде рдХреЗ рдЧрд░реНрдиреЗ (рдХрд╛рд░реНрдпрдХрд░реНрддрд╛ рд╕реБрд░реВ рдЧрд░реНрдиреБрд╣реЛрд╕реН) рднрдиреНрдпреМрдВред рд╣рд╛рдореАрд▓реЗ рдирд┐рдореНрди рдЖрдЙрдЯрдкреБрдЯ рдкрд╛рдЙрдБрдЫреМрдВ:

рдмрд┐рдШреНрдирдХрд░реНрддрд╛

тФМ╞Тa┬╡SтАа v1.10.4тФмтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФР
тФВ id          тФВ horton                                            тФВ
тФВ transport   тФВ [URL('kafka://localhost:9092')]                   тФВ
тФВ store       тФВ memory:                                           тФВ
тФВ log         тФВ -stderr- (info)                                   тФВ
тФВ pid         тФВ 1271262                                           тФВ
тФВ hostname    тФВ host-name                                         тФВ
тФВ platform    тФВ CPython 3.8.2 (Linux x86_64)                      тФВ
тФВ drivers     тФВ                                                   тФВ
тФВ   transport тФВ aiokafka=1.1.6                                    тФВ
тФВ   web       тФВ aiohttp=3.6.2                                     тФВ
тФВ datadir     тФВ /path/to/project/horton-data                      тФВ
тФВ appdir      тФВ /path/to/project/horton-data/v1                   тФВ
тФФтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФ┤тФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФШ
... ╨╗╨╛╨│╨╕, ╨╗╨╛╨│╨╕, ╨╗╨╛╨│╨╕ ...

тФМTopic Partition SetтФАтФАтФАтФАтФАтФАтФАтФАтФАтФмтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФР
тФВ topic                      тФВ partitions тФВ
тФЬтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФ╝тФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФд
тФВ collect_securities         тФВ {0-7}      тФВ
тФВ horton-__assignor-__leader тФВ {0}        тФВ
тФФтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФ┤тФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФАтФШ 

рдЬрд┐рдЙрджреИ рдЫ!!!

рд╡рд┐рднрд╛рдЬрди рд╕реЗрдЯ рд╣реЗрд░реМрдВред рд╣рд╛рдореАрд▓реЗ рджреЗрдЦреНрди рд╕рдХреНрдЫреМрдВ, рд╣рд╛рдореАрд▓реЗ рдХреЛрдбрдорд╛ рддреЛрдХреЗрдХреЛ рдирд╛рдордХреЛ рд╕рд╛рде рдПрдЙрдЯрд╛ рд╡рд┐рд╖рдп рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░рд┐рдПрдХреЛ рдерд┐рдпреЛ, рд╡рд┐рднрд╛рдЬрдирд╣рд░реВрдХреЛ рдкреВрд░реНрд╡рдирд┐рд░реНрдзрд╛рд░рд┐рдд рд╕рдВрдЦреНрдпрд╛ (8, рдмрд╛рдЯ рд▓рд┐рдЗрдПрдХреЛред topic_partitions - рдЕрдиреБрдкреНрд░рдпреЛрдЧ рд╡рд╕реНрддреБ рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░), рдХрд┐рдирдХрд┐ рд╣рд╛рдореАрд▓реЗ рд╣рд╛рдореНрд░реЛ рд╡рд┐рд╖рдп (рд╡рд┐рднрд╛рдЬрди рдорд╛рд░реНрдлрдд) рдХреЛ рд▓рд╛рдЧрд┐ рдПрдХ рд╡реНрдпрдХреНрддрд┐рдЧрдд рдорд╛рди рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реЗрдиреМрдВред рдХрд╛рд░реНрдпрдХрд░реНрддрд╛рдорд╛ рд╕реБрд░реБ рдЧрд░рд┐рдПрдХреЛ рдПрдЬреЗрдиреНрдЯрд▓рд╛рдИ рд╕рдмреИ 8 рд╡рд┐рднрд╛рдЬрдирд╣рд░реВ рддреЛрдХрд┐рдПрдХреЛ рдЫ, рдХрд┐рдирдХрд┐ рдпреЛ рдПрдХ рдорд╛рддреНрд░ рд╣реЛ, рддрд░ рдпреЛ рдХреНрд▓рд╕реНрдЯрд░рд┐рдЩрдХреЛ рдмрд╛рд░реЗрдорд╛ рднрд╛рдЧрдорд╛ рдердк рд╡рд┐рд╕реНрддрд╛рд░рдорд╛ рдЫрд▓рдлрд▓ рдЧрд░рд┐рдиреЗрдЫред

рдареАрдХ рдЫ, рдЕрдм рд╣рд╛рдореА рдЕрд░реНрдХреЛ рдЯрд░реНрдорд┐рдирд▓ рд╡рд┐рдиреНрдбреЛрдорд╛ рдЬрд╛рди рд╕рдХреНрдЫреМрдВ рд░ рд╣рд╛рдореНрд░реЛ рд╢реАрд░реНрд╖рдХрдорд╛ рдЦрд╛рд▓реА рд╕рдиреНрджреЗрд╢ рдкрдард╛рдЙрди рд╕рдХреНрдЫреМрдВ:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджреИ @ рд╣рд╛рдореАрд▓реЗ рджреЗрдЦрд╛рдЙрдБрдЫреМрдВ рдХрд┐ рд╣рд╛рдореАрд▓реЗ "collect_securities" рдирд╛рдордХ рд╡рд┐рд╖рдпрдорд╛ рд╕рдиреНрджреЗрд╢ рдкрдард╛рдЙрдБрджреИрдЫреМрдВред

рдпрд╕ рдЕрд╡рд╕реНрдерд╛рдорд╛, рд╕рдиреНрджреЗрд╢ рд╡рд┐рднрд╛рдЬрди 6 рдорд╛ рдЧрдпреЛ - рддрдкрд╛рдИрдВ kafdrop on рдорд╛ рдЧрдПрд░ рдпреЛ рдЬрд╛рдБрдЪ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ localhost:9000

рд╣рд╛рдореНрд░реЛ рдХрд╛рд░реНрдпрдХрд░реНрддрд╛рд╕рдБрдЧ рдЯрд░реНрдорд┐рдирд▓ рд╕рдЮреНрдЭреНрдпрд╛рд▓рдорд╛ рдЬрд╛рдБрджрд╛, рд╣рд╛рдореА loguru рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдкрдард╛рдЗрдПрдХреЛ рдЦреБрд╢реА рд╕рдиреНрджреЗрд╢ рджреЗрдЦреНрдиреЗрдЫреМрдВ:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

рд╣рд╛рдореА рдордЩреНрдЧреЛ (Robo3T рд╡рд╛ Studio3T рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░) рдорд╛ рдкрдирд┐ рд╣реЗрд░реНрди рд╕рдХреНрдЫреМрдВ рд░ рдзрд┐рддреЛрд╣рд░реВ рдбрд╛рдЯрд╛рдмреЗрд╕рдорд╛ рдЫрдиреН рднрдиреЗрд░ рд╣реЗрд░реНрди рд╕рдХреНрдЫреМрдВ:

рдо рдПрдХ рдЕрд░рдмрдкрддрд┐ рд╣реЛрдЗрди, рд░ рддреНрдпрд╕реИрд▓реЗ рд╣рд╛рдореА рдкрд╣рд┐рд▓реЛ рджреГрд╢реНрдп рд╡рд┐рдХрд▓реНрдк рд╕рдВрдЧ рд╕рдиреНрддреБрд╖реНрдЯ рдЫреМрдВред

Faust рдорд╛ рдкреГрд╖реНрдарднреВрдорд┐ рдХрд╛рд░реНрдпрд╣рд░реВ, рднрд╛рдЧ II: рдПрдЬреЗрдиреНрдЯ рд░ рдЯреЛрд▓реАрд╣рд░реВFaust рдорд╛ рдкреГрд╖реНрдарднреВрдорд┐ рдХрд╛рд░реНрдпрд╣рд░реВ, рднрд╛рдЧ II: рдПрдЬреЗрдиреНрдЯ рд░ рдЯреЛрд▓реАрд╣рд░реВ

рдЦреБрд╢реА рд░ рдЖрдирдиреНрдж - рдкрд╣рд┐рд▓реЛ рдПрдЬреЗрдиреНрдЯ рддрдпрд╛рд░ рдЫ :)

рдПрдЬреЗрдиреНрдЯ рддрдпрд╛рд░, рдирдпрд╛рдБ рдПрдЬреЗрдиреНрдЯ рджреАрд░реНрдШрд╛рдпреБ!

рд╣реЛ, рд╕рдЬреНрдЬрдирд╣рд░реВ, рд╣рд╛рдореАрд▓реЗ рдпрд╕ рд▓реЗрдЦрджреНрд╡рд╛рд░рд╛ рддрдпрд╛рд░ рдкрд╛рд░рд┐рдПрдХреЛ рдорд╛рд░реНрдЧрдХреЛ 1/3 рдорд╛рддреНрд░ рдХрднрд░ рдЧрд░реЗрдХрд╛ рдЫреМрдВ, рддрд░ рдирд┐рд░рд╛рд╢ рдирд╣реБрдиреБрд╣реЛрд╕реН, рдХрд┐рдирдХрд┐ рдЕрдм рдпреЛ рд╕рдЬрд┐рд▓реЛ рд╣реБрдиреЗрдЫред

рддреНрдпрд╕реЛрднрдП рдЕрдм рд╣рд╛рдореАрд▓рд╛рдИ рдореЗрдЯрд╛ рдЬрд╛рдирдХрд╛рд░реА рд╕рдЩреНрдХрд▓рди рдЧрд░реНрдиреЗ рд░ рд╕рдЩреНрдХрд▓рди рдХрд╛рдЧрдЬрд╛рддрдорд╛ рд░рд╛рдЦреНрдиреЗ рдПрдЬреЗрдиреНрдЯ рдЪрд╛рд╣рд┐рдиреНрдЫ:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

рдпрд╕ рдПрдЬреЗрдиреНрдЯрд▓реЗ рдХреБрдиреИ рд╡рд┐рд╢реЗрд╖ рд╕реБрд░рдХреНрд╖рд╛рдХреЛ рдмрд╛рд░реЗрдорд╛ рдЬрд╛рдирдХрд╛рд░реА рдкреНрд░рд╢реЛрдзрди рдЧрд░реНрдиреЗ рднрдПрдХреЛрд▓реЗ, рд╣рд╛рдореАрд▓реЗ рд╕рдиреНрджреЗрд╢рдорд╛ рдпреЛ рд╕реБрд░рдХреНрд╖рд╛рдХреЛ рдЯрд┐рдХрд░ (рдкреНрд░рддреАрдХ) рд╕рдВрдХреЗрдд рдЧрд░реНрдиреБрдкрд░реНрдЫред рдпрд╕ рдЙрджреНрджреЗрд╢реНрдпрдХрд╛ рд▓рд╛рдЧрд┐ рдлрд╛рд╕реНрдЯрдорд╛ рдЫрдиреН рд░реЗрдХрд░реНрдб тАФ рдПрдЬреЗрдиреНрдЯ рд╢реАрд░реНрд╖рдХрдорд╛ рд╕рдиреНрджреЗрд╢ рдпреЛрдЬрдирд╛ рдШреЛрд╖рдгрд╛ рдЧрд░реНрдиреЗ рдХрдХреНрд╖рд╛рд╣рд░реВред

рдпрд╕ рдЕрд╡рд╕реНрдерд╛рдорд╛, рдЬрд╛рдФрдВ records.py рд░ рдпрд╕ рд╡рд┐рд╖рдпрдХреЛ рд▓рд╛рдЧрд┐ рд╕рдиреНрджреЗрд╢ рдХрд╕реНрддреЛ рд╣реБрдиреБрдкрд░реНрдЫ рд╡рд░реНрдгрди рдЧрд░реНрдиреБрд╣реЛрд╕реН:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

рддрдкрд╛рдИрдВрд▓реЗ рдЕрдиреБрдорд╛рди рдЧрд░реНрдиреБрднрдПрдХреЛ рд╣реБрди рд╕рдХреНрдЫ, рдлрд╛рд╕реНрдЯрд▓реЗ рд╕рдиреНрджреЗрд╢ рд╕реНрдХрд┐рдорд╛ рд╡рд░реНрдгрди рдЧрд░реНрди рдкрд╛рдЗрдерди рдкреНрд░рдХрд╛рд░ рдПрдиреЛрдЯреЗрд╕рди рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджрдЫ, рддреНрдпрд╕реИрд▓реЗ рдкреБрд╕реНрддрдХрд╛рд▓рдпрджреНрд╡рд╛рд░рд╛ рд╕рдорд░реНрдерд┐рдд рдиреНрдпреВрдирддрдо рд╕рдВрд╕реНрдХрд░рдг рд╣реЛред 3.6.

рдПрдЬреЗрдиреНрдЯрдорд╛ рдлрд░реНрдХреМрдВ, рдкреНрд░рдХрд╛рд░рд╣рд░реВ рд╕реЗрдЯ рдЧрд░реНрдиреБрд╣реЛрд╕реН рд░ рдпрд╕рд▓рд╛рдИ рдердкреНрдиреБрд╣реЛрд╕реН:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

рддрдкрд╛рдИрд▓реЗ рджреЗрдЦреНрди рд╕рдХреНрдиреБрд╣реБрдиреЗ рд░реВрдкрдорд╛, рд╣рд╛рдореА рд╡рд┐рд╖рдп рдкреНрд░рд╛рд░рдореНрднрд┐рдХ рд╡рд┐рдзрд┐ - value_type рдорд╛ рдпреЛрдЬрдирд╛рдХреЛ рд╕рд╛рде рдирдпрд╛рдБ рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░ рдкрд╛рд╕ рдЧрд░реНрдЫреМрдВред рдпрд╕рдмрд╛рд╣реЗрдХ, рд╕рдмреИ рдХреБрд░рд╛ рдПрдЙрдЯреИ рдпреЛрдЬрдирд╛ рдкрдЫреНрдпрд╛рдЙрдБрдЫ, рддреНрдпрд╕реИрд▓реЗ рдо рдЕрд░реВ рдХреБрдиреИ рдХреБрд░рд╛рдорд╛ рдмрд╕реНрдиреЗ рдХреБрдиреИ рдмрд┐рдиреНрджреБ рджреЗрдЦреНрджрд┐рдиред

рдареАрдХ рдЫ, рдЕрдиреНрддрд┐рдо рдЯрдЪ рднрдиреЗрдХреЛ рдореЗрдЯрд╛ рдЬрд╛рдирдХрд╛рд░реА рд╕рдЩреНрдХрд▓рди рдПрдЬреЗрдиреНрдЯрд▓рд╛рдИ рдХрд▓_рд╕реЗрдХреНрдпреБрд░рд╛рдЗрдЯрд╛рдЗрдЯрд╣рд░реВрдорд╛ рдХрд▓ рдердкреНрдиреБ рд╣реЛ:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

рд╣рд╛рдореА рд╕рдиреНрджреЗрд╢рдХреЛ рд▓рд╛рдЧрд┐ рдкрд╣рд┐рд▓реЗ рдШреЛрд╖рдгрд╛ рдЧрд░рд┐рдПрдХреЛ рдпреЛрдЬрдирд╛ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдЫреМрдВред рдпрд╕ рдЕрд╡рд╕реНрдерд╛рдорд╛, рдореИрд▓реЗ .cast рд╡рд┐рдзрд┐ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрдВ рдХрд┐рдирдХрд┐ рд╣рд╛рдореАрд▓реЗ рдПрдЬреЗрдиреНрдЯрдмрд╛рдЯ рдирддрд┐рдЬрд╛ рдкрд░реНрдЦрдиреБ рдкрд░реНрджреИрди, рддрд░ рдпреЛ рдЙрд▓реНрд▓реЗрдЦ рдЧрд░реНрди рд▓рд╛рдпрдХ рдЫред рддрд░рд┐рдХрд╛ рд╢реАрд░реНрд╖рдХрдорд╛ рд╕рдиреНрджреЗрд╢ рдкрдард╛рдЙрдиреБрд╣реЛрд╕реН:

  1. рдХрд╛рд╕реНрдЯ - рдмреНрд▓рдХ рдЧрд░реНрджреИрди рдХрд┐рдирдХрд┐ рдпрд╕рд▓реЗ рдкрд░рд┐рдгрд╛рдордХреЛ рдЖрд╢рд╛ рдЧрд░реНрджреИрдиред рддрдкрд╛рдИрдВрд▓реЗ рд╕рдиреНрджреЗрд╢рдХреЛ рд░реВрдкрдорд╛ рдЕрд░реНрдХреЛ рд╢реАрд░реНрд╖рдХрдорд╛ рдкрд░рд┐рдгрд╛рдо рдкрдард╛рдЙрди рд╕рдХреНрдиреБрд╣реБрдиреНрдиред

  2. рдкрдард╛рдЙрдиреБрд╣реЛрд╕реН - рдмреНрд▓рдХ рдЧрд░реНрджреИрди рдХрд┐рдирдХрд┐ рдпрд╕рд▓реЗ рдкрд░рд┐рдгрд╛рдордХреЛ рдЖрд╢рд╛ рдЧрд░реНрджреИрдиред рддрдкрд╛рдИрд▓реЗ рдПрдЙрдЯрд╛ рдПрдЬреЗрдиреНрдЯрд▓рд╛рдИ рд╢реАрд░реНрд╖рдХрдорд╛ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ рдЬрд╕рдорд╛ рдкрд░рд┐рдгрд╛рдо рдЬрд╛рдиреНрдЫред

  3. рд╕реЛрдзреНрдиреБрд╣реЛрд╕реН - рдкрд░рд┐рдгрд╛рдордХреЛ рд▓рд╛рдЧрд┐ рдкрд░реНрдЦрдиреБрд╣реЛрд╕реНред рддрдкрд╛рдИрд▓реЗ рдПрдЙрдЯрд╛ рдПрдЬреЗрдиреНрдЯрд▓рд╛рдИ рд╢реАрд░реНрд╖рдХрдорд╛ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ рдЬрд╕рдорд╛ рдкрд░рд┐рдгрд╛рдо рдЬрд╛рдиреНрдЫред

рддреНрдпрд╕реЛрднрдП, рдЖрдЬрдХреЛ рд▓рд╛рдЧрд┐ рдПрдЬреЗрдиреНрдЯрд╣рд░реВрд╕рдБрдЧ рддреНрдпреЛ рд╕рдмреИ рд╣реЛ!

рд╕рдкрдирд╛рдХреЛ рдЯреЛрд▓реА

рдореИрд▓реЗ рдпрд╕ рднрд╛рдЧрдорд╛ рд▓реЗрдЦреНрдиреЗ рд╡рд╛рдЪрд╛ рдЧрд░реЗрдХреЛ рдЕрдиреНрддрд┐рдо рдХреБрд░рд╛ рднрдиреЗрдХреЛ рдЖрджреЗрд╢рд╣рд░реВ рд╣реЛред рдкрд╣рд┐рд▓реЗ рдЙрд▓реНрд▓реЗрдЦ рдЧрд░рд┐рдПрдЭреИрдВ, рдлрд╛рд╕реНрдЯрдорд╛ рдЖрджреЗрд╢рд╣рд░реВ рдХреНрд▓рд┐рдХрдХреЛ рд╡рд░рд┐рдкрд░рд┐ рд░реНрдпрд╛рдкрд░ рд╣реБрдиреНред рд╡рд╛рд╕реНрддрд╡рдорд╛, faust рд▓реЗ -A рдХреБрдЮреНрдЬреА рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реНрджрд╛ рд╣рд╛рдореНрд░реЛ рдЕрдиреБрдХреВрд▓рди рдЖрджреЗрд╢рд▓рд╛рдИ рдпрд╕рдХреЛ рдЗрдиреНрдЯрд░рдлреЗрд╕рдорд╛ рд╕рдВрд▓рдЧреНрди рдЧрд░реНрджрдЫ

рдШреЛрд╖рдгрд╛ рдкрдЫрд┐ рдПрдЬреЗрдиреНрдЯрд╣рд░реВ рдорд╛ agents.py рдбреЗрдХреЛрд░реЗрдЯрд░рд╕рдБрдЧ рдПрдЙрдЯрд╛ рдкреНрд░рдХрд╛рд░реНрдп рдердкреНрдиреБрд╣реЛрд╕реН app.commandрд╡рд┐рдзрд┐ рдХрд▓ рдЧрд░реНрджреИ рдЬрд╛рдд ╤Г рд╕рдЩреНрдХрд▓рди_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

рдпрд╕реИрд▓реЗ, рдпрджрд┐ рд╣рд╛рдореАрд▓реЗ рдЖрджреЗрд╢рд╣рд░реВрдХреЛ рд╕реВрдЪреАрд▓рд╛рдИ рдХрд▓ рдЧрд░реНрдЫреМрдВ, рд╣рд╛рдореНрд░реЛ рдирдпрд╛рдБ рдЖрджреЗрд╢ рдпрд╕рдорд╛ рд╣реБрдиреЗрдЫ:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

рд╣рд╛рдореА рдпрд╕рд▓рд╛рдИ рдЕрд░реВ рдХрд╕реИрд▓рд╛рдИ рдЬрд╕реНрддреИ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ, рддреНрдпрд╕реИрд▓реЗ рдлрд╛рд╕реНрдЯ рдХрд╛рд░реНрдпрдХрд░реНрддрд╛рд▓рд╛рдИ рдкреБрди: рд╕реБрд░реБ рдЧрд░реМрдВ рд░ рдзрд┐рддреЛрд╣рд░реВрдХреЛ рдкреВрд░реНрдг рд╕рдВрдЧреНрд░рд╣ рд╕реБрд░реБ рдЧрд░реМрдВ:

> faust -A horton.agents start-collect-securities

рдЕрдм рдХреЗ рд╣реБрдиреЗ рд╣реЛ ?

рдЕрд░реНрдХреЛ рднрд╛рдЧрдорд╛, рдЙрджрд╛рд╣рд░рдгрдХреЛ рд░реВрдкрдорд╛ рдмрд╛рдБрдХреА рдПрдЬреЗрдиреНрдЯрд╣рд░реВ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджреИ, рд╣рд╛рдореА рд╡рд░реНрд╖рдХреЛ рд▓рд╛рдЧрд┐ рд╡реНрдпрд╛рдкрд╛рд░рдХреЛ рдмрдиреНрдж рдореВрд▓реНрдп рд░ рдПрдЬреЗрдиреНрдЯрд╣рд░реВрдХреЛ рдХреНрд░реЛрди рдкреНрд░рдХреНрд╖реЗрдкрдгрдорд╛ рдЪрд░рдордХреЛ рдЦреЛрдЬреА рдЧрд░реНрди рд╕рд┐рдВрдХ рд╕рдВрдпрдиреНрддреНрд░рд▓рд╛рдИ рд╡рд┐рдЪрд╛рд░ рдЧрд░реНрдиреЗрдЫреМрдВред

рдЖрдЬрдХреЛ рд▓рд╛рдЧрд┐ рдпрддрд┐ рд╣реЛ! рдкрдвреНрдиреБ рднрдПрдХреЛрдорд╛ рдзрдиреНрдпрд╡рд╛рдж :)

рдпрд╕ рднрд╛рдЧрдХреЛ рд▓рд╛рдЧрд┐ рдХреЛрдб

Faust рдорд╛ рдкреГрд╖реНрдарднреВрдорд┐ рдХрд╛рд░реНрдпрд╣рд░реВ, рднрд╛рдЧ II: рдПрдЬреЗрдиреНрдЯ рд░ рдЯреЛрд▓реАрд╣рд░реВ

PS рдЕрдиреНрддрд┐рдо рднрд╛рдЧ рдЕрдиреНрддрд░реНрдЧрдд рдорд▓рд╛рдИ рдлрд╛рд╕реНрдЯ рд░ рдХрдиреНрдлреНрд▓реБрдПрдиреНрдЯ рдХрд╛рдлреНрдХрд╛рдХреЛ рдмрд╛рд░реЗрдорд╛ рд╕реЛрдзрд┐рдПрдХреЛ рдерд┐рдпреЛ (рд╕рдВрдЧрдордорд╛ рдХреЗ рд╕реБрд╡рд┐рдзрд╛рд╣рд░реВ рдЫрдиреН?)ред рдпрд╕реНрддреЛ рджреЗрдЦрд┐рдиреНрдЫ рдХрд┐ confluent рдзреЗрд░реИ рддрд░рд┐рдХрд╛рдорд╛ рдЕрдзрд┐рдХ рдХрд╛рд░реНрдпрд╛рддреНрдордХ рдЫ, рддрд░ рддрдереНрдп рдпреЛ рд╣реЛ рдХрд┐ faust рд╕рдВрдЧ confluent рдХреЛ рд▓рд╛рдЧреА рдкреВрд░реНрдг рдЧреНрд░рд╛рд╣рдХ рд╕рдорд░реНрдерди рдЫреИрди - рдпреЛ рдирд┐рдореНрди рдмрд╛рдЯ рдЫ рдХрд╛рдЧрдЬрд╛рддрдорд╛ рдЧреНрд░рд╛рд╣рдХ рдкреНрд░рддрд┐рдмрдиреНрдзрд╣рд░реВрдХреЛ рд╡рд┐рд╡рд░рдг.

рд╕реНрд░реЛрдд: www.habr.com

рдПрдХ рдЯрд┐рдкреНрдкрдгреА рдердкреНрди