เบเบฒเบเบฐเบฅเบฒเบเปเบเบทเปเบญเบซเบฒ
-
เบเบฒเบเบเบต II: เบเบปเบงเปเบเบเปเบฅเบฐเบเบตเบกเบเบฒเบ
เบเบงเบเปเบฎเบปเบฒเปเบฎเบฑเบเบซเบเบฑเบเบขเบนเปเบเบตเป?
เบเบฑเปเบเบเบฑเปเบ, เบเบฑเปเบเบเบฑเปเบ, เบชเปเบงเบเบเบตเบชเบญเบ. เบเบฑเปเบเบเบตเปเบเบฝเบเปเบงเปเบเปเบญเบเบซเบเปเบฒเบเบตเป, เปเบเบกเบฑเบเบเบงเบเปเบฎเบปเบฒเบเบฐเปเบฎเบฑเบเบเบฑเปเบเบเปเปเปเบเบเบตเป:
-
เปเบซเปเบเบฝเบเบฅเบนเบเบเปเบฒเบเบฐเบซเบเบฒเบเบเปเบญเบเบชเปเบฒเบฅเบฑเบ alphavantage เปเบ aiohttp เบเปเบงเบเบเบฒเบเบฎเปเบญเบเบเปเบชเปเบฒเบฅเบฑเบเบเบธเบเบชเบดเปเบเบชเบธเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบเปเบญเบเบเบฒเบ.
-
เปเบซเปเบชเปเบฒเบเบเบปเบงเปเบเบเบเบนเปเบเบตเปเบเบฐเปเบเบฑเบเบเปเบฒเบเปเปเบกเบนเบเบซเบผเบฑเบเบเบฑเบเปเบฅเบฐเบเปเปเบกเบนเบ meta เบเปเบฝเบงเบเบฑเบเบเบงเบเปเบเบปเบฒ.
เปเบเป, เบเบตเปเปเบกเปเบเบชเบดเปเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบเบฐเปเบฎเบฑเบเบชเปเบฒเบฅเบฑเบเปเบเบเบเบฒเบเบเบปเบงเบกเบฑเบเปเบญเบ, เปเบฅเบฐเปเบเปเบเปเบเบญเบเบเบฒเบเบเบปเปเบเบเบงเปเบฒ faust, เบเบงเบเปเบฎเบปเบฒเบเบฐเบฎเบฝเบเบฎเบนเปเบงเบดเบเบตเบเบฒเบเบเบฝเบเบเบปเบงเปเบเบเบเบตเปเบเบฐเบเบงเบเบเบฒเบเบเปเบฒเบเบเบญเบเปเบซเบเบเบฒเบเบเบฒเบ kafka, เปเบเบฑเปเบเบเบฝเบงเบเบฑเบเบเบฑเบเบงเบดเบเบตเบเบฒเบเบเบฝเบเบเปเบฒเบชเบฑเปเบ (click wrapper), เปเบเบเปเบฅเบฐเบเบตเบเบญเบเบเบงเบเปเบฎเบปเบฒ - เบชเปเบฒเบฅเบฑเบเบเปเปเบเบงเบฒเบกเบเบธเบเบเบนเปเบเบนเปเบกเบทเปเบเบซเบฒเบซเบปเบงเบเปเปเบเบตเปเบเบปเบงเปเบเบเบเปเบฒเบฅเบฑเบเบเบดเบเบเบฒเบก.
เบเบฒเบโเบเบถเบโเบญเบปเบโเบฎเบปเบก
เบฅเบนเบเบเปเบฒ AlphaVantage
เบเปเบฒเบญเบดเบ, เปเบซเปเบเบฝเบเบฅเบนเบเบเปเบฒ aiohttp เบเบฐเบซเบเบฒเบเบเปเบญเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเบฎเปเบญเบเบเปเปเบซเป alphavantage.
Spoiler
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
เปเบเบเบงเบฒเบกเปเบเบฑเบเบเบดเบ, เบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเปเบกเปเบเบเบฐเปเบเปเบเบเบฒเบเบกเบฑเบ:
-
AlphaVantage API เปเบกเปเบเบเปเบญเบเบเปเบฒเบเบเปเบฒเบเบเบฒเบเปเบฅเบฐเบญเบญเบเปเบเบเบเบตเปเบชเบงเบเบเบฒเบก, เบเบฑเปเบเบเบฑเปเบเบเปเบญเบเบเบถเปเบเบเบฑเบเบชเบดเบเปเบเปเบฎเบฑเบเบเบฒเบเบฎเปเบญเบเบเปเบเบฑเบเบซเบกเบปเบเปเบเบเบเปเบฒเบเบงเบดเบเบตเบเบฒเบ
construct_query
เบเปเบญเบเบเบตเปเบกเบตเบเบฒเบเปเบ http. -
เบเปเบญเบเปเบญเบปเบฒเบเบปเปเบเบเบฒเบเบฑเบเปเบปเบเบกเบฒเปเบซเป
snake_case
เปเบเบทเปเบญเบเบงเบฒเบกเบชเบฐเบเบงเบ. -
เบเบต, เบเบฒเบเบเบปเบเปเบเปเบ logger.catch เบชเปเบฒเบฅเบฑเบเบเบปเบเบเบฐเบฅเบดเบ traceback เบเบตเปเบชเบงเบเบเบฒเบกเปเบฅเบฐเปเบซเปเบเปเปเบกเบนเบ.
PS เบขเปเบฒเบฅเบทเบกเปเบเบตเปเบก token alphavantage เบขเบนเปเปเบเบเปเบญเบเบเบดเปเบเปเบเบทเปเบญ config.yml, เบซเบผเบทเบชเบปเปเบเบญเบญเบเบเบปเบงเปเบเบชเบฐเบเบฒเบเปเบงเบเบฅเปเบญเบก HORTON_SERVICE_APIKEY
. เบเบงเบเปเบฎเบปเบฒเปเบเปเบฎเบฑเบ token
เบซเปเบญเบเบฎเบฝเบ CRUD
เบเบงเบเปเบฎเบปเบฒเบเบฐเบกเบตเบเบฒเบเปเบเบฑเบเบเปเบฒเบซเบผเบฑเบเบเบฑเบเปเบเบทเปเบญเปเบเบฑเบเบเปเปเบกเบนเบ meta เบเปเบฝเบงเบเบฑเบเบซเบผเบฑเบเบเบฑเบ.
เปเบเบเบงเบฒเบกเบเบดเบเปเบซเบฑเบเบเบญเบเบเปเบญเบ, เบเปเปเบเปเบฒเปเบเบฑเบเบเปเบญเบเบญเบฐเบเบดเบเบฒเบเบซเบเบฑเบเบขเบนเปเบเบตเปเบเบตเป, เปเบฅเบฐเบเบฑเปเบเบเบทเปเบเบเบฒเบเบเบญเบเบกเบฑเบเปเบญเบเปเบกเปเบเบเปเบญเบเบเปเบฒเบเบเปเบฒเบเบเบฒเบ.
get_app()
เปเบซเปเปเบเบตเปเบกเบเบฑเบเบเบฑเบเบชเปเบฒเบเบงเบฑเบเบเบธเปเบญเบฑเบเบเบฅเบดเปเบเบเบฑเบเปเบ
Spoiler
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
เบชเปเบฒเบฅเบฑเบเปเบเบเบฑเบเบเบธเบเบฑเบเบเบงเบเปเบฎเบปเบฒเบเบฐเบกเบตเบเบฒเบเบชเปเบฒเบเบเปเบฒเบฎเปเบญเบเบชเบฐเบซเบกเบฑเบเบเบตเปเบเปเบฒเบเบเบฒเบเบเบตเปเบชเบธเบ, เปเบฅเบฑเบเบเปเบญเบเบเปเปเบกเบฒเบเบงเบเปเบฎเบปเบฒเบเบฐเบเบฐเบซเบเบฒเบเบกเบฑเบ, เบขเปเบฒเบเปเบเบเปเบเบฒเบก, เปเบเบทเปเบญเบเปเปเปเบซเปเบเปเบฒเบเบฅเปเบเปเบฒ, เบเบตเปเบเบตเป.
เบชเปเบงเบเปเบซเบเปเปเบกเปเบ
เบเบปเบงเปเบเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเปเบเบฑเบเปเบฅเบฐเบฎเบฑเบเบชเบฒเบเบฑเบเบเบตเบฅเบฒเบเบเบทเปเบเบญเบเบซเบผเบฑเบเบเบฑเบ
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
เบเบฑเปเบเบเบฑเปเบ, เบเปเบฒเบญเบดเบเบเบงเบเปเบฎเบปเบฒเปเบเปเบฎเบฑเบเบงเบฑเบเบเบธเบเปเบฒเบฎเปเบญเบเบชเบฐเบซเบกเบฑเบ faust - เบกเบฑเบเบเปเบฒเบเบเบฒเบเบเบต. เบเปเปเปเบ, เบเบงเบเปเบฎเบปเบฒเบเบฐเบเบฒเบเบซเบปเบงเบเปเปเบชเปเบฒเบฅเบฑเบเบเบปเบงเปเบเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเบขเปเบฒเบเบเบฐเปเบเปเบ ... เปเบเบเบตเปเบเบตเปเบกเบฑเบเบเบงเบเบเบฐเปเบเบฑเบเบเบฒเบเบเปเบฒเบงเปเบเบดเบเบงเปเบฒเบกเบฑเบเปเบกเปเบเบซเบเบฑเบ, เบเบปเบงเบเปเบฒเบเบปเบเบเบฒเบเบเบฒเบเปเบเปเบกเปเบเบซเบเบฑเบเปเบฅเบฐเบงเบดเบเบตเบเบฒเบเบเบตเปเบชเบฒเบกเบฒเบเปเบเปเบฎเบฑเบเบเบฒเบเบเบฑเบเบฅเบฝเบเปเบเบเบเปเบฒเบเบเบฑเบ.
-
เบซเบปเบงเบเปเปเปเบ kafka, เบเปเบฒเบเบงเบเปเบฎเบปเบฒเบเปเบญเบเบเบฒเบเบฎเบนเปเบเปเบฒเบเบดเบเบฒเบกเบเบตเปเปเบเปเบเบญเบ, เบกเบฑเบเบเบตเบเบงเปเบฒเบเบตเปเบเบฐเบญเปเบฒเบ
เบเบดเบ. เปเบญเบเบฐเบชเบฒเบ , เบซเบผเบทเบเปเบฒเบเบชเบฒเบกเบฒเบเบญเปเบฒเบเบเบปเบเบชเบฐเบซเบผเบธเบ on Habre เปเบโเบเบฒโเบชเบฒโเบฅเบฑเบโเปเบเบโ, เบเปเบญเบโเบเบตเปโเบเบธเบโเบชเบดเปเบโเบเบธเบโเบขเปเบฒเบโเปเบกเปเบโเบชเบฐโเบเปเบญเบโเปเบซเปโเปเบซเบฑเบโเบขเปเบฒเบโเบเบทเบโเบเปเบญเบ :) -
เบเบฒเบฃเบฒเบกเบดเปเบเบตเบเบฒเบเปเบ , เบญเบฐเบเบดเบเบฒเบเปเบเปเบเบตเปเบ faust doc, เบญเบฐเบเบธเบเบฒเบเปเบซเปเบเบงเบเปเบฎเบปเบฒเบเปเบฒเบเบปเบเบซเบปเบงเบเปเปเปเบเบเบเบปเบเปเบเบฅเบฐเบซเบฑเบ, เปเบเปเบเบญเบ, เบเบตเปเบซเบกเบฒเบเบเบงเบฒเบกเบงเปเบฒเบเบปเบงเบเปเบฒเบเบปเบเบเบฒเบเบชเบฐเบซเบเบญเบเปเบซเปเปเบเบเบเบฑเบเบเบฑเบเบเบฐเบเบฒ faust, เบเบปเบงเบขเปเบฒเบ: เบเบฒเบเบฎเบฑเบเบชเบฒเปเบงเป, เบเบฐเปเบเบเบฒเบเบเบฒเบเปเบเบฑเบเบฎเบฑเบเบชเบฒ (เปเบเบเบเปเบฒเปเบฅเบตเปเบกเบเบปเปเบเบฅเบถเบ, เปเบเปเบเปเบฒเบเบชเบฒเบกเบฒเบเบเปเบฒเบเบปเบ.เบซเบเบฒเปเบซเบเปเบ ), เบเปเบฒโเบเบงเบโเบเบญเบโเบเบฒเบโเปเบเปเบโเบเบฑเบโเบเปเปโเบซเบปเบงโเบเปเป (เบเบฐเปเบเบ เปเบเบทเปเบญเปเบฎเบฑเบ, เบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบปเบเบเบปเบงเบขเปเบฒเบ, เบซเบเปเบญเบเบเบงเปเบฒเบเบงเบฒเบกเบชเปเบฒเบเบฑเบเบเบปเปเบงเปเบฅเบ faust เบเปเบฒเบฎเปเบญเบเบชเบฐเบซเบกเบฑเบ). -
เปเบเบเบเบปเปเบงเปเบ, เบเบปเบงเปเบเบเบชเบฒเบกเบฒเบเบชเปเบฒเบเบซเบปเบงเบเปเปเบเบตเปเบกเบตเบเบฒเบเบเบธเปเบกเบเบญเบเบเบตเปเบกเบตเบเปเบฒเบเบปเปเบงเปเบฅเบ, เปเบเบงเปเบเบเปเปเบเบฒเบก, เบเปเบญเบเบขเบฒเบเบเบฐเบเบฒเบเบเบธเบเบขเปเบฒเบเบขเปเบฒเบเบเบฐเปเบเปเบ. เบเบญเบเบเบฒเบเบเบฑเปเบ, เบเบฒเบเบเบฒเบฅเบฒเบกเบดเปเบเบต (เบเบปเบงเบขเปเบฒเบ, เบเปเบฒเบเบงเบเบเบญเบเบเบฒเบเปเบเปเบเบชเปเบงเบเบซเบผเบทเบเบฐเปเบเบเบฒเบเบเบฒเบเบฎเบฑเบเบชเบฒเปเบงเป) เบเบญเบเบซเบปเบงเบเปเปเปเบเบเบฒเบเปเบเบชเบฐเบเบฒเบเบปเบงเปเบเบเบเปเปเบชเบฒเบกเบฒเบเบเบทเบเบเบฑเปเบเบเปเบฒเปเบเป.
เบเบตเปเปเบกเปเบเบชเบดเปเบเบเบตเปเบกเบฑเบเบญเบฒเบเบเบฐเปเบเบดเปเบเบเบทเบงเปเบฒเปเบเบเบเปเปเบกเบตเบเบฒเบเบเปเบฒเบเบปเบเบซเบปเบงเบเปเปเบเปเบงเบเบเบปเบเปเบญเบ:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
เปเบฅเปเบง, เบเบญเบเบเบตเปเปเบซเปเบญเบฐเบเบดเบเบฒเบเบชเบดเปเบเบเบตเปเบเบปเบงเปเบเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเบเบฐเปเบฎเบฑเบ :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
เบเบฑเปเบเบเบฑเปเบ, เปเบเบเบญเบเปเบฅเบตเปเบกเบเบปเปเบเบเบญเบเบเบปเบงเปเบเบ, เบเบงเบเปเบฎเบปเบฒเปเบเบตเบเบเบญเบเบเบฐเบเบธเบก aiohttp เบชเปเบฒเบฅเบฑเบเบเบฒเบเบฎเปเบญเบเบเปเบเปเบฒเบเบฅเบนเบเบเปเบฒเบเบญเบเบเบงเบเปเบฎเบปเบฒ. เบเบฑเปเบเบเบฑเปเบ, เปเบกเบทเปเบญเปเบฅเบตเปเบกเบเบปเปเบเบเบฐเบเบฑเบเบเบฒเบ, เปเบกเบทเปเบญเบเบปเบงเปเบเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเบเบทเบเปเบเบตเบเบเบปเบง, เบเบญเบเบเบฐเบเบธเบกเบเบฐเปเบเบตเบเบเบฑเบเบเบต - เบซเบเบถเปเบ, เบชเปเบฒเบฅเบฑเบเปเบงเบฅเบฒเบเบตเปเบเบฐเบเบฑเบเบเบฒเบเปเบฎเบฑเบเบงเบฝเบเบเบฑเบเบซเบกเบปเบ (เบซเบผเบทเบซเบผเบฒเบเปเบเบปเบ, เบเปเบฒเบเปเบฒเบเบเปเบฝเบเบเบฒเบฅเบฒเบกเบดเปเบเบต.
เบเปเปเปเบ, เบเบงเบเปเบฎเบปเบฒเบเบฐเบเบดเบเบฑเบเบเบฒเบกเบเบฐเปเบช (เบเบงเบเปเบฎเบปเบฒเบงเบฒเบเบเปเปเบเบงเบฒเบกเปเบ _
, เปเบเบทเปเบญเบเบเบฒเบเบงเปเบฒเบเบงเบเปเบฎเบปเบฒ, เปเบเบเบปเบงเปเบเบเบเบตเป, เบเปเปเบชเบปเบเปเบเปเบเบทเปเบญเบซเบฒ) เบเบญเบเบเปเปเบเบงเบฒเบกเบเบฒเบเบซเบปเบงเบเปเปเบเบญเบเบเบงเบเปเบฎเบปเบฒ, เบเปเบฒเบเบงเบเปเบเบปเบฒเบกเบตเบขเบนเปเปเบเบเบฒเบเบเบปเบเปเบเบตเบเปเบเบเบฐเบเบธเบเบฑเบ, เบเปเบฒเบเปเปเบเบฑเปเบเบเบฑเปเบเบงเบปเบเบเบญเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเบเบฐเบฅเปเบเปเบฒเบเบฒเบเบกเบฒเบฎเบญเบเบเบญเบเบเบงเบเปเบเบปเบฒ. เบเบต, เบเบฒเบเปเบ loop เบเบญเบเบเบงเบเปเบฎเบปเบฒ, เบเบงเบเปเบฎเบปเบฒเปเบเบปเปเบฒเบชเบนเปเบฅเบฐเบเบปเบเบเบฒเบเบฎเบฑเบเบเปเปเบเบงเบฒเบก, เปเบเปเบฎเบฑเบเบเบฑเบเบเบตเบฅเบฒเบเบเบทเปเบเบญเบเบเบฒเบเปเบเบทเปเบญเบเปเบซเบง (get_securities เบเบฑเบเบเบทเบเบกเบฒเบเบฝเบเปเบเปเบเบฒเบเปเบเบทเปเบญเบเปเบซเบงเปเบเบเบเปเบฒเปเบฅเบตเปเบกเบเบปเปเบ, เปเบเบดเปเบเบฅเบฐเบซเบฑเบเบฅเบนเบเบเปเบฒ) เบซเบผเบฑเบเบเบฑเบเปเบฅเบฐเบเบฑเบเบเบถเบเบกเบฑเบเปเบงเปเปเบเบเบฒเบเบเปเปเบกเบนเบ, เบเบงเบเปเบเบดเปเบเบงเปเบฒเบกเบตเบเบงเบฒเบกเบเบญเบเปเบเบเบตเปเบกเบต ticker เบเบฝเบงเบเบฑเบเปเบฅเบฐ. เบเบฒเบเปเบฅเบเบเปเบฝเบเปเบเบเบฒเบเบเปเปเบกเบนเบ, เบเปเบฒเบซเบฒเบเบงเปเบฒเบกเบต, เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบกเบฑเบ (เปเบเปเบ) เบเบฐเปเบเปเบฎเบฑเบเบเบฒเบเบเบฑเบเบเบธเบเบเบฝเบเปเบเป.
เปเบเบตเบเบเบปเบงเบเบฒเบเบชเปเบฒเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ!
> docker-compose up -d
... ะะฐะฟััะบ ะบะพะฝัะตะนะฝะตัะพะฒ ...
> faust -A horton.agents worker --without-web -l info
เบเบธเบเบเบฐเบชเบปเบกเบเบฑเบ PS
เปเบเบเปเบฒเบชเบฑเปเบเปเบเบตเบเบเบปเบงเบเบญเบเบเบงเบเปเบฎเบปเบฒ, เบเบงเบเปเบฎเบปเบฒเปเบเปเบเบญเบ faust เบเปเบญเบเบเบตเปเบเบฐเบเบญเบเบซเบฒเบเบธเบเบเบฐเบชเบปเบเบเบญเบเปเบญเบฑเบเบเบฅเบดเปเบเบเบฑเบเปเบฅเบฐเบชเบดเปเบเบเบตเปเบเปเบญเบเปเบฎเบฑเบเบเบฑเบเบกเบฑเบ (เปเบเบตเบเบเบปเบงเบเบฐเบเบฑเบเบเบฒเบ) เบเบฑเบเบฅเบฐเบเบฑเบเบเบปเบเบเบฐเบฅเบดเบเบเบฑเบเบเบถเบเบเปเปเบกเบนเบ. เบเบงเบเปเบฎเบปเบฒเปเบเปเบฎเบฑเบเบเบปเบเบเบฐเบฅเบดเบเบเบฑเปเบเบเปเปเปเบเบเบตเป:
Spoiler
โฦaยตSโ v1.10.4โฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ id โ horton โ
โ transport โ [URL('kafka://localhost:9092')] โ
โ store โ memory: โ
โ log โ -stderr- (info) โ
โ pid โ 1271262 โ
โ hostname โ host-name โ
โ platform โ CPython 3.8.2 (Linux x86_64) โ
โ drivers โ โ
โ transport โ aiokafka=1.1.6 โ
โ web โ aiohttp=3.6.2 โ
โ datadir โ /path/to/project/horton-data โ
โ appdir โ /path/to/project/horton-data/v1 โ
โโโโโโโโโโโโโโโดโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
... ะปะพะณะธ, ะปะพะณะธ, ะปะพะณะธ ...
โTopic Partition Setโโโโโโโโโโฌโโโโโโโโโโโโโ
โ topic โ partitions โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโผโโโโโโโโโโโโโค
โ collect_securities โ {0-7} โ
โ horton-__assignor-__leader โ {0} โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโดโโโโโโโโโโโโโ
เบกเบฑเบเบกเบตเบเบตเบงเบดเบเบขเบนเป!!!
เปเบซเปโเปเบฎเบปเบฒโเปเบเบดเปเบโเบเบตเปโเบเปเบฒโเบเบปเบโเปเบงเปโเปเบเปเบโเบเบฑเบโ. เบเบฑเปเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบเปเบซเบฑเบเปเบเป, เบซเบปเบงเบเปเปเบเบทเบเบชเปเบฒเบเบเบทเปเบเบเปเบงเบเบเบทเปเบเบตเปเบเบงเบเปเบฎเบปเบฒเบเปเบฒเบเบปเบเปเบงเปเปเบเบฅเบฐเบซเบฑเบ, เบเปเบฒเบเบงเบเบเบฒเบเบดเบเบฑเบเปเบฅเบตเปเบกเบเบปเปเบ (8, เปเบญเบปเบฒเบกเบฒเบเบฒเบ.
เปเบฅเปเบง, เบเบญเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบเปเบเบซเบฒเบเปเบญเบเบขเปเบฝเบกเบเบฒเบเบเบฒเบเบญเบทเปเบเปเบฅเบฐเบชเบปเปเบเบเปเปเบเบงเบฒเบกเบซเบงเปเบฒเบเปเบเบซเบฒเบซเบปเบงเบเปเปเบเบญเบเบเบงเบเปเบฎเบปเบฒ:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
PS เปเบเป @
เบเบงเบเปเบฎเบปเบฒเบชเบฐเปเบเบเปเบซเปเปเบซเบฑเบเบงเปเบฒเบเบงเบเปเบฎเบปเบฒเบเปเบฒเบฅเบฑเบเบชเบปเปเบเบเปเปเบเบงเบฒเบกเปเบเบซเบฒเบซเบปเบงเบเปเปเบเบตเปเบกเบตเบเบทเปเบงเปเบฒ "collect_securities".
เปเบโเบเปโเบฅเบฐโเบเบตโเบเบตเปโ, เบเปเปโเบเบงเบฒเบกโเปเบเปโเปเบโเบซเบฒโเบเบฒโเบเบดโเบเบฑเบ 6 - เบเปเบฒเบโเบชเบฒโเบกเบฒเบโเบเบงเบโเบชเบญเบโเบเบฒเบโเบเบตเปโเปเบเบโเบเบฒเบโเปเบ kafdrop onโ localhost:9000
เปเบเบซเบฒเบเปเบญเบเบขเปเบฝเบกเบขเบนเปเบเบฒเบเบเบญเบเบเบฑเบเบเบฐเบเบฑเบเบเบฒเบเบเบญเบเบเบงเบเปเบฎเบปเบฒ, เบเบงเบเปเบฎเบปเบฒเบเบฐเปเบซเบฑเบเบเปเปเบเบงเบฒเบกเบเบตเปเบกเบตเบเบงเบฒเบกเบชเบธเบเบเบตเปเบเบทเบเบชเบปเปเบเปเบเปเบเบเปเบเป loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
เบเบงเบเปเบฎเบปเบฒเบเบฑเบเบชเบฒเบกเบฒเบเปเบเบดเปเบเปเบเบปเปเบฒเปเบเปเบ mongo (เปเบเป Robo3T เบซเบผเบท Studio3T) เปเบฅเบฐเปเบเบดเปเบเบงเปเบฒเบซเบผเบฑเบเบเบฑเบเปเบกเปเบเบขเบนเปเปเบเบเบฒเบเบเปเปเบกเบนเบ:
เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเปเปเปเบกเปเบเบกเบฐเบซเบฒเปเบชเบเบเบต, เปเบฅเบฐเบเบฑเปเบเบเบฑเปเบเบเบงเบเปเบฎเบปเบฒเบกเบตเบเบงเบฒเบกเบเปเปเบเบเบฑเบเบเบปเบงเปเบฅเบทเบญเบเบเบฒเบเปเบเบดเปเบเบเปเบฒเบญเบดเบ.
เบเบงเบฒเบกเบชเบธเบเปเบฅเบฐเบเบงเบฒเบกเบชเบธเบ - เบเบปเบงเปเบเบเบเปเบฒเบญเบดเบเปเบกเปเบเบเบฝเบกเบเปเบญเบก :)
เบเบปเบงเปเบเบเบเปเบญเบกเปเบฅเปเบง เบเบปเบงเปเบเบเปเปเป เบญเบฒเบเบธเบเบทเบ!
เปเบกเปเบเปเบฅเปเบง, เบเบนเปเบเบฒเบ, เบเบงเบเปเบฎเบปเบฒเปเบเปเบเบงเบกเปเบญเบปเบฒเบเบฝเบเปเบเป 1/3 เบเบญเบเปเบชเบฑเปเบเบเบฒเบเบเบตเปเบเบฐเบเบฝเบกเปเบเบเบเบปเบเบเบงเบฒเบกเบเบตเป, เปเบเปเบขเปเบฒเบเปเปเบเบญเบเปเบ, เปเบเบฒเบฐเบงเปเบฒเปเบเบเบฑเบเบเบธเบเบฑเบเบกเบฑเบเบเบฐเบเปเบฒเบเบเบถเปเบ.
เบเบฑเปเบเบเบฑเปเบเปเบเบเบฑเบเบเบธเบเบฑเบเบเบงเบเปเบฎเบปเบฒเบเปเบญเบเบเบฒเบเบเบปเบงเปเบเบเบเบตเปเปเบเบฑเบเบเปเบฒเบเปเปเบกเบนเบ meta เปเบฅเบฐเปเบฎเบฑเบเปเบซเปเบกเบฑเบเปเบเบปเปเบฒเปเบเปเบเปเบญเบเบฐเบชเบฒเบเบเบฒเบเบฅเบงเบเบฅเบงเบก:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
เปเบเบทเปเบญเบเบเบฒเบเบเบปเบงเปเบเบเบเบตเปเบเบฐเบเบฐเบกเบงเบเบเบปเบเบเปเปเบกเบนเบเบเปเบฝเบงเบเบฑเบเบเบงเบฒเบกเบเบญเบเปเบเบชเบฐเปเบเบฒเบฐ, เบเบงเบเปเบฎเบปเบฒเบเปเบฒเปเบเบฑเบเบเปเบญเบเบเบตเปเบเบญเบ ticker (เบชเบฑเบเบเบฒเบฅเบฑเบ) เบเบญเบเบเบงเบฒเบกเบเบญเบเปเบเบเบตเปเปเบเบเปเปเบเบงเบฒเบก. เบชเปเบฒเบฅเบฑเบเบเบธเบเบเบฐเบชเบปเบเบเบตเปเปเบ faust เบกเบต
เปเบเบเปเบฅเบฐเบเบตเบเบตเป, เปเบซเปเปเบ
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
เบเบฑเปเบเบเบตเปเบเปเบฒเบเบญเบฒเบเบเบฐเปเบเปเบเบฒเบเปเบเบปเบฒ, faust เปเบเปเบเปเบฒเบเบฑเบเบเบฒเบเบเบฐเปเบเบ python เปเบเบทเปเบญเบญเบฐเบเบดเบเบฒเบเบฎเบนเบเปเบเบเบเปเปเบเบงเบฒเบก, เปเบเบดเปเบเปเบกเปเบเปเบซเบเบเบปเบเบเบตเปเบงเปเบฒเบชเบฐเบเบฑเบเบเปเบฒเปเบชเบธเบเบเบตเปเบชเบฐเบซเบเบฑเบเบชเบฐเบซเบเบธเบเปเบเบเบซเปเบญเบเบชเบฐเบซเบกเบธเบเปเบกเปเบ.
เปเบซเปเบเบฑเบเบเบทเบเปเบเบซเบฒเบเบปเบงเปเบเบ, เบเปเบฒเบเบปเบเบเบฐเปเบเบเปเบฅเบฐเปเบเบตเปเบกเบกเบฑเบ:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
เบเบฑเปเบเบเบตเปเปเบเบปเปเบฒเบชเบฒเบกเบฒเบเปเบซเบฑเบเปเบเป, เบเบงเบเปเบฎเบปเบฒเบเปเบฒเบเบเบปเบงเบเปเบฒเบเบปเบเบเบฒเบเปเบซเบกเปเบเบตเปเบกเบตเปเบเบเบเบฒเบเบเบฑเบเบงเบดเบเบตเบเบฒเบเปเบฅเบตเปเบกเบเบปเปเบเบซเบปเบงเบเปเป - value_type . เบเบญเบเบเบฒเบเบเบฑเปเบ, เบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบเบเบฐเบเบดเบเบฑเบเบเบฒเบกเปเบเบเบเบฒเบเบเบฝเบงเบเบฑเบ, เบชเบฐเบเบฑเปเบเบเปเบฒเบเบฐเปเบเบปเปเบฒเบเปเปเปเบซเบฑเบเบเบธเบเปเบเบเบตเปเบเบฐเบขเบนเปเบเบฑเบเบชเบดเปเบเบญเบทเปเบ.
เบเบต, เบเบฒเบเบชเปเบฒเบเบฑเบเบชเบธเบเบเปเบฒเบเปเบกเปเบเบเบฒเบเปเบเบตเปเบกเบเบฒเบเปเบเบซเบฒเบเบปเบงเปเบเบเปเบเบฑเบเบเปเบฒเบเปเปเบกเบนเบ meta เปเบเบทเปเบญ collect_securitites:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
เบเบงเบเปเบฎเบปเบฒเบเปเบฒเปเบเปเปเบเบเบเบฒเบเบเบตเปเบเบฐเบเบฒเบเบเปเบญเบเบซเบเปเบฒเบเบตเปเบชเปเบฒเบฅเบฑเบเบเปเปเบเบงเบฒเบก. เปเบเบเปเบฅเบฐเบเบตเบเบตเป, เบเปเบฒเบเบฐเปเบเบปเปเบฒเปเบเปเปเบเปเบงเบดเบเบตเบเบฒเบ .cast เบเบฑเบเบเบฑเปเบเปเบเปเบเบงเบเปเบฎเบปเบฒเบเปเปเบเปเบฒเปเบเบฑเบเบเปเบญเบเบฅเปเบเปเบฒเบเบปเบเปเบเปเบฎเบฑเบเบเบฒเบเบเบปเบงเปเบเบ, เปเบเปเบกเบฑเบเบชเบปเบกเบเบงเบเบเบตเปเบเบฐเบเปเบฒเบงเปเบเบดเบเบงเปเบฒ.
-
cast - เบเปเปเบเบฑเบเปเบเบทเปเบญเบเบเบฒเบเบงเปเบฒเบกเบฑเบเบเปเปเปเบเปเบเบฒเบเบซเบงเบฑเบเบเบปเบเปเบเปเบฎเบฑเบ. เบเปเบฒเบเบเปเปเบชเบฒเบกเบฒเบเบชเบปเปเบเบเบปเบเปเบเปเบฎเบฑเบเปเบเบซเบฒเบซเบปเบงเบเปเปเบญเบทเปเบเปเบเบฑเบเบเปเปเบเบงเบฒเบก.
-
send - เบเปเปเบเบฑเบเปเบเบฒเบฐเบงเปเบฒเบกเบฑเบเบเปเปเปเบเปเบเบฒเบเบซเบงเบฑเบเบเบปเบเปเบเปเบฎเบฑเบ. เบเปเบฒเบเบชเบฒเบกเบฒเบเบฅเบฐเบเบธเบเบปเบงเปเบเบเปเบเบซเบปเบงเบเปเปเบเบตเปเบเบปเบเปเบเปเบฎเบฑเบเบเบฐเปเบ.
-
เบเบฒเบก - เบฅเปเบเปเบฒเบเบปเบเปเบเปเบฎเบฑเบ. เบเปเบฒเบเบชเบฒเบกเบฒเบเบฅเบฐเบเบธเบเบปเบงเปเบเบเปเบเบซเบปเบงเบเปเปเบเบตเปเบเบปเบเปเบเปเบฎเบฑเบเบเบฐเปเบ.
เบเบฑเปเบเบเบฑเปเบ, เบเบฑเบเบซเบกเบปเบเบเบตเปเบกเบตเบเบปเบงเปเบเบเบชเปเบฒเบฅเบฑเบเบกเบทเปเบเบตเป!
เบเบตเบกเบเบฒเบเบเบฑเบ
เบชเบดเปเบเบชเบธเบเบเปเบฒเบเบเบตเปเบเปเบญเบเบชเบฑเบเบเบฒเบงเปเบฒเบเบฐเบเบฝเบเปเบเบชเปเบงเบเบเบตเปเปเบกเปเบเบเปเบฒเบชเบฑเปเบ. เบเบฑเปเบเบเบตเปเปเบเปเบเปเบฒเบงเบเปเบญเบเบซเบเปเบฒเบเบตเป, เบเปเบฒเบชเบฑเปเบเปเบ faust เปเบกเปเบ wrapper เบเบฐเบกเบฒเบเบเบฅเบดเบ. เปเบเบเบงเบฒเบกเปเบเบฑเบเบเบดเบ, faust เบเบฝเบเปเบเปเปเบญเบปเบฒเบเปเบฒเบชเบฑเปเบเบเบตเปเบเปเบฒเบซเบเบปเบเปเบญเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเปเบชเปเปเบเบเบฒเบเปเบเปเบเบญเบเบเบญเบเบกเบฑเบเปเบเปเบงเบฅเบฒเบเบตเปเบเปเบฒเบเบปเบเบฅเบฐเบซเบฑเบ -A
เบซเบผเบฑเบเบเบฒเบเบเบปเบงเปเบเบเบเบฐเบเบฒเบเปเบ
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
เบเบฑเปเบเบเบฑเปเบ, เบเปเบฒเบเบงเบเปเบฎเบปเบฒเปเบเบซเบฒเบเบฑเบเบเบตเบฅเบฒเบเบเบทเปเบเบญเบเบเปเบฒเบชเบฑเปเบ, เบเปเบฒเบชเบฑเปเบเปเบซเบกเปเบเบญเบเบเบงเบเปเบฎเบปเบฒเบเบฐเบขเบนเปเปเบเบกเบฑเบ:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
เบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบเปเบเปเบกเบฑเบเบเบทเบเบฑเบเบเบนเปเบญเบทเปเบ, เบชเบฐเบเบฑเปเบเปเบซเปเบเบงเบเปเบฎเบปเบฒเปเบฅเบตเปเบกเบเบฐเบเบฑเบเบเบฒเบ faust เปเปเป เปเบฅเบฐเปเบฅเบตเปเบกเบเบปเปเบเบเบฒเบเปเบเบฑเบเบซเบผเบฑเบเบเบฑเบเบเบตเปเปเบเบฑเบกเบฎเบนเบเปเบเบ:
> faust -A horton.agents start-collect-securities
เบเบฐเบกเบตเบซเบเบฑเบเปเบเบตเบเบเบทเปเบเบเปเปเปเบ?
เปเบเบชเปเบงเบเบเปเปเปเบ, เบเบฒเบเบเปเบฒเปเบเปเบเบปเบงเปเบเบเบเบตเปเบเบฑเบเปเบซเบผเบทเบญเปเบเบฑเบเบเบปเบงเบขเปเบฒเบ, เบเบงเบเปเบฎเบปเบฒเบเบฐเบเบดเบเบฒเบฅเบฐเบเบฒเบเบปเบเปเบเบเบฒเบเบซเบฅเบปเปเบกเบเบปเบกเปเบเบเบฒเบเบเบปเปเบเบซเบฒเบเบตเปเบชเบธเบเปเบเบฅเบฒเบเบฒเบเบดเบเบเบญเบเบเบฒเบเบเบทเปเบเบฒเบเบชเปเบฒเบฅเบฑเบเบเบตเปเบฅเบฐเบเบฒเบเปเบเบตเบเบเบปเบง cron เบเบญเบเบเบปเบงเปเบเบ.
เบเบฑเปเบเปเบกเปเบเบเบฑเบเปเบปเบเบชเบณเบฅเบฑเบเบกเบทเปเบเบตเป! เบเบญเบเปเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเบญเปเบฒเบ :)
PS เบเบฒเบโเปเบเปโเบเบฒเบโเบชเปเบงเบโเบชเบธเบโเบเปเบฒเบโเบเปเบฒโเบเบฐโเปเบเบปเปเบฒโเปเบเปโเบเบทเบโเบเบฒเบกโเบเปเบฝเบงโเบเบฑเบ faust เปเบฅเบฐ confluent kafka (
เปเบซเบผเปเบเบเปเปเบกเบนเบ: www.habr.com