ç®æ¬¡
-
ããŒã II: ãšãŒãžã§ã³ããšããŒã
ç§ãã¡ã¯ããã§äœãããŠããã®ã§ããããïŒ
ããã§ãããã§ã第äºéšã åã«æžããããã«ããã®äžã§æ¬¡ã®ããšãè¡ããŸãã
-
å¿ èŠãªãšã³ããã€ã³ãã«å¯Ÿãããªã¯ãšã¹ããå«ã aiohttp äžã® alphavantage çšã®å°ããªã¯ã©ã€ã¢ã³ããäœæããŠã¿ãŸãããã
-
蚌åžã«é¢ããããŒã¿ãšèšŒåžã«é¢ããã¡ã¿æ å ±ãåéãããšãŒãžã§ã³ããäœæããŸãããã
ããããããã¯ãããžã§ã¯ãèªäœã«å¯ŸããŠè¡ãããšã§ããããã¡ãŠã¹ãã®ç 究ã®èŠ³ç¹ããã¯ãkafka ããã®ã¹ããªãŒã ã€ãã³ããåŠçãããšãŒãžã§ã³ãã®äœææ¹æ³ãšãã³ãã³ã (ã¯ãªã㯠ã©ãããŒ) ã®äœææ¹æ³ãåŠã³ãŸãããšãŒãžã§ã³ããç£èŠããŠãããããã¯ãžã®æåããã·ã¥ ã¡ãã»ãŒãžã®å Žåã
èšç·Ž
AlphaVantage ã¯ã©ã€ã¢ã³ã
ãŸããalphavantage ãžã®ãªã¯ãšã¹ãçšã®å°ã㪠aiohttp ã¯ã©ã€ã¢ã³ããäœæããŸãããã
ã¹ãã€ã©ãŒ
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
å®éããããããã¹ãŠãæããã§ãã
-
AlphaVantage API ã¯éåžžã«ã·ã³ãã«ãã€çŸããèšèšãããŠããããããã¹ãŠã®ãªã¯ãšã¹ãã次ã®ã¡ãœãããéããŠè¡ãããšã«ããŸããã
construct_query
ãã㧠http åŒã³åºããè¡ãããŸãã -
ãã¹ãŠã®ãã£ãŒã«ããæã£ãŠããŸã
snake_case
å¿«é©ãã®ããã«ã -
ããŠãçŸããæçãªãã¬ãŒã¹ããã¯åºåã®ããã® logger.catch è£ é£Ÿã§ãã
PS alphavantage ããŒã¯ã³ãããŒã«ã«ã§ config.yml ã«è¿œå ããããç°å¢å€æ°ããšã¯ã¹ããŒãããããšãå¿ããªãã§ãã ããã HORTON_SERVICE_APIKEY
ã ããŒã¯ã³ãåãåããŸã
CRUDã¯ã©ã¹
蚌åžã«é¢ããã¡ã¿æ å ±ãä¿åããããã®èšŒåžã³ã¬ã¯ã·ã§ã³ãçšæããŸãã
ç§ã®æèŠã§ã¯ãããã§äœã説æããå¿ èŠã¯ãªããåºåºã¯ã©ã¹èªäœã¯éåžžã«åçŽã§ãã
ã¢ããªãååŸïŒïŒ
ã«ã¢ããªã±ãŒã·ã§ã³ãªããžã§ã¯ããäœæããé¢æ°ãè¿œå ããŸãããã
ã¹ãã€ã©ãŒ
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
ä»ã®ãšãããæãåçŽãªã¢ããªã±ãŒã·ã§ã³ãäœæããŸããå°ãåŸã§æ¡åŒµããŸããããåŸ
ããããªãããã«ãããã§èª¬æããŸãã
ã¡ã€ã³
æ䟡蚌åžãªã¹ãã®åéããã³ç®¡çã®ä»£ç人
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
ãããã£ãŠãæåã« faust ã¢ããªã±ãŒã·ã§ã³ ãªããžã§ã¯ããååŸããŸããããã¯éåžžã«åçŽã§ãã 次ã«ããšãŒãžã§ã³ãã®ãããã¯ãæ瀺çã«å®£èšããŸããããã§ã¯ããããã¯ãäœã§ããããå éšãã©ã¡ãŒã¿ãäœã§ããããããã³ãããã©ã®ããã«å€æŽã§ãããã«ã€ããŠèšåãã䟡å€ããããŸãã
-
Kafka ã®ãããã¯ãæ£ç¢ºãªå®çŸ©ãç¥ãããå Žåã¯ãèªãããšããå§ãããŸãã
ãªãã æžé¡ ããŸãã¯èªãããšãã§ããŸãæŠèŠ ãã·ã¢èªã®ããã¬ã§ã¯ããã¹ãŠãéåžžã«æ£ç¢ºã«åæ ãããŠããŸã:) -
ãã©ã¡ãŒã¿å éš ãã¡ãŠã¹ãã®ããã¥ã¡ã³ãã§ãã説æãããŠãããã³ãŒãå ã§ãããã¯ãçŽæ¥èšå®ã§ããŸãããã¡ãããããã¯ããã¡ãŠã¹ãéçºè ã«ãã£ãŠæäŸããããã©ã¡ãŒã¿ãæå³ããŸããããšãã°ãä¿æãä¿æããªã·ãŒ (ããã©ã«ãã§ã¯åé€ãããŸãããèšå®ã§ããŸã)ã³ã³ãã¯ã )ããããã¯ãããã®ããŒãã£ã·ã§ã³æ° (ããŒãã£ã·ã§ã³ ããšãã°ã以äžã®ããšãè¡ãäžççãªéèŠæ§ ã¢ããªã±ãŒã·ã§ã³ãã¡ã¹ã)ã -
äžè¬ã«ããšãŒãžã§ã³ãã¯ã°ããŒãã«å€ã䜿çšããŠç®¡çãããã¯ãäœæã§ããŸãããç§ã¯ãã¹ãŠãæ瀺çã«å®£èšããããšèããŠããŸãã ããã«ããšãŒãžã§ã³ã ã¢ããã¿ã€ãºã¡ã³ãå ã®ãããã¯ã®äžéšã®ãã©ã¡ãŒã¿ãŒ (ããŒãã£ã·ã§ã³ã®æ°ãä¿æããªã·ãŒãªã©) ã¯æ§æã§ããŸããã
ãããã¯ãæåã§å®çŸ©ããªãå Žåã¯æ¬¡ã®ããã«ãªããŸãã
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
ããŠããšãŒãžã§ã³ããäœããããã説æããŸããã:)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
ãããã£ãŠããšãŒãžã§ã³ãã®éå§æã«ãã¯ã©ã€ã¢ã³ããä»ãããªã¯ãšã¹ãã«å¯Ÿã㊠aiohttp ã»ãã·ã§ã³ãéããŸãã ãããã£ãŠãã¯ãŒã«ãŒãéå§ãããšãããšãŒãžã§ã³ããèµ·åããããšãã»ãã·ã§ã³ãããã«éãããŸã - ã¯ãŒã«ãŒãå®è¡ãããŠããéãã£ãš XNUMX 〠(ãŸãã¯ãã©ã¡ãŒã¿ãŒãå€æŽããå Žåã¯è€æ°)
次ã«ãã¹ããªãŒã ã«åŸããŸã (ã¡ãã»ãŒãžã _
ããã®ãšãŒãžã§ã³ãã§ã¯ãçŸåšã®ãªãã»ããã«ã¡ãã»ãŒãžãååšããå Žåããããã¯ããã®ã¡ãã»ãŒãžã®å
容ãæ°ã«ããªããããããã§ãªãå Žåããµã€ã¯ã«ã¯ã¡ãã»ãŒãžã®å°çãåŸ
ã¡ãŸãã ããŠãç§ãã¡ã®ã«ãŒãå
ã§ã¯ãã¡ãã»ãŒãžã®åä¿¡ããã°ã«èšé²ããã¢ã¯ãã£ããªèšŒåžã®ãªã¹ããååŸã㊠(get_securities ã¯ããã©ã«ãã§ã¢ã¯ãã£ããªèšŒåžã®ã¿ãè¿ããŸããã¯ã©ã€ã¢ã³ã ã³ãŒããåç
§)ããããããŒã¿ããŒã¹ã«ä¿åããåããã£ãã«ãŒãæã€èšŒåžããããã©ããã確èªããããŒã¿ããŒã¹å
ã«äº€æãããã°ããã (è«æ) ãæŽæ°ãããã ãã§ãã
ç§ãã¡ã®äœåãç«ã¡äžããŸãããïŒ
> docker-compose up -d
... ÐапÑÑк кПМÑейМеÑПв ...
> faust -A horton.agents worker --without-web -l info
PSã®ç¹åŸŽ
èµ·åã³ãã³ãã§ã¯ãæ å ±ãã°ã®åºåã¬ãã«ã䜿çšããŠãã¢ããªã±ãŒã·ã§ã³ ãªããžã§ã¯ããã©ãã§æ¢ããããããã©ãåŠçããã (ã¯ãŒã«ãŒãéå§ãã) ã faust ã«æ瀺ããŸããã 次ã®åºåãåŸãããŸãã
ã¹ãã€ã©ãŒ
âÆaµSâ v1.10.4â¬ââââââââââââââââââââââââââââââââââââââââââââââââââââ
â id â horton â
â transport â [URL('kafka://localhost:9092')] â
â store â memory: â
â log â -stderr- (info) â
â pid â 1271262 â
â hostname â host-name â
â platform â CPython 3.8.2 (Linux x86_64) â
â drivers â â
â transport â aiokafka=1.1.6 â
â web â aiohttp=3.6.2 â
â datadir â /path/to/project/horton-data â
â appdir â /path/to/project/horton-data/v1 â
âââââââââââââââŽââââââââââââââââââââââââââââââââââââââââââââââââââââ
... лПгО, лПгО, лПгО ...
âTopic Partition Setââââââââââ¬âââââââââââââ
â topic â partitions â
ââââââââââââââââââââââââââââââŒâââââââââââââ€
â collect_securities â {0-7} â
â horton-__assignor-__leader â {0} â
ââââââââââââââââââââââââââââââŽâââââââââââââ
çããŠãïŒïŒïŒ
ããŒãã£ã·ã§ã³ã»ãããèŠãŠã¿ãŸãããã ã芧ã®ãšããããããã¯ã¯ã³ãŒãã§æå®ããååãããã©ã«ãã®ããŒãã£ã·ã§ã³æ° (8ãããååŸ) ã§äœæãããŸããã
ããŠãä»åºŠã¯å¥ã®ã¿ãŒããã« ãŠã£ã³ããŠã«ç§»åããŠããããã¯ã«ç©ºã®ã¡ãã»ãŒãžãéä¿¡ããŸãã
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
PSã䜿çšã㊠@
ãcollect_securitiesããšããååã®ãããã¯ã«ã¡ãã»ãŒãžãéä¿¡ããŠããããšã瀺ããŠããŸãã
ãã®å Žåãã¡ãã»ãŒãžã¯ããŒãã£ã·ã§ã³ 6 ã«éä¿¡ãããŸãããããã¯ãkafdrop ã«ã¢ã¯ã»ã¹ããããšã§ç¢ºèªã§ããŸãã localhost:9000
ã¯ãŒã«ãŒãšäžç·ã«ã¿ãŒããã« ãŠã£ã³ããŠã«ç§»åãããšãloguru ã䜿çšããŠéä¿¡ããã幞ããªã¡ãã»ãŒãžã衚瀺ãããŸãã
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
(Robo3T ãŸã㯠Studio3T ã䜿çšããŠ) mongo ã調ã¹ãŠã蚌åžãããŒã¿ããŒã¹å ã«ããããšã確èªããããšãã§ããŸãã
ç§ã¯åäžé·è ã§ã¯ãªãã®ã§ãæåã®èŠèŽãªãã·ã§ã³ã«æºè¶³ããŠããŸãã
幞ããšåã³ - æåã®ãšãŒãžã§ã³ãã®æºåãã§ããŸãã:)
ãšãŒãžã§ã³ãã®æºåã¯å®äºã§ããæ°ãããšãŒãžã§ã³ãäžæ³ïŒ
ã¯ããçããããã®èšäºã§çšæããæé ã® 1 åã® 3 ããã«ããŒããŠããŸããããèœèããªãã§ãã ãããããããã¯ãã£ãšç°¡åã«ãªãããã§ãã
ãããã£ãŠãã¡ã¿æ å ±ãåéãããããã³ã¬ã¯ã·ã§ã³ ããã¥ã¡ã³ãã«å ¥ãããšãŒãžã§ã³ããå¿ èŠã«ãªããŸãã
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
ãã®ãšãŒãžã§ã³ãã¯ç¹å®ã®èšŒåžã«é¢ããæ
å ±ãåŠçãããããã¡ãã»ãŒãžå
ã§ãã®èšŒåžã®ãã£ãã«ãŒ (ã·ã³ãã«) ã瀺ãå¿
èŠããããŸãã ãã®ç®çã®ããã«ããã¡ãŠã¹ãã«ã¯æ¬¡ã®ãããªãã®ããããŸãã
ãã®å Žåã¯ã次ã®å Žæã«è¡ããŸããã
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
ãæ³åã®ãšãããfaust 㯠Python åã¢ãããŒã·ã§ã³ã䜿çšããŠã¡ãã»ãŒãž ã¹ããŒããèšè¿°ããŸãããã®ãããã©ã€ãã©ãªã§ãµããŒããããæå°ããŒãžã§ã³ã¯æ¬¡ã®ãšããã§ãã
ãšãŒãžã§ã³ãã«æ»ããã¿ã€ããèšå®ããŠè¿œå ããŸãããã
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
ã芧ã®ãšãããã¹ããŒã ãå«ãæ°ãããã©ã¡ãŒã¿ãŒããããã¯åæåã¡ãœãã value_type ã«æž¡ããŸãã ããã«ããã¹ãŠãåãã¹ããŒã ã«åŸã£ãŠããã®ã§ãä»ã®ããšã«ãã ããæå³ã¯ãªããšæããŸãã
ããŠãæåŸã®ä»äžãã¯ãã¡ã¿æ å ±åéãšãŒãžã§ã³ããžã®åŒã³åºããcollect_securititesã«è¿œå ããããšã§ãã
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
ã¡ãã»ãŒãžã«ã¯ä»¥åã«çºè¡šãããã¹ããŒã ã䜿çšããŸãã ãã®å ŽåããšãŒãžã§ã³ãããã®çµæãåŸ
ã€å¿
èŠããªãããã.cast ã¡ãœããã䜿çšããŸããããèšåãã䟡å€ããããŸãã
-
Cast - çµæãæåŸ ããŠããªããããããã¯ããŸããã çµæãã¡ãã»ãŒãžãšããŠå¥ã®ãããã¯ã«éä¿¡ããããšã¯ã§ããŸããã
-
send - çµæãæåŸ ããŠããªããããããã¯ããŸããã çµæãéä¿¡ããããããã¯å ã§ãšãŒãžã§ã³ããæå®ã§ããŸãã
-
ask - çµæãåŸ ã¡ãŸãã çµæãéä¿¡ããããããã¯å ã§ãšãŒãžã§ã³ããæå®ã§ããŸãã
ããã§ã¯ãä»æ¥ã®ãšãŒãžã§ã³ãã«ã€ããŠã¯ãããŸã§ã§ãã
æé«ã®ããŒã
ãã®ããŒãã§æåŸã«æžããšçŽæããã®ã¯ã³ãã³ãã§ãã åè¿°ããããã«ãfaust ã®ã³ãã³ãã¯ã¯ãªãã¯ã«é¢ããã©ãããŒã§ãã å®éãfaust ã¯ã-A ããŒãæå®ãããšãã«ã«ã¹ã¿ã ã³ãã³ããã€ã³ã¿ãŒãã§ã€ã¹ã«ã¢ã¿ããããã ãã§ãã
çºè¡šããããšãŒãžã§ã³ãã®åŸã
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
ãããã£ãŠãã³ãã³ãã®ãªã¹ããåŒã³åºããšãæ°ããã³ãã³ãããã®äžã«å«ãŸããŸãã
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
ä»ã®äººãšåãããã«äœ¿çšã§ããã®ã§ããã¡ãŠã¹ã ã¯ãŒã«ãŒãåèµ·åããŠã蚌åžã®æ¬æ Œçãªåéãéå§ããŸãããã
> faust -A horton.agents start-collect-securities
次ã«äœãèµ·ããã®ã ãããïŒ
次ã®ããŒãã§ã¯ãæ®ãã®ãšãŒãžã§ã³ããäŸãšããŠããã®å¹Žã®ååŒã®çµå€ã®æ¥µå€ãæ¢ãããã®ã·ã³ã¯ã¡ã«ããºã ãšãšãŒãžã§ã³ãã® cron èµ·åã«ã€ããŠæ€èšããŸãã
ãããä»æ¥ã®ãã¹ãŠã§ãïŒ èªãã§ãããŠããããšã ïŒïŒ
PS æåŸã®éšåã§ããã¡ãŠã¹ããšåæµã«ãã«ã«ã€ããŠè³ªåãããŸãã (
åºæïŒ habr.com