āĻŦāĻŋāĻˇāĻ¯āĻŧāĻŦāĻ¸ā§āĻ¤ā§ āĻ¸ā§āĻāĻŋ
-
āĻĒāĻžāĻ°ā§āĻ II: āĻāĻā§āĻ¨ā§āĻ āĻāĻŦāĻ āĻĻāĻ˛
āĻāĻŽāĻ°āĻž āĻāĻāĻžāĻ¨ā§ āĻāĻŋ āĻāĻ°āĻāĻŋ?
āĻ¸ā§āĻ¤āĻ°āĻžāĻ, āĻ¤āĻžāĻ, āĻĻā§āĻŦāĻŋāĻ¤ā§āĻ¯āĻŧ āĻ āĻāĻļ. āĻ¯ā§āĻŽāĻ¨āĻāĻŋ āĻāĻā§ āĻ˛ā§āĻāĻž āĻšāĻ¯āĻŧā§āĻā§, āĻāĻ¤ā§ āĻāĻŽāĻ°āĻž āĻ¨āĻŋāĻŽā§āĻ¨āĻ˛āĻŋāĻāĻŋāĻ¤āĻā§āĻ˛āĻŋ āĻāĻ°āĻŦ:
-
āĻāĻŽāĻžāĻĻā§āĻ° āĻĒā§āĻ°āĻ¯āĻŧā§āĻāĻ¨ā§āĻ¯āĻŧ āĻļā§āĻˇ āĻĒāĻ¯āĻŧā§āĻ¨ā§āĻāĻā§āĻ˛āĻŋāĻ° āĻāĻ¨ā§āĻ¯ āĻ āĻ¨ā§āĻ°ā§āĻ§ āĻ¸āĻš aiohttp-āĻ alphavantage-āĻāĻ° āĻāĻ¨ā§āĻ¯ āĻāĻāĻāĻŋ āĻā§āĻ āĻā§āĻ˛āĻžāĻ¯āĻŧā§āĻ¨ā§āĻ āĻ˛āĻŋāĻāĻŋāĨ¤
-
āĻāĻ¸ā§āĻ¨ āĻāĻāĻāĻŋ āĻāĻā§āĻ¨ā§āĻ āĻ¤ā§āĻ°āĻŋ āĻāĻ°āĻŋ āĻ¯āĻžāĻ°āĻž āĻ¸āĻŋāĻāĻŋāĻāĻ°āĻŋāĻāĻŋāĻ āĻāĻŦāĻ āĻ¤āĻžāĻĻā§āĻ° āĻāĻĒāĻ° āĻŽā§āĻāĻž āĻ¤āĻĨā§āĻ¯ āĻ¸āĻāĻā§āĻ°āĻš āĻāĻ°āĻŦā§āĨ¤
āĻāĻŋāĻ¨ā§āĻ¤ā§, āĻāĻŽāĻ°āĻž āĻāĻ āĻĒā§āĻ°āĻāĻ˛ā§āĻĒā§āĻ° āĻāĻ¨ā§āĻ¯āĻ āĻāĻāĻŋ āĻāĻ°āĻŦ, āĻāĻŦāĻ āĻĢāĻžāĻ¸ā§āĻ āĻāĻŦā§āĻˇāĻŖāĻžāĻ° āĻĒāĻ°āĻŋāĻĒā§āĻ°ā§āĻā§āĻˇāĻŋāĻ¤ā§, āĻāĻŽāĻ°āĻž āĻļāĻŋāĻāĻŦ āĻā§āĻāĻžāĻŦā§ āĻāĻā§āĻ¨ā§āĻ āĻ˛āĻŋāĻāĻ¤ā§ āĻšāĻ¯āĻŧ āĻ¯āĻž āĻāĻžāĻĢāĻāĻž āĻĨā§āĻā§ āĻāĻā§āĻ¨ā§āĻāĻā§āĻ˛āĻŋ āĻĒā§āĻ°āĻ¸ā§āĻ¸ āĻāĻ°ā§, āĻ¸ā§āĻāĻ¸āĻžāĻĨā§ āĻā§āĻāĻžāĻŦā§ āĻāĻŽāĻžāĻ¨ā§āĻĄ āĻ˛āĻŋāĻāĻ¤ā§ āĻšāĻ¯āĻŧ (āĻā§āĻ˛āĻŋāĻ āĻ°ā§āĻ¯āĻžāĻĒāĻžāĻ°), āĻāĻŽāĻžāĻĻā§āĻ° āĻā§āĻˇā§āĻ¤ā§āĻ°ā§ - āĻāĻā§āĻ¨ā§āĻ āĻ¯ā§ āĻŦāĻŋāĻˇāĻ¯āĻŧā§ āĻ¨āĻāĻ°āĻĻāĻžāĻ°āĻŋ āĻāĻ°āĻā§ āĻ¸ā§āĻ āĻŦāĻŋāĻˇāĻ¯āĻŧā§ āĻŽā§āĻ¯āĻžāĻ¨ā§āĻ¯āĻŧāĻžāĻ˛ āĻĒā§āĻļ āĻŦāĻžāĻ°ā§āĻ¤āĻžāĻ° āĻāĻ¨ā§āĻ¯āĨ¤
āĻĒā§āĻ°āĻļāĻŋāĻā§āĻˇāĻŖ
āĻāĻ˛āĻĢāĻžāĻā§āĻ¯āĻžāĻ¨ā§āĻā§āĻ āĻā§āĻ˛āĻžāĻ¯āĻŧā§āĻ¨ā§āĻ
āĻĒā§āĻ°āĻĨāĻŽā§, āĻ ā§āĻ¯āĻžāĻ˛āĻĢāĻžāĻāĻžāĻ¨ā§āĻā§āĻā§āĻ° āĻ āĻ¨ā§āĻ°ā§āĻ§ā§āĻ° āĻāĻ¨ā§āĻ¯ āĻāĻāĻāĻŋ āĻā§āĻ aiohttp āĻā§āĻ˛āĻžāĻ¯āĻŧā§āĻ¨ā§āĻ āĻ˛āĻŋāĻāĻŋāĨ¤
āĻāĻā§āĻˇāĻ
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
āĻĒā§āĻ°āĻā§āĻ¤āĻĒāĻā§āĻˇā§, āĻāĻāĻŋ āĻĨā§āĻā§ āĻ¸āĻŦāĻāĻŋāĻā§ āĻĒāĻ°āĻŋāĻˇā§āĻāĻžāĻ°:
-
AlphaVantage API āĻŦā§āĻļ āĻ¸āĻšāĻ āĻāĻŦāĻ āĻ¸ā§āĻ¨ā§āĻĻāĻ°āĻāĻžāĻŦā§ āĻĄāĻŋāĻāĻžāĻāĻ¨ āĻāĻ°āĻž āĻšāĻ¯āĻŧā§āĻā§, āĻ¤āĻžāĻ āĻāĻŽāĻŋ āĻĒāĻĻā§āĻ§āĻ¤āĻŋāĻ° āĻŽāĻžāĻ§ā§āĻ¯āĻŽā§ āĻ¸āĻŽāĻ¸ā§āĻ¤ āĻ āĻ¨ā§āĻ°ā§āĻ§ āĻāĻ°āĻžāĻ° āĻ¸āĻŋāĻĻā§āĻ§āĻžāĻ¨ā§āĻ¤ āĻ¨āĻŋāĻ¯āĻŧā§āĻāĻŋ
construct_query
āĻ¯ā§āĻāĻžāĻ¨ā§ āĻā§āĻ°ā§ āĻāĻāĻāĻŋ http āĻāĻ˛ āĻāĻā§āĨ¤ -
āĻāĻŽāĻŋ āĻ¸āĻŦ āĻā§āĻˇā§āĻ¤ā§āĻ° āĻāĻ¨āĻž
snake_case
āĻāĻ°āĻžāĻŽā§āĻ° āĻāĻ¨ā§āĻ¯ -
āĻāĻžāĻ˛, āĻ¸ā§āĻ¨ā§āĻĻāĻ° āĻāĻŦāĻ āĻ¤āĻĨā§āĻ¯āĻĒā§āĻ°ā§āĻŖ āĻā§āĻ°ā§āĻ¸āĻŦā§āĻ¯āĻžāĻ āĻāĻāĻāĻĒā§āĻ āĻāĻ¨ā§āĻ¯ logger.catch āĻ¸āĻā§āĻāĻž.
PS config.yml-āĻ āĻ¸ā§āĻĨāĻžāĻ¨ā§āĻ¯āĻŧāĻāĻžāĻŦā§ alphavantage āĻā§āĻā§āĻ¨ āĻ¯ā§āĻ āĻāĻ°āĻ¤ā§ āĻā§āĻ˛āĻŦā§āĻ¨ āĻ¨āĻž, āĻ
āĻĨāĻŦāĻž āĻāĻ¨āĻāĻžāĻ¯āĻŧāĻ°āĻ¨āĻŽā§āĻ¨ā§āĻ āĻā§āĻ°āĻŋāĻ¯āĻŧā§āĻŦāĻ˛ āĻāĻā§āĻ¸āĻĒā§āĻ°ā§āĻ āĻāĻ°āĻ¤ā§ āĻā§āĻ˛āĻŦā§āĻ¨ āĻ¨āĻž HORTON_SERVICE_APIKEY
. āĻāĻŽāĻ°āĻž āĻāĻāĻāĻŋ āĻā§āĻā§āĻ¨ āĻā§āĻ°āĻšāĻŖ āĻāĻ°āĻŋ
CRUD āĻā§āĻ˛āĻžāĻ¸
āĻ¸āĻŋāĻāĻŋāĻāĻ°āĻŋāĻāĻŋāĻ āĻ¸āĻŽā§āĻĒāĻ°ā§āĻā§ āĻŽā§āĻāĻž āĻ¤āĻĨā§āĻ¯ āĻ¸āĻāĻ°āĻā§āĻˇāĻŖ āĻāĻ°āĻžāĻ° āĻāĻ¨ā§āĻ¯ āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻāĻāĻŋ āĻ¸āĻŋāĻāĻŋāĻāĻ°āĻŋāĻāĻŋāĻ āĻ¸āĻāĻā§āĻ°āĻš āĻĨāĻžāĻāĻŦā§āĨ¤
āĻāĻŽāĻžāĻ° āĻŽāĻ¤ā§, āĻāĻāĻžāĻ¨ā§ āĻāĻŋāĻā§ āĻŦā§āĻ¯āĻžāĻā§āĻ¯āĻž āĻāĻ°āĻžāĻ° āĻĒā§āĻ°āĻ¯āĻŧā§āĻāĻ¨ āĻ¨ā§āĻ, āĻāĻŦāĻ āĻŦā§āĻ¸ āĻā§āĻ˛āĻžāĻ¸ āĻ¨āĻŋāĻā§āĻ āĻŦā§āĻļ āĻ¸āĻšāĻāĨ¤
āĻ ā§āĻ¯āĻžāĻĒ āĻāĻŋ āĻ¨āĻŋāĻ¨()
āĻāĻāĻāĻŋ āĻ
ā§āĻ¯āĻžāĻĒā§āĻ˛āĻŋāĻā§āĻļāĻ¨ āĻŦāĻ¸ā§āĻ¤ā§ āĻ¤ā§āĻ°āĻŋ āĻāĻ°āĻžāĻ° āĻāĻ¨ā§āĻ¯ āĻāĻāĻāĻŋ āĻĢāĻžāĻāĻļāĻ¨ āĻ¯ā§āĻ āĻāĻ°āĻž āĻ¯āĻžāĻ
āĻāĻā§āĻˇāĻ
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
āĻāĻĒāĻžāĻ¤āĻ¤ āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻžāĻā§ āĻ¸āĻšāĻāĻ¤āĻŽ āĻ
ā§āĻ¯āĻžāĻĒā§āĻ˛āĻŋāĻā§āĻļāĻ¨ āĻ¤ā§āĻ°āĻŋ āĻšāĻŦā§, āĻāĻāĻā§ āĻĒāĻ°ā§ āĻāĻŽāĻ°āĻž āĻāĻāĻŋāĻā§ āĻĒā§āĻ°āĻ¸āĻžāĻ°āĻŋāĻ¤ āĻāĻ°āĻŦ, āĻ¤āĻŦā§, āĻāĻĒāĻ¨āĻžāĻā§ āĻ
āĻĒā§āĻā§āĻˇāĻž āĻ¨āĻž āĻāĻ°āĻžāĻ° āĻāĻ¨ā§āĻ¯, āĻāĻāĻžāĻ¨ā§
āĻĒā§āĻ°āĻ§āĻžāĻ¨ āĻ āĻāĻļ
āĻ¸āĻŋāĻāĻŋāĻāĻ°āĻŋāĻāĻŋāĻā§āĻ° āĻ¤āĻžāĻ˛āĻŋāĻāĻž āĻ¸āĻāĻā§āĻ°āĻš āĻ āĻ°āĻā§āĻˇāĻŖāĻžāĻŦā§āĻā§āĻˇāĻŖā§āĻ° āĻāĻ¨ā§āĻ¯ āĻāĻā§āĻ¨ā§āĻ
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
āĻ¸ā§āĻ¤āĻ°āĻžāĻ, āĻĒā§āĻ°āĻĨāĻŽā§ āĻāĻŽāĻ°āĻž āĻĢāĻžāĻ¸ā§āĻ āĻ ā§āĻ¯āĻžāĻĒā§āĻ˛āĻŋāĻā§āĻļāĻ¨ āĻ āĻŦāĻā§āĻā§āĻāĻāĻŋ āĻĒāĻžāĻ - āĻāĻāĻŋ āĻŦā§āĻļ āĻ¸āĻšāĻāĨ¤ āĻāĻ° āĻĒāĻ°ā§, āĻāĻŽāĻ°āĻž āĻ¸ā§āĻĒāĻˇā§āĻāĻāĻžāĻŦā§ āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻā§āĻ¨ā§āĻā§āĻ° āĻāĻ¨ā§āĻ¯ āĻāĻāĻāĻŋ āĻŦāĻŋāĻˇāĻ¯āĻŧ āĻā§āĻˇāĻŖāĻž āĻāĻ°āĻŋ... āĻāĻāĻžāĻ¨ā§ āĻāĻāĻŋ āĻā§, āĻ āĻā§āĻ¯āĻ¨ā§āĻ¤āĻ°ā§āĻŖ āĻĒā§āĻ¯āĻžāĻ°āĻžāĻŽāĻŋāĻāĻžāĻ° āĻā§ āĻāĻŦāĻ āĻāĻāĻŋāĻā§ āĻā§āĻāĻžāĻŦā§ āĻāĻ˛āĻžāĻĻāĻžāĻāĻžāĻŦā§ āĻ¸āĻžāĻāĻžāĻ¨ā§ āĻ¯ā§āĻ¤ā§ āĻĒāĻžāĻ°ā§ āĻ¤āĻž āĻāĻ˛ā§āĻ˛ā§āĻ āĻāĻ°āĻžāĻ° āĻŽāĻ¤ā§āĨ¤
-
āĻāĻžāĻĢāĻāĻžāĻ° āĻāĻĒāĻŋāĻāĻ¸, āĻ¯āĻĻāĻŋ āĻāĻŽāĻ°āĻž āĻ¸āĻ āĻŋāĻ āĻ¸āĻāĻā§āĻāĻž āĻāĻžāĻ¨āĻ¤ā§ āĻāĻžāĻ, āĻ¤āĻžāĻšāĻ˛ā§ āĻĒāĻĄāĻŧāĻž āĻāĻžāĻ˛ā§
āĻŦāĻ¨ā§āĻ§ āĻ¨āĻĨāĻŋ , āĻ āĻĨāĻŦāĻž āĻāĻĒāĻ¨āĻŋ āĻĒāĻĄāĻŧāĻ¤ā§ āĻĒāĻžāĻ°ā§āĻ¨āĻŦāĻŋāĻŽā§āĻ°ā§āĻ¤ āĻ°āĻžāĻļāĻŋāĻ¯āĻŧāĻžāĻ¨ āĻāĻžāĻˇāĻžāĻ¯āĻŧ āĻšāĻžāĻŦā§āĻ°ā§āĻ¤ā§, āĻ¯ā§āĻāĻžāĻ¨ā§ āĻ¸āĻŦāĻāĻŋāĻā§ āĻŦā§āĻļ āĻ¸āĻ āĻŋāĻāĻāĻžāĻŦā§ āĻĒā§āĻ°āĻ¤āĻŋāĻĢāĻ˛āĻŋāĻ¤ āĻšāĻ¯āĻŧ :) -
āĻ āĻā§āĻ¯āĻ¨ā§āĻ¤āĻ°ā§āĻŖ āĻĒā§āĻ¯āĻžāĻ°āĻžāĻŽāĻŋāĻāĻžāĻ° , faust āĻĄāĻ-āĻ āĻŦā§āĻļ āĻāĻžāĻ˛āĻāĻžāĻŦā§ āĻŦāĻ°ā§āĻŖāĻ¨āĻž āĻāĻ°āĻž āĻšāĻ¯āĻŧā§āĻā§, āĻāĻŽāĻžāĻĻā§āĻ°āĻā§ āĻā§āĻĄā§ āĻ¸āĻ°āĻžāĻ¸āĻ°āĻŋ āĻŦāĻŋāĻˇāĻ¯āĻŧ āĻāĻ¨āĻĢāĻŋāĻāĻžāĻ° āĻāĻ°āĻ¤ā§ āĻĻā§āĻ¯āĻŧ, āĻ āĻŦāĻļā§āĻ¯āĻ, āĻāĻ° āĻ āĻ°ā§āĻĨ āĻĢāĻžāĻ¸ā§āĻ āĻĄā§āĻā§āĻ˛āĻĒāĻžāĻ°āĻĻā§āĻ° āĻĻā§āĻŦāĻžāĻ°āĻž āĻĒā§āĻ°āĻĻāĻ¤ā§āĻ¤ āĻĒāĻ°āĻžāĻŽāĻŋāĻ¤āĻŋāĻā§āĻ˛āĻŋ, āĻāĻĻāĻžāĻšāĻ°āĻŖāĻ¸ā§āĻŦāĻ°ā§āĻĒ: āĻ§āĻžāĻ°āĻŖ, āĻ§āĻžāĻ°āĻŖ āĻ¨ā§āĻ¤āĻŋ (āĻĄāĻŋāĻĢāĻ˛ā§āĻāĻ°ā§āĻĒā§ āĻŽā§āĻā§ āĻĢā§āĻ˛ā§āĻ¨, āĻ¤āĻŦā§ āĻāĻĒāĻ¨āĻŋ āĻ¸ā§āĻ āĻāĻ°āĻ¤ā§ āĻĒāĻžāĻ°ā§āĻ¨āĻ¨āĻŋāĻā§āĻāĻŋāĻĻā§āĻ° ), āĻŦāĻŋāĻˇāĻ¯āĻŧ āĻĒā§āĻ°āĻ¤āĻŋ āĻĒāĻžāĻ°ā§āĻāĻŋāĻļāĻ¨ā§āĻ° āĻ¸āĻāĻā§āĻ¯āĻž (āĻĒāĻžāĻ°ā§āĻāĻŋāĻļāĻ¨ āĻāĻ°āĻ¤ā§, āĻāĻĻāĻžāĻšāĻ°āĻŖāĻ¸ā§āĻŦāĻ°ā§āĻĒ, āĻāĻ° āĻā§āĻ¯āĻŧā§ āĻāĻŽāĻŦāĻŋāĻļā§āĻŦāĻŦā§āĻ¯āĻžāĻĒā§ āĻ¤āĻžā§āĻĒāĻ°ā§āĻ¯ āĻ ā§āĻ¯āĻžāĻĒā§āĻ˛āĻŋāĻā§āĻļāĻ¨ āĻĢāĻžāĻ¸ā§āĻ)āĨ¤ -
āĻ¸āĻžāĻ§āĻžāĻ°āĻŖāĻāĻžāĻŦā§, āĻāĻā§āĻ¨ā§āĻ āĻŦāĻŋāĻļā§āĻŦāĻŦā§āĻ¯āĻžāĻĒā§ āĻŽāĻžāĻ¨ āĻ¸āĻš āĻāĻāĻāĻŋ āĻĒāĻ°āĻŋāĻāĻžāĻ˛āĻŋāĻ¤ āĻŦāĻŋāĻˇāĻ¯āĻŧ āĻ¤ā§āĻ°āĻŋ āĻāĻ°āĻ¤ā§ āĻĒāĻžāĻ°ā§, āĻ¤āĻŦā§, āĻāĻŽāĻŋ āĻ¸ā§āĻĒāĻˇā§āĻāĻāĻžāĻŦā§ āĻ¸āĻŦāĻāĻŋāĻā§ āĻā§āĻˇāĻŖāĻž āĻāĻ°āĻ¤ā§ āĻāĻžāĻāĨ¤ āĻāĻĒāĻ°āĻ¨ā§āĻ¤ā§, āĻāĻā§āĻ¨ā§āĻ āĻŦāĻŋāĻā§āĻāĻžāĻĒāĻ¨ā§ āĻŦāĻŋāĻˇāĻ¯āĻŧā§āĻ° āĻāĻŋāĻā§ āĻĒā§āĻ¯āĻžāĻ°āĻžāĻŽāĻŋāĻāĻžāĻ° (āĻāĻĻāĻžāĻšāĻ°āĻŖāĻ¸ā§āĻŦāĻ°ā§āĻĒ, āĻĒāĻžāĻ°ā§āĻāĻŋāĻļāĻ¨ā§āĻ° āĻ¸āĻāĻā§āĻ¯āĻž āĻŦāĻž āĻ§āĻ°ā§ āĻ°āĻžāĻāĻžāĻ° āĻ¨ā§āĻ¤āĻŋ) āĻāĻ¨āĻĢāĻŋāĻāĻžāĻ° āĻāĻ°āĻž āĻ¯āĻžāĻŦā§ āĻ¨āĻžāĨ¤
āĻŽā§āĻ¯āĻžāĻ¨ā§āĻ¯āĻŧāĻžāĻ˛āĻŋ āĻŦāĻŋāĻˇāĻ¯āĻŧāĻāĻŋ āĻ¸āĻāĻā§āĻāĻžāĻ¯āĻŧāĻŋāĻ¤ āĻ¨āĻž āĻāĻ°ā§ āĻāĻāĻŋ āĻĻā§āĻāĻ¤ā§ āĻā§āĻŽāĻ¨ āĻšāĻ¤ā§ āĻĒāĻžāĻ°ā§ āĻ¤āĻž āĻāĻāĻžāĻ¨ā§:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
āĻāĻā§āĻāĻž, āĻāĻāĻ¨ āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻā§āĻ¨ā§āĻ āĻā§ āĻāĻ°āĻŦā§ āĻ¤āĻž āĻŦāĻ°ā§āĻŖāĻ¨āĻž āĻāĻ°āĻŋ :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
āĻ¸ā§āĻ¤āĻ°āĻžāĻ, āĻāĻā§āĻ¨ā§āĻā§āĻ° āĻļā§āĻ°ā§āĻ¤ā§, āĻāĻŽāĻ°āĻž āĻāĻŽāĻžāĻĻā§āĻ° āĻā§āĻ˛āĻžāĻ¯āĻŧā§āĻ¨ā§āĻā§āĻ° āĻŽāĻžāĻ§ā§āĻ¯āĻŽā§ āĻ
āĻ¨ā§āĻ°ā§āĻ§ā§āĻ° āĻāĻ¨ā§āĻ¯ āĻāĻāĻāĻŋ aiohttp āĻ¸ā§āĻļāĻ¨ āĻā§āĻ˛āĻŋāĨ¤ āĻāĻāĻāĻžāĻŦā§, āĻāĻāĻāĻ¨ āĻāĻ°ā§āĻŽā§ āĻļā§āĻ°ā§ āĻāĻ°āĻžāĻ° āĻ¸āĻŽāĻ¯āĻŧ, āĻ¯āĻāĻ¨ āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻā§āĻ¨ā§āĻ āĻāĻžāĻ˛ā§ āĻāĻ°āĻž āĻšāĻ¯āĻŧ, āĻ¤āĻāĻ¨āĻ āĻāĻāĻāĻŋ āĻ¸ā§āĻļāĻ¨ āĻā§āĻ˛āĻž āĻšāĻŦā§ - āĻāĻ, āĻĒā§āĻ°ā§ āĻ¸āĻŽāĻ¯āĻŧ āĻāĻ°ā§āĻŽā§ āĻāĻ˛āĻā§ (āĻ
āĻĨāĻŦāĻž āĻāĻāĻžāĻ§āĻŋāĻ, āĻ¯āĻĻāĻŋ āĻāĻĒāĻ¨āĻŋ āĻĒā§āĻ¯āĻžāĻ°āĻžāĻŽāĻŋāĻāĻžāĻ° āĻĒāĻ°āĻŋāĻŦāĻ°ā§āĻ¤āĻ¨ āĻāĻ°ā§āĻ¨)
āĻāĻ°āĻĒāĻ°ā§, āĻāĻŽāĻ°āĻž āĻ¸ā§āĻā§āĻ°ā§āĻŽ āĻ
āĻ¨ā§āĻ¸āĻ°āĻŖ āĻāĻ°āĻŋ (āĻāĻŽāĻ°āĻž āĻŦāĻžāĻ°ā§āĻ¤āĻžāĻāĻŋ āĻ°āĻžāĻāĻŋ _
, āĻ¯ā§āĻšā§āĻ¤ā§ āĻāĻŽāĻ°āĻž, āĻāĻ āĻāĻā§āĻ¨ā§āĻā§, āĻāĻŽāĻžāĻĻā§āĻ° āĻŦāĻŋāĻˇāĻ¯āĻŧ āĻĨā§āĻā§ āĻŦāĻžāĻ°ā§āĻ¤āĻžāĻā§āĻ˛āĻŋāĻ° āĻŦāĻŋāĻˇāĻ¯āĻŧāĻŦāĻ¸ā§āĻ¤ā§ āĻ¸āĻŽā§āĻĒāĻ°ā§āĻā§ āĻāĻŋāĻ¨ā§āĻ¤āĻž āĻāĻ°āĻŋ āĻ¨āĻž, āĻ¯āĻĻāĻŋ āĻ¸ā§āĻā§āĻ˛āĻŋ āĻŦāĻ°ā§āĻ¤āĻŽāĻžāĻ¨ āĻ
āĻĢāĻ¸ā§āĻā§ āĻŦāĻŋāĻĻā§āĻ¯āĻŽāĻžāĻ¨ āĻĨāĻžāĻā§, āĻ
āĻ¨ā§āĻ¯āĻĨāĻžāĻ¯āĻŧ āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻā§āĻ° āĻ¤āĻžāĻĻā§āĻ° āĻāĻāĻŽāĻ¨ā§āĻ° āĻāĻ¨ā§āĻ¯ āĻ
āĻĒā§āĻā§āĻˇāĻž āĻāĻ°āĻŦā§āĨ¤ āĻ āĻŋāĻ āĻāĻā§, āĻāĻŽāĻžāĻĻā§āĻ° āĻ˛ā§āĻĒā§āĻ° āĻāĻŋāĻ¤āĻ°ā§, āĻāĻŽāĻ°āĻž āĻŦāĻžāĻ°ā§āĻ¤āĻžāĻ° āĻ°āĻ¸āĻŋāĻĻ āĻ˛āĻ āĻāĻ°āĻŋ, āĻ¸āĻā§āĻ°āĻŋāĻ¯āĻŧā§āĻ° āĻāĻāĻāĻŋ āĻ¤āĻžāĻ˛āĻŋāĻāĻž āĻĒāĻžāĻ (get_securities āĻļā§āĻ§ā§āĻŽāĻžāĻ¤ā§āĻ° āĻĄāĻŋāĻĢāĻ˛ā§āĻāĻ°ā§āĻĒā§ āĻ¸āĻā§āĻ°āĻŋāĻ¯āĻŧ āĻšāĻ¯āĻŧ, āĻā§āĻ˛āĻžāĻ¯āĻŧā§āĻ¨ā§āĻ āĻā§āĻĄ āĻĻā§āĻā§āĻ¨) āĻ¸āĻŋāĻāĻŋāĻāĻ°āĻŋāĻāĻŋāĻ āĻāĻŦāĻ āĻĄāĻžāĻāĻžāĻŦā§āĻ¸ā§ āĻ¸āĻāĻ°āĻā§āĻˇāĻŖ āĻāĻ°āĻŋ, āĻāĻāĻ āĻāĻŋāĻāĻžāĻ°ā§āĻ° āĻ¸āĻžāĻĨā§ āĻāĻāĻāĻŋ āĻ¨āĻŋāĻ°āĻžāĻĒāĻ¤ā§āĻ¤āĻž āĻāĻā§ āĻāĻŋāĻ¨āĻž āĻ¤āĻž āĻĒāĻ°ā§āĻā§āĻˇāĻž āĻāĻ°ā§ āĻāĻŦāĻ āĻĄāĻžāĻāĻžāĻŦā§āĻ¸ā§ āĻŦāĻŋāĻ¨āĻŋāĻŽāĻ¯āĻŧ, āĻ¯āĻĻāĻŋ āĻĨāĻžāĻā§, āĻ¤āĻŦā§ āĻāĻāĻŋ (āĻāĻžāĻāĻ) āĻā§āĻŦāĻ˛ āĻāĻĒāĻĄā§āĻ āĻāĻ°āĻž āĻšāĻŦā§āĨ¤
āĻāĻŽāĻžāĻĻā§āĻ° āĻ¸ā§āĻˇā§āĻāĻŋ āĻāĻžāĻ˛ā§ āĻāĻ°āĻž āĻ¯āĻžāĻ!
> docker-compose up -d
... ĐĐ°ĐŋŅŅĐē ĐēĐžĐŊŅĐĩĐšĐŊĐĩŅОв ...
> faust -A horton.agents worker --without-web -l info
āĻĒāĻŋāĻāĻ¸ āĻŦā§āĻļāĻŋāĻˇā§āĻā§āĻ¯
āĻāĻŽāĻžāĻĻā§āĻ° āĻ˛āĻā§āĻ āĻāĻŽāĻžāĻ¨ā§āĻĄā§, āĻāĻŽāĻ°āĻž āĻ¤āĻĨā§āĻ¯ āĻ˛āĻ āĻāĻāĻāĻĒā§āĻ āĻ¸ā§āĻ¤āĻ°ā§āĻ° āĻ¸āĻžāĻĨā§ āĻ ā§āĻ¯āĻžāĻĒā§āĻ˛āĻŋāĻā§āĻļāĻ¨ āĻ āĻŦāĻā§āĻā§āĻāĻāĻŋ āĻā§āĻĨāĻžāĻ¯āĻŧ āĻā§āĻāĻāĻ¤ā§ āĻšāĻŦā§ āĻāĻŦāĻ āĻāĻāĻŋāĻ° āĻ¸āĻžāĻĨā§ āĻā§ āĻāĻ°āĻ¤ā§ āĻšāĻŦā§ (āĻāĻāĻāĻ¨ āĻāĻ°ā§āĻŽā§ āĻ˛āĻā§āĻ āĻāĻ°ā§āĻ¨) āĻŦāĻ˛ā§āĻāĻŋāĻ˛āĻžāĻŽāĨ¤ āĻāĻŽāĻ°āĻž āĻ¨āĻŋāĻŽā§āĻ¨āĻ˛āĻŋāĻāĻŋāĻ¤ āĻāĻāĻāĻĒā§āĻ āĻĒā§āĻ¤ā§:
āĻāĻā§āĻˇāĻ
âÆaÂĩSâ v1.10.4âŦââââââââââââââââââââââââââââââââââââââââââââââââââââ
â id â horton â
â transport â [URL('kafka://localhost:9092')] â
â store â memory: â
â log â -stderr- (info) â
â pid â 1271262 â
â hostname â host-name â
â platform â CPython 3.8.2 (Linux x86_64) â
â drivers â â
â transport â aiokafka=1.1.6 â
â web â aiohttp=3.6.2 â
â datadir â /path/to/project/horton-data â
â appdir â /path/to/project/horton-data/v1 â
âââââââââââââââ´ââââââââââââââââââââââââââââââââââââââââââââââââââââ
... ĐģĐžĐŗи, ĐģĐžĐŗи, ĐģĐžĐŗи ...
âTopic Partition SetââââââââââŦâââââââââââââ
â topic â partitions â
ââââââââââââââââââââââââââââââŧâââââââââââââ¤
â collect_securities â {0-7} â
â horton-__assignor-__leader â {0} â
ââââââââââââââââââââââââââââââ´âââââââââââââ
āĻāĻāĻž āĻā§āĻŦāĻŋāĻ¤!!!
āĻāĻ¸ā§āĻ¨ āĻĒāĻžāĻ°ā§āĻāĻŋāĻļāĻ¨ āĻ¸ā§āĻāĻāĻŋ āĻĻā§āĻāĻŋāĨ¤ āĻāĻŽāĻ°āĻž āĻĻā§āĻāĻ¤ā§ āĻĒāĻžāĻā§āĻāĻŋ, āĻāĻāĻāĻŋ āĻŦāĻŋāĻˇāĻ¯āĻŧ āĻ¤ā§āĻ°āĻŋ āĻāĻ°āĻž āĻšāĻ¯āĻŧā§āĻāĻŋāĻ˛ āĻ¯ā§ āĻ¨āĻžāĻŽāĻāĻŋ āĻāĻŽāĻ°āĻž āĻā§āĻĄā§ āĻŽāĻ¨ā§āĻ¨ā§āĻ¤ āĻāĻ°ā§āĻāĻŋ, āĻĒāĻžāĻ°ā§āĻāĻŋāĻļāĻ¨ā§āĻ° āĻĄāĻŋāĻĢāĻ˛ā§āĻ āĻ¸āĻāĻā§āĻ¯āĻž (8, āĻĨā§āĻā§ āĻ¨ā§āĻāĻ¯āĻŧāĻž
āĻ āĻŋāĻ āĻāĻā§, āĻāĻāĻ¨ āĻāĻŽāĻ°āĻž āĻ āĻ¨ā§āĻ¯ āĻāĻžāĻ°ā§āĻŽāĻŋāĻ¨āĻžāĻ˛ āĻāĻāĻ¨ā§āĻĄā§āĻ¤ā§ āĻ¯ā§āĻ¤ā§ āĻĒāĻžāĻ°āĻŋ āĻāĻŦāĻ āĻāĻŽāĻžāĻĻā§āĻ° āĻŦāĻŋāĻˇāĻ¯āĻŧā§ āĻāĻāĻāĻŋ āĻāĻžāĻ˛āĻŋ āĻŦāĻžāĻ°ā§āĻ¤āĻž āĻĒāĻžāĻ āĻžāĻ¤ā§ āĻĒāĻžāĻ°āĻŋ:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
āĻĒāĻŋāĻāĻ¸ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻāĻ°ā§ @
āĻāĻŽāĻ°āĻž āĻĻā§āĻāĻžāĻ āĻ¯ā§ āĻāĻŽāĻ°āĻž "collect_securities" āĻ¨āĻžāĻŽā§āĻ° āĻāĻāĻāĻŋ āĻŦāĻŋāĻˇāĻ¯āĻŧā§ āĻāĻāĻāĻŋ āĻŦāĻžāĻ°ā§āĻ¤āĻž āĻĒāĻžāĻ āĻžāĻā§āĻāĻŋāĨ¤
āĻāĻ āĻā§āĻˇā§āĻ¤ā§āĻ°ā§, āĻŦāĻžāĻ°ā§āĻ¤āĻžāĻāĻŋ āĻĒāĻžāĻ°ā§āĻāĻŋāĻļāĻ¨ 6-āĻ āĻāĻŋāĻ¯āĻŧā§āĻāĻŋāĻ˛ - āĻāĻĒāĻ¨āĻŋ āĻā§āĻ¯āĻžāĻĢāĻĄā§āĻ°āĻĒ āĻ
āĻ¨-āĻ āĻāĻŋāĻ¯āĻŧā§ āĻāĻāĻŋ āĻĒāĻ°ā§āĻā§āĻˇāĻž āĻāĻ°āĻ¤ā§ āĻĒāĻžāĻ°ā§āĻ¨ localhost:9000
āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻ°ā§āĻŽā§āĻ° āĻ¸āĻžāĻĨā§ āĻāĻžāĻ°ā§āĻŽāĻŋāĻ¨āĻžāĻ˛ āĻāĻāĻ¨ā§āĻĄā§āĻ¤ā§ āĻāĻŋāĻ¯āĻŧā§, āĻāĻŽāĻ°āĻž loguru āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻāĻ°ā§ āĻĒāĻžāĻ āĻžāĻ¨ā§ āĻāĻāĻāĻŋ āĻā§āĻļāĻŋāĻ° āĻŦāĻžāĻ°ā§āĻ¤āĻž āĻĻā§āĻāĻ¤ā§ āĻĒāĻžāĻŦ:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
āĻāĻŽāĻ°āĻž āĻŽāĻā§āĻā§ (Robo3T āĻŦāĻž Studio3T āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻāĻ°ā§) āĻĻā§āĻāĻ¤ā§ āĻĒāĻžāĻ°āĻŋ āĻāĻŦāĻ āĻĻā§āĻāĻ¤ā§ āĻĒāĻžāĻ°āĻŋ āĻ¯ā§ āĻ¸āĻŋāĻāĻŋāĻāĻ°āĻŋāĻāĻŋāĻā§āĻ˛āĻŋ āĻĄāĻžāĻāĻžāĻŦā§āĻ¸ā§ āĻ°āĻ¯āĻŧā§āĻā§:
āĻāĻŽāĻŋ āĻŦāĻŋāĻ˛āĻŋāĻ¯āĻŧāĻ¨ā§āĻ¯āĻŧāĻžāĻ° āĻ¨āĻ, āĻāĻŦāĻ āĻ¤āĻžāĻ āĻāĻŽāĻ°āĻž āĻĒā§āĻ°āĻĨāĻŽ āĻĻā§āĻāĻžāĻ° āĻŦāĻŋāĻāĻ˛ā§āĻĒā§ āĻ¸āĻ¨ā§āĻ¤ā§āĻˇā§āĻāĨ¤
āĻ¸ā§āĻ āĻāĻŦāĻ āĻāĻ¨āĻ¨ā§āĻĻ - āĻĒā§āĻ°āĻĨāĻŽ āĻāĻā§āĻ¨ā§āĻ āĻĒā§āĻ°āĻ¸ā§āĻ¤ā§āĻ¤ :)
āĻāĻā§āĻ¨ā§āĻ āĻĒā§āĻ°āĻ¸ā§āĻ¤ā§āĻ¤, āĻ¨āĻ¤ā§āĻ¨ āĻāĻā§āĻ¨ā§āĻ āĻĻā§āĻ°ā§āĻāĻā§āĻŦā§ āĻšā§āĻ!
āĻšā§āĻ¯āĻžāĻ, āĻāĻĻā§āĻ°āĻ˛ā§āĻ, āĻāĻŽāĻ°āĻž āĻāĻ āĻ¨āĻŋāĻŦāĻ¨ā§āĻ§āĻāĻŋ āĻĻā§āĻŦāĻžāĻ°āĻž āĻĒā§āĻ°āĻ¸ā§āĻ¤ā§āĻ¤ āĻĒāĻĨā§āĻ° āĻŽāĻžāĻ¤ā§āĻ° 1/3 āĻ āĻāĻļ āĻāĻāĻžāĻ° āĻāĻ°ā§āĻāĻŋ, āĻ¤āĻŦā§ āĻ¨āĻŋāĻ°ā§ā§āĻ¸āĻžāĻšāĻŋāĻ¤ āĻšāĻŦā§āĻ¨ āĻ¨āĻž, āĻāĻžāĻ°āĻŖ āĻāĻāĻ¨ āĻāĻāĻŋ āĻāĻ°āĻ āĻ¸āĻšāĻ āĻšāĻŦā§āĨ¤
āĻ¸ā§āĻ¤āĻ°āĻžāĻ āĻāĻāĻ¨ āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻŽāĻ¨ āĻāĻāĻāĻ¨ āĻāĻā§āĻ¨ā§āĻā§āĻ° āĻĒā§āĻ°āĻ¯āĻŧā§āĻāĻ¨ āĻ¯āĻž āĻŽā§āĻāĻž āĻ¤āĻĨā§āĻ¯ āĻ¸āĻāĻā§āĻ°āĻš āĻāĻ°ā§ āĻāĻŦāĻ āĻāĻāĻāĻŋ āĻ¸āĻāĻā§āĻ°āĻš āĻ¨āĻĨāĻŋāĻ¤ā§ āĻ°āĻžāĻā§:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
āĻ¯ā§āĻšā§āĻ¤ā§ āĻāĻ āĻāĻā§āĻ¨ā§āĻ āĻāĻāĻāĻŋ āĻ¨āĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āĻ āĻ¨āĻŋāĻ°āĻžāĻĒāĻ¤ā§āĻ¤āĻž āĻ¸āĻŽā§āĻĒāĻ°ā§āĻā§ āĻ¤āĻĨā§āĻ¯ āĻĒā§āĻ°āĻā§āĻ°āĻŋāĻ¯āĻŧāĻž āĻāĻ°āĻŦā§, āĻ¤āĻžāĻ āĻāĻŽāĻžāĻĻā§āĻ° āĻŦāĻžāĻ°ā§āĻ¤āĻžāĻ¯āĻŧ āĻāĻ āĻ¨āĻŋāĻ°āĻžāĻĒāĻ¤ā§āĻ¤āĻžāĻ° āĻāĻŋāĻāĻžāĻ° (āĻĒā§āĻ°āĻ¤ā§āĻ) āĻ¨āĻŋāĻ°ā§āĻĻā§āĻļ āĻāĻ°āĻ¤ā§ āĻšāĻŦā§āĨ¤ āĻāĻ āĻāĻĻā§āĻĻā§āĻļā§āĻ¯ā§ āĻĢāĻžāĻāĻ¸ā§āĻā§ āĻāĻā§
āĻāĻ āĻā§āĻˇā§āĻ¤ā§āĻ°ā§, āĻāĻ° āĻ¯āĻžāĻ¨
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
āĻāĻĒāĻ¨āĻŋ āĻ
āĻ¨ā§āĻŽāĻžāĻ¨ āĻāĻ°āĻ¤ā§ āĻĒāĻžāĻ°ā§āĻ¨, faust āĻŦāĻžāĻ°ā§āĻ¤āĻž āĻ¸ā§āĻāĻŋāĻŽāĻž āĻŦāĻ°ā§āĻŖāĻ¨āĻž āĻāĻ°āĻ¤ā§ āĻĒāĻžāĻāĻĨāĻ¨ āĻāĻžāĻāĻĒ āĻā§āĻāĻž āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻāĻ°ā§, āĻ¯āĻžāĻ° āĻāĻžāĻ°āĻŖā§ āĻ˛āĻžāĻāĻŦā§āĻ°ā§āĻ°āĻŋ āĻĻā§āĻŦāĻžāĻ°āĻž āĻ¸āĻŽāĻ°ā§āĻĨāĻŋāĻ¤ āĻ¸āĻ°ā§āĻŦāĻ¨āĻŋāĻŽā§āĻ¨ āĻ¸āĻāĻ¸ā§āĻāĻ°āĻŖ
āĻāĻ¸ā§āĻ¨ āĻāĻā§āĻ¨ā§āĻā§ āĻĢāĻŋāĻ°ā§ āĻ¯āĻžāĻ, āĻĒā§āĻ°āĻāĻžāĻ°āĻā§āĻ˛āĻŋ āĻ¸ā§āĻ āĻāĻ°ā§āĻ¨ āĻāĻŦāĻ āĻāĻāĻŋ āĻ¯ā§āĻ āĻāĻ°ā§āĻ¨:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
āĻāĻĒāĻ¨āĻŋ āĻĻā§āĻāĻ¤ā§ āĻĒāĻžāĻā§āĻā§āĻ¨, āĻāĻŽāĻ°āĻž āĻāĻĒāĻŋāĻ āĻĒā§āĻ°āĻžāĻ°āĻŽā§āĻāĻŋāĻ āĻĒāĻĻā§āĻ§āĻ¤āĻŋāĻ¤ā§ āĻāĻāĻāĻŋ āĻ¸ā§āĻāĻŋāĻŽ āĻ¸āĻš āĻāĻāĻāĻŋ āĻ¨āĻ¤ā§āĻ¨ āĻĒā§āĻ¯āĻžāĻ°āĻžāĻŽāĻŋāĻāĻžāĻ° āĻĒāĻžāĻ¸ āĻāĻ°āĻŋ - value_typeāĨ¤ āĻāĻ°āĻ, āĻ¸āĻŦāĻāĻŋāĻā§ āĻāĻāĻ āĻ¸ā§āĻāĻŋāĻŽ āĻ āĻ¨ā§āĻ¸āĻ°āĻŖ āĻāĻ°ā§, āĻ¤āĻžāĻ āĻāĻŽāĻŋ āĻ āĻ¨ā§āĻ¯ āĻāĻŋāĻā§āĻ¤ā§ āĻĨāĻžāĻāĻžāĻ° āĻā§āĻ¨ āĻŦāĻŋāĻ¨ā§āĻĻā§ āĻĻā§āĻāĻ¤ā§ āĻĒāĻžāĻā§āĻāĻŋ āĻ¨āĻžāĨ¤
āĻ āĻŋāĻ āĻāĻā§, āĻā§āĻĄāĻŧāĻžāĻ¨ā§āĻ¤ āĻ¸ā§āĻĒāĻ°ā§āĻļ āĻšāĻ˛ āĻŽā§āĻāĻž āĻ¤āĻĨā§āĻ¯ āĻ¸āĻāĻā§āĻ°āĻšāĻāĻžāĻ°ā§ āĻāĻā§āĻ¨ā§āĻāĻā§ collect_securitites āĻāĻ°āĻžāĻ° āĻāĻ¨ā§āĻ¯ āĻāĻāĻāĻŋ āĻāĻ˛ āĻ¯ā§āĻ āĻāĻ°āĻž:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
āĻāĻŽāĻ°āĻž āĻŦāĻžāĻ°ā§āĻ¤āĻžāĻ° āĻāĻ¨ā§āĻ¯ āĻĒā§āĻ°ā§āĻŦā§ āĻā§āĻˇāĻŋāĻ¤ āĻ¸ā§āĻāĻŋāĻŽ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻāĻ°āĻŋāĨ¤ āĻāĻ āĻā§āĻˇā§āĻ¤ā§āĻ°ā§, āĻāĻŽāĻŋ .cast āĻĒāĻĻā§āĻ§āĻ¤āĻŋ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻāĻ°ā§āĻāĻŋ āĻ¯ā§āĻšā§āĻ¤ā§ āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻā§āĻ¨ā§āĻā§āĻ° āĻĢāĻ˛āĻžāĻĢāĻ˛ā§āĻ° āĻāĻ¨ā§āĻ¯ āĻ
āĻĒā§āĻā§āĻˇāĻž āĻāĻ°āĻ¤ā§ āĻšāĻŦā§ āĻ¨āĻž, āĻ¤āĻŦā§ āĻāĻāĻŋ āĻāĻ˛ā§āĻ˛ā§āĻ āĻāĻ°āĻžāĻ° āĻŽāĻ¤ā§
-
cast - āĻŦā§āĻ˛āĻ āĻāĻ°ā§ āĻ¨āĻž āĻāĻžāĻ°āĻŖ āĻāĻāĻŋ āĻĢāĻ˛āĻžāĻĢāĻ˛ āĻāĻļāĻž āĻāĻ°ā§ āĻ¨āĻžāĨ¤ āĻāĻĒāĻ¨āĻŋ āĻāĻāĻāĻŋ āĻŦāĻžāĻ°ā§āĻ¤āĻž āĻšāĻŋāĻ¸āĻžāĻŦā§ āĻ āĻ¨ā§āĻ¯ āĻŦāĻŋāĻˇāĻ¯āĻŧā§ āĻĢāĻ˛āĻžāĻĢāĻ˛ āĻĒāĻžāĻ āĻžāĻ¤ā§ āĻĒāĻžāĻ°āĻŦā§āĻ¨ āĻ¨āĻž.
-
send - āĻŦā§āĻ˛āĻ āĻāĻ°ā§ āĻ¨āĻž āĻāĻžāĻ°āĻŖ āĻāĻāĻŋ āĻĢāĻ˛āĻžāĻĢāĻ˛ āĻāĻļāĻž āĻāĻ°ā§ āĻ¨āĻžāĨ¤ āĻāĻĒāĻ¨āĻŋ āĻāĻāĻāĻŋ āĻāĻā§āĻ¨ā§āĻ āĻāĻ˛ā§āĻ˛ā§āĻ āĻāĻ°āĻ¤ā§ āĻĒāĻžāĻ°ā§āĻ¨ āĻ¯ā§ āĻŦāĻŋāĻˇāĻ¯āĻŧā§ āĻĢāĻ˛āĻžāĻĢāĻ˛ āĻ¯āĻžāĻŦā§āĨ¤
-
āĻāĻŋāĻā§āĻāĻžāĻ¸āĻž āĻāĻ°ā§āĻ¨ - āĻĢāĻ˛āĻžāĻĢāĻ˛ā§āĻ° āĻāĻ¨ā§āĻ¯ āĻ āĻĒā§āĻā§āĻˇāĻž āĻāĻ°ā§āĻ¨āĨ¤ āĻāĻĒāĻ¨āĻŋ āĻāĻāĻāĻŋ āĻāĻā§āĻ¨ā§āĻ āĻāĻ˛ā§āĻ˛ā§āĻ āĻāĻ°āĻ¤ā§ āĻĒāĻžāĻ°ā§āĻ¨ āĻ¯ā§ āĻŦāĻŋāĻˇāĻ¯āĻŧā§ āĻĢāĻ˛āĻžāĻĢāĻ˛ āĻ¯āĻžāĻŦā§āĨ¤
āĻ¸ā§āĻ¤āĻ°āĻžāĻ, āĻāĻāĻā§āĻ° āĻāĻ¨ā§āĻ¯ āĻāĻā§āĻ¨ā§āĻāĻĻā§āĻ° āĻ¸āĻžāĻĨā§ āĻ¯ā§ āĻ¸āĻŦ!
āĻ¸ā§āĻŦāĻĒā§āĻ¨ā§āĻ° āĻĻāĻ˛
āĻļā§āĻˇ āĻāĻŋāĻ¨āĻŋāĻ¸āĻāĻŋ āĻāĻŽāĻŋ āĻāĻ āĻ āĻāĻļā§ āĻ˛āĻŋāĻāĻ¤ā§ āĻĒā§āĻ°āĻ¤āĻŋāĻļā§āĻ°ā§āĻ¤āĻŋ āĻĻāĻŋāĻ¯āĻŧā§āĻāĻŋ āĻ¤āĻž āĻšāĻ˛ āĻāĻŽāĻžāĻ¨ā§āĻĄāĨ¤ āĻāĻā§āĻ āĻāĻ˛ā§āĻ˛ā§āĻ āĻāĻ°āĻž āĻšāĻ¯āĻŧā§āĻā§, faust-āĻ āĻāĻŽāĻžāĻ¨ā§āĻĄāĻā§āĻ˛āĻŋ āĻā§āĻ˛āĻŋāĻā§āĻ° āĻāĻžāĻ°āĻĒāĻžāĻļā§ āĻāĻāĻāĻŋ āĻŽā§āĻĄāĻŧāĻāĨ¤ āĻāĻ¸āĻ˛ā§, faust -A āĻā§ āĻ¨āĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āĻ āĻāĻ°āĻžāĻ° āĻ¸āĻŽāĻ¯āĻŧ āĻāĻŽāĻžāĻĻā§āĻ° āĻāĻžāĻ¸ā§āĻāĻŽ āĻāĻŽāĻžāĻ¨ā§āĻĄāĻā§ āĻāĻ° āĻāĻ¨ā§āĻāĻžāĻ°āĻĢā§āĻ¸ā§ āĻ¸āĻāĻ¯ā§āĻā§āĻ¤ āĻāĻ°ā§
āĻā§āĻˇāĻŋāĻ¤ āĻāĻā§āĻ¨ā§āĻāĻĻā§āĻ° āĻĒāĻ°ā§
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
āĻ¸ā§āĻ¤āĻ°āĻžāĻ, āĻ¯āĻĻāĻŋ āĻāĻŽāĻ°āĻž āĻāĻŽāĻžāĻ¨ā§āĻĄā§āĻ° āĻ¤āĻžāĻ˛āĻŋāĻāĻž āĻāĻ˛ āĻāĻ°āĻŋ, āĻāĻŽāĻžāĻĻā§āĻ° āĻ¨āĻ¤ā§āĻ¨ āĻāĻŽāĻžāĻ¨ā§āĻĄ āĻāĻ¤ā§ āĻĨāĻžāĻāĻŦā§:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
āĻāĻŽāĻ°āĻž āĻāĻāĻŋāĻā§ āĻ āĻ¨ā§āĻ¯ āĻāĻžāĻ°āĻ āĻŽāĻ¤ā§ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻāĻ°āĻ¤ā§ āĻĒāĻžāĻ°āĻŋ, āĻ¤āĻžāĻ āĻāĻ¸ā§āĻ¨ āĻĢāĻžāĻ¸ā§āĻ āĻāĻ¯āĻŧāĻžāĻ°ā§āĻāĻžāĻ°āĻāĻŋ āĻĒā§āĻ¨āĻ°āĻžāĻ¯āĻŧ āĻāĻžāĻ˛ā§ āĻāĻ°āĻŋ āĻāĻŦāĻ āĻ¸āĻŋāĻāĻŋāĻāĻ°āĻŋāĻāĻŋāĻā§āĻ° āĻāĻāĻāĻŋ āĻ¸āĻŽā§āĻĒā§āĻ°ā§āĻŖ āĻ¸āĻāĻā§āĻ°āĻš āĻļā§āĻ°ā§ āĻāĻ°āĻŋ:
> faust -A horton.agents start-collect-securities
āĻĒāĻ°āĻŦāĻ°ā§āĻ¤ā§āĻ¤ā§ āĻā§ āĻšāĻŦā§?
āĻĒāĻ°āĻŦāĻ°ā§āĻ¤ā§ āĻ āĻāĻļā§, āĻāĻāĻāĻŋ āĻāĻĻāĻžāĻšāĻ°āĻŖ āĻšāĻŋāĻ¸āĻžāĻŦā§ āĻ āĻŦāĻļāĻŋāĻˇā§āĻ āĻāĻā§āĻ¨ā§āĻ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻāĻ°ā§, āĻāĻŽāĻ°āĻž āĻŦāĻāĻ°ā§āĻ° āĻā§āĻ°ā§āĻĄāĻŋāĻ āĻāĻŦāĻ āĻā§āĻ°āĻ¨ āĻ˛āĻā§āĻā§āĻ° āĻāĻā§āĻ¨ā§āĻā§āĻ° āĻā§āĻ˛ā§āĻāĻŋāĻ āĻĒā§āĻ°āĻžāĻāĻ¸ā§āĻ° āĻāĻ°āĻŽāĻ¤āĻž āĻ āĻ¨ā§āĻ¸āĻ¨ā§āĻ§āĻžāĻ¨ā§āĻ° āĻāĻ¨ā§āĻ¯ āĻ¸āĻŋāĻā§āĻ āĻĒā§āĻ°āĻā§āĻ°āĻŋāĻ¯āĻŧāĻž āĻŦāĻŋāĻŦā§āĻāĻ¨āĻž āĻāĻ°āĻŦāĨ¤
āĻāĻ āĻ¯ā§ āĻāĻ¨ā§āĻ¯ āĻ¸āĻŦ! āĻĒāĻĄāĻŧāĻžāĻ° āĻāĻ¨ā§āĻ¯ āĻ§āĻ¨ā§āĻ¯āĻŦāĻžāĻĻ :)
āĻĒāĻŋāĻāĻ¸ āĻļā§āĻˇ āĻ
āĻāĻļā§āĻ° āĻ
āĻ§ā§āĻ¨ā§ āĻāĻŽāĻžāĻā§ āĻĢāĻžāĻ¸ā§āĻ āĻāĻŦāĻ āĻ¸āĻā§āĻāĻŽ āĻāĻžāĻĢāĻāĻž āĻ¸āĻŽā§āĻĒāĻ°ā§āĻā§ āĻāĻŋāĻā§āĻāĻžāĻ¸āĻž āĻāĻ°āĻž āĻšāĻ¯āĻŧā§āĻāĻŋāĻ˛ (
āĻāĻ¤ā§āĻ¸: www.habr.com