์ฐจ๋ก
-
XNUMX๋ถ: ์์ด์ ํธ ๋ฐ ํ
์ฐ๋ฆฌ ์ฌ๊ธฐ์ ๋ญํ๋ ๊ฑฐ์ง?
๊ทธ๋์ ๋ ๋ฒ์งธ ๋ถ๋ถ์ ๋๋ค. ์์ ์์ฑํ ๋๋ก ๋ค์์ ์ํํฉ๋๋ค.
-
ํ์ํ ์๋ํฌ์ธํธ์ ๋ํ ์์ฒญ์ ์ฌ์ฉํ์ฌ aiohttp์ alphavantage์ฉ ์๊ท๋ชจ ํด๋ผ์ด์ธํธ๋ฅผ ์์ฑํด ๋ณด๊ฒ ์ต๋๋ค.
-
์ฆ๊ถ์ ๋ํ ๋ฐ์ดํฐ์ ๊ทธ์ ๋ํ ๋ฉํ์ ๋ณด๋ฅผ ์์งํ๋ ์์ด์ ํธ๋ฅผ ๋ง๋ค์ด ๋ณด๊ฒ ์ต๋๋ค.
๊ทธ๋ฌ๋ ์ด๊ฒ์ด ํ๋ก์ ํธ ์์ฒด์ ๋ํด ์ํํ ์์ ์ด๋ฉฐ, ํ์ฐ์คํธ ์ฐ๊ตฌ ์ธก๋ฉด์์ Kafka์ ์คํธ๋ฆผ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋ ์์ด์ ํธ๋ฅผ ์์ฑํ๋ ๋ฐฉ๋ฒ๊ณผ ๋ช ๋ น(ํด๋ฆญ ๋ํผ)์ ์์ฑํ๋ ๋ฐฉ๋ฒ์ ๋ฐฐ์๋๋ค. ์์ด์ ํธ๊ฐ ๋ชจ๋ํฐ๋งํ๋ ์ฃผ์ ์ ๋ํ ์๋ ํธ์ ๋ฉ์์ง์ ๊ฒฝ์ฐ.
ํ๋ จ
AlphaVantage ํด๋ผ์ด์ธํธ
๋จผ์ alphavantage์ ๋ํ ์์ฒญ์ ์ํ ์์ aiohttp ํด๋ผ์ด์ธํธ๋ฅผ ์์ฑํด ๋ณด๊ฒ ์ต๋๋ค.
์คํฌ์ผ๋ฌ
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
์ค์ ๋ก ๋ชจ๋ ๊ฒ์ด ๋ช ํํฉ๋๋ค.
-
AlphaVantage API๋ ๋งค์ฐ ๊ฐ๋จํ๊ณ ์๋ฆ๋ต๊ฒ ๋์์ธ๋์๊ธฐ ๋๋ฌธ์ ๋ชจ๋ ์์ฒญ์ ์ด ๋ฉ์๋๋ฅผ ํตํด ์ํํ๊ธฐ๋ก ๊ฒฐ์ ํ์ต๋๋ค.
construct_query
์ฐจ๋ก๋ก http ํธ์ถ์ด ์์ต๋๋ค. -
๋๋ ๋ชจ๋ ๋ถ์ผ๋ฅผ ๊ฐ์ ธ์ต๋๋ค.
snake_case
ํธ์ํจ์ ์ํด. -
์๋ฆ๋ต๊ณ ์ ์ตํ ์ญ์ถ์ ์ถ๋ ฅ์ ์ํ logger.catch ์ฅ์์ ๋๋ค.
PS alphavantage ํ ํฐ์ config.yml์ ๋ก์ปฌ๋ก ์ถ๊ฐํ๊ฑฐ๋ ํ๊ฒฝ ๋ณ์๋ฅผ ๋ด๋ณด๋ด๋ ๊ฒ์ ์์ง ๋ง์ธ์. HORTON_SERVICE_APIKEY
. ์ฐ๋ฆฌ๋ ํ ํฐ์ ๋ฐ์ต๋๋ค
CRUD ํด๋์ค
์ฆ๊ถ์ ๋ํ ๋ฉํ์ ๋ณด๋ฅผ ์ ์ฅํ๊ธฐ ์ํด ์ฆ๊ถ ์ปฌ๋ ์ ์ ๊ฐ๊ฒ ๋ฉ๋๋ค.
์ ์๊ฐ์๋ ์ฌ๊ธฐ์๋ ์๋ฌด๊ฒ๋ ์ค๋ช ํ ํ์๊ฐ ์์ผ๋ฉฐ ๊ธฐ๋ณธ ํด๋์ค ์์ฒด๋ ๋งค์ฐ ๊ฐ๋จํฉ๋๋ค.
get_app()
์ ํ๋ฆฌ์ผ์ด์
๊ฐ์ฒด๋ฅผ ์์ฑํ๋ ํจ์๋ฅผ ์ถ๊ฐํด ๋ณด๊ฒ ์ต๋๋ค.
์คํฌ์ผ๋ฌ
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
์ง๊ธ์ ๊ฐ์ฅ ๊ฐ๋จํ ์ ํ๋ฆฌ์ผ์ด์
์์ฑ์ ํ ์์ ์ด๋ฉฐ, ์กฐ๊ธ ํ์ ํ์ฅํ ์์ ์
๋๋ค. ๊ทธ๋ฌ๋ ์ฌ๋ฌ๋ถ์ด ๊ธฐ๋ค๋ฆฌ์ง ์๋๋ก ์ฌ๊ธฐ์์ ํ์ธํ์ธ์.
์ฃผ์ ๋ถ๋ถ
์ ๊ฐ์ฆ๊ถ ๋ชฉ๋ก ์์ง ๋ฐ ์ ์ง ๊ด๋ฆฌ ๋ํ
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
๋จผ์ ๊ฐ์ฅ ๋น ๋ฅธ ์ ํ๋ฆฌ์ผ์ด์ ๊ฐ์ฒด๋ฅผ ์ป์ต๋๋ค. ๋งค์ฐ ๊ฐ๋จํฉ๋๋ค. ๋ค์์ผ๋ก, ์์ด์ ํธ์ ๋ํ ์ฃผ์ ๋ฅผ ๋ช ์์ ์ผ๋ก ์ ์ธํฉ๋๋ค... ์ฌ๊ธฐ์ ๊ทธ๊ฒ์ด ๋ฌด์์ธ์ง, ๋ด๋ถ ๋งค๊ฐ๋ณ์๊ฐ ๋ฌด์์ธ์ง, ๊ทธ๋ฆฌ๊ณ ์ด๊ฒ์ด ์ด๋ป๊ฒ ๋ค๋ฅด๊ฒ ๋ฐฐ์ด๋ ์ ์๋์ง ์ธ๊ธํ ๊ฐ์น๊ฐ ์์ต๋๋ค.
-
์นดํ์นด์ ์ฃผ์ , ์ ํํ ์ ์๋ฅผ ์๊ณ ์ถ๋ค๋ฉด ์ฝ๋ ๊ฒ์ด ์ข์ต๋๋ค
๋๋ค. ๋ฌธ์ , ๋๋ ์ฝ์ ์ ์์ต๋๋ค์ ์ ๋ชจ๋ ๊ฒ์ด ๋งค์ฐ ์ ํํ๊ฒ ๋ฐ์๋๋ ๋ฌ์์์ด Habrรฉ์ :) -
๋งค๊ฐ๋ณ์ ๋ด๋ถ , faust ๋ฌธ์์ ๊ฝค ์ ์ค๋ช ๋์ด ์์ผ๋ฏ๋ก ์ฝ๋์์ ์ง์ ์ฃผ์ ๋ฅผ ๊ตฌ์ฑํ ์ ์์ต๋๋ค. ๋ฌผ๋ก ์ด๋ faust ๊ฐ๋ฐ์๊ฐ ์ ๊ณตํ ๋งค๊ฐ๋ณ์๋ฅผ ์๋ฏธํฉ๋๋ค. ์: ๋ณด์กด, ๋ณด์กด ์ ์ฑ (๊ธฐ๋ณธ์ ์ผ๋ก ์ญ์ ๋์ง๋ง ์ค์ ํ ์ ์์)ํฉํธ ), ์ฃผ์ ๋น ํํฐ์ ์(์ ์ ์๋ฅผ ๋ค์ด, ~๋ณด๋ค ์ ๊ฒ ํ๋ค์ธ๊ณ์ ์ธ ์ค์์ฑ ์์ฉ ํ๋ก๊ทธ๋จ ํ์ฐ์คํธ). -
์ผ๋ฐ์ ์ผ๋ก ์์ด์ ํธ๋ ์ ์ญ ๊ฐ์ ์ฌ์ฉํ์ฌ ๊ด๋ฆฌ๋๋ ์ฃผ์ ๋ฅผ ์์ฑํ ์ ์์ง๋ง ์ ๋ ๋ชจ๋ ๊ฒ์ ๋ช ์์ ์ผ๋ก ์ ์ธํ๋ ๊ฒ์ ์ ํธํฉ๋๋ค. ๋ํ ์์ด์ ํธ ๊ด๊ณ ์ ์๋ ์ฃผ์ ์ ์ผ๋ถ ๋งค๊ฐ๋ณ์(์: ํํฐ์ ์ ๋๋ ๋ณด์กด ์ ์ฑ )๋ฅผ ๊ตฌ์ฑํ ์ ์์ต๋๋ค.
์ฃผ์ ๋ฅผ ์๋์ผ๋ก ์ ์ํ์ง ์์ ๊ฒฝ์ฐ์ ๋ชจ์ต์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
์, ์ด์ ์ฐ๋ฆฌ ์์ด์ ํธ๊ฐ ๋ฌด์์ ํ ๊ฒ์ธ์ง ์ค๋ช ํ๊ฒ ์ต๋๋ค :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
๋ฐ๋ผ์ ์์ด์ ํธ ์์ ๋ถ๋ถ์์ ํด๋ผ์ด์ธํธ๋ฅผ ํตํ ์์ฒญ์ ๋ํด aiohttp ์ธ์
์ ์ฝ๋๋ค. ๋ฐ๋ผ์ ์์
์๋ฅผ ์์ํ ๋ ์์ด์ ํธ๊ฐ ์์๋๋ฉด ์์
์๊ฐ ์คํ๋๋ ์ ์ฒด ์๊ฐ ๋์ ํ๋์ ์ธ์
์ด ์ฆ์ ์ด๋ฆฝ๋๋ค(๋๋ ๋งค๊ฐ๋ณ์๋ฅผ ๋ณ๊ฒฝํ๋ ๊ฒฝ์ฐ ์ฌ๋ฌ ๊ฐ).
๋ค์์ผ๋ก ์คํธ๋ฆผ์ ๋ฐ๋ฆ
๋๋ค(๋ฉ์์ง๋ฅผ _
, ์ด ์์ด์ ํธ์์๋ ์ฃผ์ ์ ๋ฉ์์ง ๋ด์ฉ์ ์ ๊ฒฝ ์ฐ์ง ์๊ธฐ ๋๋ฌธ์ ๋ฉ์์ง๊ฐ ํ์ฌ ์คํ์
์ ์กด์ฌํ๋ ๊ฒฝ์ฐ ๊ทธ๋ ์ง ์์ผ๋ฉด ์ฃผ๊ธฐ๊ฐ ๋ฉ์์ง ๋์ฐฉ์ ๊ธฐ๋ค๋ฆฝ๋๋ค. ๋ฃจํ ๋ด์์ ๋ฉ์์ง ์์ ์ ๊ธฐ๋กํ๊ณ ํ์ฑ(get_securities๋ ๊ธฐ๋ณธ์ ์ผ๋ก ํ์ฑ ์ํ๋ง ๋ฐํ, ํด๋ผ์ด์ธํธ ์ฝ๋ ์ฐธ์กฐ) ์ฆ๊ถ ๋ชฉ๋ก์ ๊ฐ์ ธ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฅํ๊ณ ๋์ผํ ํฐ์ปค๊ฐ ์๋ ์ฆ๊ถ์ด ์๋์ง ํ์ธํฉ๋๋ค. ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๊ตํ์ด ์๋ ๊ฒฝ์ฐ ํด๋น ๋ฌธ์(๋
ผ๋ฌธ)๊ฐ ๊ฐ๋จํ ์
๋ฐ์ดํธ๋ฉ๋๋ค.
์ฐ๋ฆฌ์ ์ฐฝ์กฐ๋ฌผ์ ์์ํฉ์๋ค!
> docker-compose up -d
... ะะฐะฟััะบ ะบะพะฝัะตะนะฝะตัะพะฒ ...
> faust -A horton.agents worker --without-web -l info
PS ๊ธฐ๋ฅ
์์ ๋ช ๋ น์์ ์ฐ๋ฆฌ๋ ์ ๋ณด ๋ก๊ทธ ์ถ๋ ฅ ์์ค์ ์ฌ์ฉํ์ฌ ์์ฉ ํ๋ก๊ทธ๋จ ๊ฐ์ฒด๋ฅผ ์ฐพ์ ์์น์ ์ด๋ฅผ ํตํด ์ํํ ์์ (์์ ์ ์์)์ ํ์ฐ์คํธ์๊ฒ ์ง์ํ์ต๋๋ค. ์ฐ๋ฆฌ๋ ๋ค์๊ณผ ๊ฐ์ ๊ฒฐ๊ณผ๋ฅผ ์ป์ต๋๋ค:
์คํฌ์ผ๋ฌ
โฦaยตSโ v1.10.4โฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ id โ horton โ
โ transport โ [URL('kafka://localhost:9092')] โ
โ store โ memory: โ
โ log โ -stderr- (info) โ
โ pid โ 1271262 โ
โ hostname โ host-name โ
โ platform โ CPython 3.8.2 (Linux x86_64) โ
โ drivers โ โ
โ transport โ aiokafka=1.1.6 โ
โ web โ aiohttp=3.6.2 โ
โ datadir โ /path/to/project/horton-data โ
โ appdir โ /path/to/project/horton-data/v1 โ
โโโโโโโโโโโโโโโดโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
... ะปะพะณะธ, ะปะพะณะธ, ะปะพะณะธ ...
โTopic Partition Setโโโโโโโโโโฌโโโโโโโโโโโโโ
โ topic โ partitions โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโผโโโโโโโโโโโโโค
โ collect_securities โ {0-7} โ
โ horton-__assignor-__leader โ {0} โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโดโโโโโโโโโโโโโ
์ด์์์ด!!!
ํํฐ์
์ธํธ๋ฅผ ์ดํด๋ณด๊ฒ ์ต๋๋ค. ๋ณด์๋ค์ํผ ์ฝ๋์์ ์ง์ ํ ์ด๋ฆ, ์ฆ ๊ธฐ๋ณธ ํํฐ์
์(8, ๋ค์์์ ๊ฐ์ ธ์ด)๋ก ์ฃผ์ ๊ฐ ์์ฑ๋์์ต๋๋ค.
์ด์ ๋ค๋ฅธ ํฐ๋ฏธ๋ ์ฐฝ์ผ๋ก ์ด๋ํ์ฌ ์ฃผ์ ์ ๋น ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ์ ์์ต๋๋ค.
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
PS๋ฅผ ์ฌ์ฉํ์ฌ @
์ฐ๋ฆฌ๋ "collect_securities"๋ผ๋ ์ฃผ์ ๋ก ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ณ ์์์ ๋ณด์ฌ์ค๋๋ค.
์ด ๊ฒฝ์ฐ ๋ฉ์์ง๋ ํํฐ์
6์ผ๋ก ์ด๋ํ์ต๋๋ค. kafdrop์ผ๋ก ์ด๋ํ์ฌ ์ด๋ฅผ ํ์ธํ ์ ์์ต๋๋ค. localhost:9000
์์ ์์ ํจ๊ป ํฐ๋ฏธ๋ ์ฐฝ์ผ๋ก ์ด๋ํ๋ฉด loguru๋ฅผ ์ฌ์ฉํ์ฌ ์ ์ก๋ ํ๋ณตํ ๋ฉ์์ง๋ฅผ ๋ณผ ์ ์์ต๋๋ค.
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
๋ํ (Robo3T ๋๋ Studio3T๋ฅผ ์ฌ์ฉํ์ฌ) mongo๋ฅผ ์กฐ์ฌํ๊ณ ์ฆ๊ถ์ด ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์๋์ง ํ์ธํ ์ ์์ต๋๋ค.
๋๋ ์ต๋ง์ฅ์๊ฐ ์๋๋ฏ๋ก ์ฒซ ๋ฒ์งธ ๋ณด๊ธฐ ์ต์ ์ ๋ง์กฑํฉ๋๋ค.
ํ๋ณต๊ณผ ๊ธฐ์จ - ์ฒซ ๋ฒ์งธ ์์์ด ์ค๋น๋์์ต๋๋ค :)
์์ด์ ํธ ์ค๋น ์๋ฃ, ์ ์์ด์ ํธ ๋ง์ธ!
์, ์ฌ๋ฌ๋ถ, ์ฐ๋ฆฌ๋ ์ด ๊ธฐ์ฌ์์ ์ค๋นํ ๊ฒฝ๋ก์ 1/3๋ง ๋ค๋ฃจ์์ง๋ง ์ด์ ๋ ์ฌ์ธ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ๋๋ดํ์ง ๋ง์ญ์์ค.
์ด์ ๋ฉํ ์ ๋ณด๋ฅผ ์์งํ์ฌ ์์ง ๋ฌธ์์ ๋ฃ๋ ์์ด์ ํธ๊ฐ ํ์ํฉ๋๋ค.
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
์ด ์์ด์ ํธ๋ ํน์ ๋ณด์์ ๋ํ ์ ๋ณด๋ฅผ ์ฒ๋ฆฌํ๋ฏ๋ก ๋ฉ์์ง์ ์ด ๋ณด์์ ํฐ์ปค(๊ธฐํธ)๋ฅผ ํ์ํด์ผ ํฉ๋๋ค. ์ด๋ฅผ ์ํด ํ์ฐ์คํธ์๋ ๋ค์์ด ์์ต๋๋ค.
์ด ๊ฒฝ์ฐ ๋ค์์ผ๋ก ๊ฐ๋ด
์๋ค.
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
์ง์ํ ์ ์๋ฏ์ด, faust๋ Python ์ ํ ์ฃผ์์ ์ฌ์ฉํ์ฌ ๋ฉ์์ง ์คํค๋ง๋ฅผ ์ค๋ช
ํฉ๋๋ค. ์ด๊ฒ์ด ๋ฐ๋ก ๋ผ์ด๋ธ๋ฌ๋ฆฌ์์ ์ง์ํ๋ ์ต์ ๋ฒ์ ์ด ๋ค์๊ณผ ๊ฐ์ ์ด์ ์
๋๋ค.
์์ด์ ํธ๋ก ๋์๊ฐ์ ์ ํ์ ์ค์ ํ๊ณ ์ถ๊ฐํด ๋ณด๊ฒ ์ต๋๋ค.
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
๋ณด์๋ค์ํผ, ์ฐ๋ฆฌ๋ ์ฃผ์ ์ด๊ธฐํ ๋ฐฉ๋ฒ์ธ value_type์ ์คํค๋ง๊ฐ ํฌํจ๋ ์ ๋งค๊ฐ๋ณ์๋ฅผ ์ ๋ฌํฉ๋๋ค. ๋ํ ๋ชจ๋ ๊ฒ์ด ๋์ผํ ๊ณํ์ ๋ฐ๋ฅด๋ฏ๋ก ๋ค๋ฅธ ๊ฒ์ ๋ํด ์๊ฐํ ํ์๊ฐ ์์ต๋๋ค.
๋ง์ง๋ง์ผ๋ก ๋ฉํ ์ ๋ณด ์์ง ์์ด์ ํธ์ ๋ํ ํธ์ถ์ ์ถ๊ฐํ์ฌ Collect_securitites๋ฅผ ์ถ๊ฐํ๋ ๊ฒ์ ๋๋ค.
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
์ฐ๋ฆฌ๋ ๋ฉ์์ง์ ๋ํด ์ด์ ์ ๋ฐํ๋ ์ฒด๊ณ๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์ด ๊ฒฝ์ฐ ์์ด์ ํธ์ ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ค๋ฆด ํ์๊ฐ ์๊ธฐ ๋๋ฌธ์ .cast ๋ฐฉ๋ฒ์ ์ฌ์ฉํ์ง๋ง ์ธ๊ธํ ๊ฐ์น๊ฐ ์์ต๋๋ค.
-
์บ์คํธ - ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ํ์ง ์๊ธฐ ๋๋ฌธ์ ์ฐจ๋จํ์ง ์์ต๋๋ค. ๊ฒฐ๊ณผ๋ฅผ ๋ค๋ฅธ ์ฃผ์ ์ ๋ฉ์์ง๋ก ๋ณด๋ผ ์ ์์ต๋๋ค.
-
๋ณด๋ด๊ธฐ - ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ํ์ง ์๊ธฐ ๋๋ฌธ์ ์ฐจ๋จํ์ง ์์ต๋๋ค. ๊ฒฐ๊ณผ๊ฐ ์ ๋ฌ๋ ์ฃผ์ ์ ์์ด์ ํธ๋ฅผ ์ง์ ํ ์ ์์ต๋๋ค.
-
๋ฌป๋ค - ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ค๋ฆฝ๋๋ค. ๊ฒฐ๊ณผ๊ฐ ์ ๋ฌ๋ ์ฃผ์ ์ ์์ด์ ํธ๋ฅผ ์ง์ ํ ์ ์์ต๋๋ค.
๊ทธ๋ผ ์ค๋์ ์๋ด์ ์๊ฐ๋ ์ฌ๊ธฐ๊น์ง์ ๋๋ค!
๋๋ฆผํ
์ด ๋ถ๋ถ์์ ์ ๊ฐ ๋ง์ง๋ง์ผ๋ก ์ฐ๊ฒ ๋ค๊ณ ์ฝ์ํ ๊ฒ์ ๋ช ๋ น์ ๋๋ค. ์์ ์ธ๊ธํ๋ฏ์ด, faust์ ๋ช ๋ น์ ํด๋ฆญ์ ๋๋ฌ์ผ ๋ํผ์ ๋๋ค. ์ค์ ๋ก faust๋ -A ํค๋ฅผ ์ง์ ํ ๋ ๋จ์ํ ์ฌ์ฉ์ ์ ์ ๋ช ๋ น์ ์ธํฐํ์ด์ค์ ์ฐ๊ฒฐํฉ๋๋ค.
๋ฐํ๋ ์์ด์ ํธ ์ดํ
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
๋ฐ๋ผ์ ๋ช ๋ น ๋ชฉ๋ก์ ํธ์ถํ๋ฉด ์ ๋ช ๋ น์ด ๊ทธ ์์ ํฌํจ๋ฉ๋๋ค.
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
๋ค๋ฅธ ์ฌ๋์ฒ๋ผ ์ฌ์ฉํ ์ ์์ผ๋ฏ๋ก Faust Worker๋ฅผ ๋ค์ ์์ํ๊ณ ๋ณธ๊ฒฉ์ ์ธ ์ฆ๊ถ ์์ง์ ์์ํ๊ฒ ์ต๋๋ค.
> faust -A horton.agents start-collect-securities
๊ทธ ๋ค์์ ์ด๋ป๊ฒ๋ฉ๋๊น?
๋ค์ ๋ถ๋ถ์์๋ ๋๋จธ์ง ์์ด์ ํธ๋ฅผ ์๋ก ๋ค์ด ํด๋น ์ฐ๋์ ๊ฑฐ๋ ์ข ๊ฐ์ ์์ด์ ํธ์ ํฌ๋ก ์ถ์์์ ๊ทน๋จ์ ๊ฒ์ํ๊ธฐ ์ํ ์ฑํฌ ๋ฉ์ปค๋์ฆ์ ๊ณ ๋ คํ ๊ฒ์ ๋๋ค.
์ค๋์ ๊ทธ๊ฒ ๋ค์ผ! ์ฝ์ด ์ฃผ์ ์ ๊ฐ์ฌํฉ๋๋ค :)
์ถ์ : ๋ง์ง๋ง ๋ถ๋ถ์์ ๋๋ ํ์ฐ์คํธ์ ์ตํฉ ์นดํ์นด(
์ถ์ฒ : habr.com