Мундариҷа
-
Қисми II: Агентҳо ва дастаҳо
Мо дар ин ҷо чӣ кор карда истодаем?
Ҳамин тавр, қисми дуюм. Тавре ки қаблан навишта шуда буд, дар он мо корҳои зеринро иҷро мекунем:
-
Биёед як муштарии хурдеро барои alphavantage дар aiohttp бо дархостҳо барои нуқтаҳои ниҳоӣ ба мо нависед.
-
Биёед агентеро эҷод кунем, ки маълумотро дар бораи коғазҳои қиматнок ва мета-маълумот дар бораи онҳо ҷамъоварӣ кунад.
Аммо, ин аст он чизе ки мо барои худи лоиҳа кор хоҳем кард ва дар робита ба таҳқиқоти фауст, мо мефаҳмем, ки чӣ гуна агентҳоеро, ки рӯйдодҳои ҷараёнро аз кафка коркард мекунанд ва инчунин чӣ гуна навиштани фармонҳоро (кликро пахш кунед), дар ҳолати мо - барои паёмҳои такони дастӣ ба мавзӯъе, ки агент назорат мекунад.
Омодагӣ
Мизоҷи AlphaVantage
Аввалан, биёед як муштарии хурди aiohttp барои дархостҳо ба alphavantage нависед.
Spoiler
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
Воқеан, аз он ҳама чиз равшан аст:
-
API AlphaVantage хеле содда ва зебо тарҳрезӣ шудааст, аз ин рӯ ман қарор додам, ки ҳама дархостҳоро тавассути ин усул иҷро кунам
construct_query
дар он ҷо дар навбати худ занги http вуҷуд дорад. -
Ман тамоми майдонҳоро ба он меорам
snake_case
барои роҳат. -
Хуб, ороиши logger.catch барои баромади пайгирии зебо ва иттилоотӣ.
PS Фаромӯш накунед, ки аломати alphavantage ба таври маҳаллӣ ба config.yml илова кунед ё тағирёбандаи муҳити зистро содир кунед HORTON_SERVICE_APIKEY
. Мо нишона мегирем
Синфи CRUD
Мо коллексияи коғазҳои қиматнокро барои нигоҳ доштани мета маълумот дар бораи коғазҳои қиматнок хоҳем дошт.
Ба андешаи ман, дар ин ҷо ҳеҷ чизро шарҳ додан лозим нест ва худи синфи асосӣ хеле содда аст.
get_app()
Биёед функсияро барои сохтани объекти барнома дар
Spoiler
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
Ҳоло мо соддатарин эҷоди барномаро дорем, каме дертар онро васеъ хоҳем кард, аммо барои он ки шуморо интизор нашавед, дар ин ҷо
Қисми асосии он
Агент оид ба ҷамъоварӣ ва нигоҳдории рӯйхати коғазҳои қиматнок
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
Ҳамин тавр, аввал мо объекти барномаи faust-ро мегирем - ин хеле оддӣ аст. Баъдан, мо ба таври возеҳ мавзӯъро барои агенти худ эълон мекунем ... Дар ин ҷо бояд қайд кард, ки он чист, параметри дохилӣ чист ва чӣ гуна онро ба таври дигар ташкил кардан мумкин аст.
-
Мавзӯъҳо дар кафка, агар бихоҳем таърифи дақиқро бидонем, хондан беҳтар аст
хомӯш. ҳуҷҷат , ё шумо метавонед хонедмаҷмӯа дар Habré бо забони русӣ, ки дар он ҳама чиз хеле дақиқ инъикос ёфтааст :) -
Параметри дохилӣ , ки дар faust doc хеле хуб тавсиф шудааст, ба мо имкон медиҳад, ки мавзӯъро мустақиман дар код танзим кунем, албатта ин маънои параметрҳоеро дорад, ки таҳиягарони faust пешниҳод кардаанд, масалан: нигоҳдорӣ, сиёсати нигоҳдорӣ (бо нобаёнӣ нест кардан, аммо шумо метавонед танзим кунед.зич ), шумораи бахшҳо дар як мавзӯъ (қисмҳо кардан, масалан, камтар азахамияти умумичахонй барномаҳои faust). -
Умуман, агент метавонад мавзӯи идорашавандаро бо арзишҳои глобалӣ эҷод кунад, аммо ман мехоҳам ҳама чизро ба таври возеҳ эълон кунам. Илова бар ин, баъзе параметрҳоро (масалан, шумораи қисмҳо ё сиёсати нигоҳдорӣ) мавзӯъ дар таблиғи агент танзим кардан мумкин нест.
Ин аст он чизест, ки бидуни муайян кардани мавзӯъ ба таври дастӣ ба назар мерасад:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
Хуб, ҳоло биёед тавсиф кунем, ки агенти мо чӣ кор хоҳад кард :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Ҳамин тавр, дар оғози агент, мо барои дархостҳо тавассути муштарии худ сессияи aiohttp кушоем. Ҳамин тариқ, ҳангоми ба кор андохтани коргар, вақте ки агенти мо оғоз мешавад, дарҳол сеанс кушода мешавад - як, барои тамоми вақти коркунанда (ё якчанд, агар шумо параметрро тағир диҳед)
Баъдан, мо ҷараёнро пайгирӣ мекунем (мо паёмро дар _
, зеро мо, дар ин агент, ба мундариҷа) паёмҳои мавзӯи мо аҳамият намедиҳем, агар онҳо дар ҷабҳаи ҷорӣ мавҷуд бошанд, вагарна давраи мо омадани онҳоро интизор мешавад. Хуб, дар дохили ҳалқаи мо, мо қабули паёмро сабт мекунем, рӯйхати коғазҳои қиматноки фаъолро мегирем (get_securities танҳо бо нобаёнӣ фаъол аст, рамзи муштариро бубинед) ва онро дар базаи маълумот захира карда, тафтиш мекунем, ки оё коғазҳои қиматнок бо ҳамон тикер вуҷуд дорад ва мубодила дар базаи маълумот, агар мавҷуд бошад, он (коғаз) танҳо нав карда мешавад.
Биёед эҷоди худро оғоз кунем!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Хусусиятҳои PS
Дар фармони оғози мо, мо ба faust гуфтем, ки объекти барномаро дар куҷо ҷустуҷӯ кардан лозим аст ва бо он чӣ бояд кард (коргарро оғоз кунед) бо сатҳи баромади иттилоот. Мо натиҷаи зеринро ба даст меорем:
Spoiler
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id │ horton │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ log │ -stderr- (info) │
│ pid │ 1271262 │
│ hostname │ host-name │
│ platform │ CPython 3.8.2 (Linux x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /path/to/project/horton-data │
│ appdir │ /path/to/project/horton-data/v1 │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...
┌Topic Partition Set─────────┬────────────┐
│ topic │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities │ {0-7} │
│ horton-__assignor-__leader │ {0} │
└────────────────────────────┴────────────┘
Ин зинда аст!!!
Биёед маҷмӯи тақсимотро бубинем. Тавре ки мо мебинем, мавзӯъ бо номе сохта шудааст, ки мо дар код таъин кардем, шумораи пешфарзҳои қисмҳо (8, аз
Хуб, ҳоло мо метавонем ба равзанаи дигари терминал равем ва ба мавзӯи худ паёми холӣ фиристем:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
PS истифода @
мо нишон медиҳем, ки мо ба мавзӯи бо номи "collect_securities" паём фиристода истодаем.
Дар ин ҳолат, паём ба қисмати 6 рафт - шумо метавонед инро тавассути рафтан ба kafdrop дар санҷед localhost:9000
Бо коргари худ ба равзанаи терминал рафта, мо паёми хушбахтеро мебинем, ки бо истифода аз loguru фиристода шудааст:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Мо инчунин метавонем ба mongo (бо истифода аз Robo3T ё Studio3T) назар кунем ва бубинем, ки коғазҳои қиматнок дар пойгоҳи додаҳо мавҷуданд:
Ман миллиардер нестам ва аз ин рӯ мо бо интихоби аввалини тамошо қаноатмандем.
Хушбахтӣ ва шодӣ - агенти аввал омода аст :)
Агент тайёр, зинда бод агенти нав!
Бале, ҷанобон, мо ҳамагӣ 1/3 роҳи омодакардаи ин мақоларо тай кардем, аммо рӯҳафтода нашавед, зеро акнун ин осонтар мешавад.
Ҳамин тавр, ҳоло ба мо агенте лозим аст, ки иттилооти метаро ҷамъоварӣ мекунад ва онро ба ҳуҷҷати ҷамъоварӣ мегузорад:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Азбаски ин агент маълумотро дар бораи коғазҳои қиматноки мушаххас коркард мекунад, мо бояд дар паём аломати (рамзи) ин коғазро нишон диҳем. Бо ин максад дар фауст мавчуданд
Дар ин ҳолат, биёед ба
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Тавре ки шумо тахмин кардаед, faust барои тавсифи схемаи паём шарҳи навъи python-ро истифода мебарад, аз ин рӯ версияи ҳадди ақали аз ҷониби китобхона дастгирӣшаванда аст.
Биёед ба агент баргардем, намудҳоро таъин кунед ва онро илова кунед:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
Тавре ки шумо мебинед, мо параметри навро бо схема ба усули оғозкунии мавзӯъ мегузорем - value_type. Гузашта аз ин, ҳама чиз аз рӯи ҳамон нақша сурат мегирад, бинобар ин ман дар бораи чизи дигар чизе намебинам.
Хуб, ламси ниҳоӣ ин илова кардани занг ба агенти ҷамъоварии иттилоот барои collect_securitites аст:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
Мо барои паём схемаи қаблан эълоншударо истифода мебарем. Дар ин ҳолат, ман усули .cast-ро истифода кардам, зеро ба мо лозим нест, ки натиҷаро аз агент интизор шавем, аммо бояд қайд кард, ки
-
рехтан - блок намекунад, зеро он натиҷаро интизор нест. Шумо наметавонед натиҷаро ба мавзӯи дигар ҳамчун паём фиристед.
-
ирсол - баста намешавад, зеро он натиҷаро интизор нест. Шумо метавонед агентеро дар мавзӯъе, ки натиҷа ба он меравад, муайян кунед.
-
пурсед — натичаашро интизор аст. Шумо метавонед агентеро дар мавзӯъе, ки натиҷа ба он меравад, муайян кунед.
Ҳамин тавр, ин ҳама бо агентҳо барои имрӯз аст!
Дастаи орзуҳо
Охирин чизе, ки ман ваъда дода будам, ки дар ин бахш менависам, ин фармонҳост. Тавре ки қаблан зикр гардид, фармонҳо дар faust як печонидани клик мебошанд. Дар асл, faust ҳангоми муайян кардани калиди -A фармони фармоишии моро ба интерфейси худ замима мекунад
Пас аз он ки агентхои эълоншуда дар
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Ҳамин тариқ, агар мо рӯйхати фармонҳоро даъват кунем, фармони нави мо дар он хоҳад буд:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Мо метавонем онро мисли ҳар каси дигар истифода барем, аз ин рӯ биёед коргари фаустро бозоғоз кунем ва коллексияи мукаммали коғазҳои қиматнокро оғоз кунем:
> faust -A horton.agents start-collect-securities
Баъд чӣ мешавад?
Дар қисми оянда, бо истифода аз агентҳои боқимонда ҳамчун намуна, мо механизми таназзулро барои ҷустуҷӯи ифротӣ дар нархҳои пӯшидаи савдо дар сол ва оғози cron агентҳоро баррасӣ хоҳем кард.
Ин ҳама барои имрӯз аст! Ташаккур барои хондан :)
PS Дар қисми охир аз ман дар бораи кафкаи фауст ва омехта пурсиданд (
Манбаъ: will.com