Table of Contents
BeĆ II: Ajan Ă» TĂźm
Em li vir çi dikin?
Ji ber vĂȘ yekĂȘ, beĆa duyemĂźn. WekĂź ku berĂȘ hatĂź nivĂźsandin, di wĂȘ de em ĂȘ jĂȘrĂźn bikin:
Ka em ji bo alphavantage xerĂźdarek piçûk li aiohttp bi daxwazĂȘn xalĂȘn dawĂź yĂȘn ku em hewce ne binivĂźsin.
Ka em ajanek biafirĂźnin ku dĂȘ daneyĂȘn li ser ewlehiyĂȘ Ă» agahdariya meta li ser wan berhev bike.
LĂȘ, ya ku em ĂȘ ji bo projeyĂȘ bixwe bikin ev e, Ă» di warĂȘ lĂȘkolĂźna faustĂȘ de, em ĂȘ fĂȘr bibin ka meriv çawa ajanĂȘn ku bĂ»yeran ji kafka diherikĂźnin binivĂźsin, Ă» her weha meriv çawa fermanan binivĂźsĂźne (wrapper bikirtĂźne), di doza me de - ji bo peyamĂȘn push manual ji bo mijara ku agent çavdĂȘriya.
Amadekirin
AlphaVantage Client
PĂȘĆĂźn, bila ji bo daxwazĂȘn alphavantage xerĂźdarek aiohttp piçûk binivĂźsin.
Spoiler
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
Bi rastĂź, her tiĆt ji wĂȘ zelal e:
AlphaVantage API pir sade Ă» xweĆik hatĂź sĂȘwirandin, ji ber vĂȘ yekĂȘ min biryar da ku hemĂź daxwazan bi rĂȘbazĂȘ bikim
construct_queryli ku derĂȘ bangek http heye.Ez hemĂ» zeviyan bĂźnim ber
snake_caseji bo rihetiyĂȘ.WelĂȘ, xemla logger.catch ji bo derketina Ćopandina xweĆik Ă» agahdar.
PS Ji bĂźr nekin ku nĂźĆana alphavantage-ya herĂȘmĂź li config.yml zĂȘde bikin, an guhĂȘrbara jĂźngehĂȘ derxĂźnin HORTON_SERVICE_APIKEY. Em nĂźĆanek distĂźnin .
çßna CRUD
Em ĂȘ berhevokek ewlehiyĂȘ hebe ku agahdariya meta di derheqĂȘ ewlehiyĂȘ de hilĂźne.
Bi dĂźtina min, ne hewce ye ku li vir tiĆtek were ravekirin, Ă» çßna bingehĂźn bixwe pir hĂȘsan e.
get_app ()
Werin em fonksiyonek ji bo afirandina objeyek serĂźlĂȘdanĂȘ lĂȘ zĂȘde bikin
Spoiler
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
Heya nuha em ĂȘ xwedan çĂȘkirina serĂźlĂȘdanĂȘ ya herĂź hĂȘsan bin, piçek paĆĂȘ em ĂȘ wĂȘ berfireh bikin, lĂȘbelĂȘ, ji bo ku hĂ»n li bendĂȘ nemĂźnin, li vir ji bo App-class. Di heman demĂȘ de ez ji we re ĆĂźret dikim ku hĂ»n li çßna mĂźhengan binihĂȘrin, ji ber ku ew ji piraniya mĂźhengan berpirsiyar e.
BeĆek sereke
Agent ji bo berhevkirin Ă» domandina navnĂźĆek ewlehiyĂȘ
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
passJi ber vĂȘ yekĂȘ, pĂȘĆĂź em armanca serĂźlĂȘdana faust digirin - ew pir hĂȘsan e. DĂ»v re, em bi eĆkere mijarek ji bo nĂ»nerĂȘ xwe radigihĂźnin... Li vir hĂȘjayĂź gotinĂȘ ye ku ew çi ye, pĂźvana hundurĂźn çi ye Ă» çawa ev dikare bi rengek cĂ»da were saz kirin.
MijarĂȘn di kafkayĂȘ de, ger em bixwazin pĂȘnaseya rast bizanibin, çĂȘtir e ku em bixwĂźnin , an hĂ»n dikarin bixwĂźnin li ser HabrĂ© bi rĂ»sĂź, ku her tiĆt jĂź bi rengek rast xuya dike :)
, ku di belgeya faustĂȘ de pir xweĆ tĂȘ diyar kirin, dihĂȘle ku em mijarĂȘ rasterast di kodĂȘ de mĂźheng bikin, bĂȘ guman, ev tĂȘ vĂȘ wateyĂȘ ku pĂźvanĂȘn ku ji hĂȘla pĂȘĆdebirĂȘn faust ve tĂȘne peyda kirin, mĂźnakĂź: ragirtin, polĂźtĂźkaya ragirtinĂȘ (bi xwerĂ» jĂȘbirin, lĂȘ hĂ»n dikarin saz bikin ), hejmara beĆan li ser her mijarĂȘ (ji bo nimĂ»ne, kĂȘmtir ji sepanĂȘn faust).
Bi gelemperĂź, ajan dikare mijarek birĂȘvebirĂź bi nirxĂȘn gerdĂ»nĂź biafirĂźne, lĂȘbelĂȘ, ez dixwazim her tiĆtĂź bi eĆkere eĆkere bikim. WekĂź din, hin pĂźvan (mĂźnak, hejmara dabeĆan an polĂźtĂźkaya ragirtinĂȘ) ya mijarĂȘ di reklama ajansĂȘ de nayĂȘ mĂźheng kirin.
Li vir ev e ku ew bĂȘyĂź destnĂźĆankirina mijarĂȘ bi destan xuya dike:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
passBelĂȘ, naha em diyar bikin ka dĂȘ nĂ»nerĂȘ me çi bike :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Ji ber vĂȘ yekĂȘ, di destpĂȘka nĂ»nerĂȘ de, em ji bo daxwaznameyĂȘn bi navgĂźniya muwekĂźlĂȘ xwe ve rĂ»niĆtinek aiohttp vedikin. Bi vĂź rengĂź, dema ku xebatkarek dest pĂȘ dike, dema ku nĂ»nerĂȘ me dest pĂȘ dike, dĂȘ tavilĂȘ daniĆĂźnek were vekirin - yek, ji bo tevahiya dema ku xebatkar dimeĆĂźne (an çend, heke hĂ»n pĂźvanĂȘ biguherĂźnin ji karmendek bi yekĂźneyek xwerĂ»).
DĂ»v re, em tĂźrĂȘjĂȘ diĆopĂźnin (em peyamĂȘ tĂȘ de cih digirin _, ji ber ku em, di vĂȘ ajansĂȘ de, bala xwe nadin naverokĂȘ) peyamĂȘn ji mijara xwe, heke ew di dema niha de hebin, wekĂź din çerxa me dĂȘ li benda hatina wan bimĂźne. WelĂȘ, di hundurĂȘ lĂ»leya xwe de, em wergirtina peyamĂȘ tĂȘketinĂȘ, navnĂźĆek ewlekariya çalak (get_securities tenĂȘ ji hĂȘla xwerĂ» ve vedigere, koda xerĂźdar bibĂźne) distĂźnin Ă» wĂȘ li databasĂȘ hilĂźnin, kontrol dikin ka ewlehĂźyek bi heman tĂźpĂȘ heye Ă» danĂ»stendina di databasĂȘ de, heke hebe, wĂȘ hingĂȘ ew (kaxiz) tenĂȘ dĂȘ were nĂ»ve kirin.
Werin em afirandina xwe bidin destpĂȘkirin!
> docker-compose up -d
... ĐапŃŃĐș ĐșĐŸĐœŃĐ”ĐčĐœĐ”ŃĐŸĐČ ...
> faust -A horton.agents worker --without-web -l infoTaybetmendiyĂȘn PS Ez ĂȘ di gotaran de faustĂȘ nehesibĂźnim, ji ber vĂȘ yekĂȘ em ala guncan destnĂźĆan dikin.
Di fermana destpĂȘkirina xwe de, me ji faustĂȘ re got ku li kuderĂȘ li hĂȘmana serĂźlĂȘdanĂȘ bigere Ă» bi wĂȘ re çi bike (karkerek bide destpĂȘkirin) bi asta derana tĂȘketina agahdariyĂȘ. Em encamek jĂȘrĂźn bistĂźnin:
Spoiler
âÆa”Sâ v1.10.4âŹââââââââââââââââââââââââââââââââââââââââââââââââââââ
â id â horton â
â transport â [URL('kafka://localhost:9092')] â
â store â memory: â
â log â -stderr- (info) â
â pid â 1271262 â
â hostname â host-name â
â platform â CPython 3.8.2 (Linux x86_64) â
â drivers â â
â transport â aiokafka=1.1.6 â
â web â aiohttp=3.6.2 â
â datadir â /path/to/project/horton-data â
â appdir â /path/to/project/horton-data/v1 â
âââââââââââââââŽââââââââââââââââââââââââââââââââââââââââââââââââââââ
... Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž ...
âTopic Partition SetââââââââââŹâââââââââââââ
â topic â partitions â
ââââââââââââââââââââââââââââââŒâââââââââââââ€
â collect_securities â {0-7} â
â horton-__assignor-__leader â {0} â
ââââââââââââââââââââââââââââââŽâââââââââââââ Ew zindĂź ye!!!
Ka em li set dabeĆkirinĂȘ binĂȘrin. WekĂź ku em dibĂźnin, mijarek bi navĂȘ ku me di kodĂȘ de destnĂźĆan kiriye, hejmara xwerĂ» ya dabeĆan (8, ji - Parametreya objeya serĂźlĂȘdanĂȘ), ji ber ku me ji bo mijara xwe nirxek kesane diyar nekir (bi riya dabeĆan). Di xebatkarĂȘ de nĂ»nerĂȘ dest pĂȘkirĂź ji her 8 dabeĆan re tĂȘne destnĂźĆan kirin, ji ber ku ew yekane ye, lĂȘ ev ĂȘ di beĆa di derbarĂȘ kombĂ»nĂȘ de bi hĂ»rgulĂź were nĂźqaĆ kirin.
WelĂȘ, naha em dikarin biçin pencereyek termĂźnalek din Ă» peyamek vala ji mijara xwe re biĆĂźnin:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}PS bikar tĂźne @ em nĂźĆan didin ku em ji mijarek bi navĂȘ "collect_securities" re peyamek diĆĂźnin.
Di vĂȘ rewĆĂȘ de, peyam çû dabeĆkirina 6 - hĂ»n dikarin vĂȘ yekĂȘ bi çûna kafdrop-ĂȘ kontrol bikin localhost:9000
Bi xebatkarĂȘ xwe re diçin pencereya termĂźnalĂȘ, em ĂȘ peyamek kĂȘfxweĆ bibĂźnin ku bi karanĂźna loguru hatĂź Ćandin:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securitiesEm dikarin li mongo jĂź binihĂȘrin (bi Robo3T an Studio3T bikar tĂźnin) Ă» bibĂźnin ku ewlekarĂź di databasĂȘ de ne:
Ez ne mĂźlyarder im, Ă» ji ber vĂȘ yekĂȘ em bi vebijarka dĂźtina yekem razĂź ne.
KĂȘfxweĆĂź Ă» ĆahĂź - nĂ»nerĂȘ yekem amade ye :)
Agent amade ye, bijĂź ajanĂȘ nĂ»!
ErĂȘ, birĂȘzan, me tenĂȘ 1/3 riya ku ji hĂȘla vĂȘ gotarĂȘ ve hatĂź amadekirin vegirtiye, lĂȘ dilteng nebin, ji ber ku niha ew ĂȘ hĂȘsantir be.
Ji ber vĂȘ yekĂȘ naha ji me re karmendek pĂȘdivĂź ye ku agahdariya meta berhev dike Ă» wĂȘ dixe nav belgeyek berhevokĂȘ:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...Ji ber ku ev ajan dĂȘ di derheqĂȘ ewlehĂźyek taybetĂź de agahdariya pĂȘvajoyĂȘ bike, pĂȘdivĂź ye ku em di peyamĂȘ de nĂźĆana (sembola) vĂȘ ewlehiyĂȘ destnĂźĆan bikin. Ji bo vĂȘ armancĂȘ di faustĂȘ de hene - dersĂȘn ku di mijara agentĂȘ de nexĆeya peyamĂȘ radigihĂźnin.
Di vĂȘ rewĆĂȘ de, em biçin Ă» diyar bike ka peyama vĂȘ mijarĂȘ divĂȘ çawa xuya bike:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
WekĂź ku we texmĂźn kir, faust annotation tĂźpa python bikar tĂźne da ku Ćema peyamĂȘ rave bike, ji ber vĂȘ yekĂȘ guhertoya herĂź kĂȘm a ku ji hĂȘla pirtĂ»kxaneyĂȘ ve hatĂź piĆtgirĂź kirin e. .
Ka em vegerin ser ajanĂȘ, celeb saz bikin Ă» lĂȘ zĂȘde bikin:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
WekĂź ku hĂ»n dibĂźnin, em bi pileyek pĂźvanek nĂ» ji rĂȘbaza destpĂȘkirina mijarĂȘ re derbas dikin - value_type. Digel vĂȘ yekĂȘ, her tiĆt heman pilanĂȘ diĆopĂźne, ji ber vĂȘ yekĂȘ ez ti xalĂȘ nabĂźnim ku ez li ser tiĆtek din bisekinim.
WelĂȘ, pĂȘwendiya paĆĂźn ev e ku meriv bangek li nĂ»nerĂȘ berhevkirina agahdariya meta zĂȘde bike da ku collect_securites:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....Em ji bo peyamĂȘ nexĆeya ku berĂȘ hatĂź ragihandin bikar tĂźnin. Di vĂȘ rewĆĂȘ de, min rĂȘbaza .cast bikar anĂź ji ber ku em ne hewce ne ku li benda encamĂȘ ji nĂ»nerĂȘ bisekinin, lĂȘ hĂȘjayĂź gotinĂȘ ye ku ji mijarĂȘ re peyamek biĆĂźnin:
avĂȘtin - asteng nake ji ber ku ew li hĂȘviya encamek ne. HĂ»n nikarin encamĂȘ wekĂź peyamek ji mijarek din re biĆĂźnin.
Ćandin - asteng nake ji ber ku encamek hĂȘvĂź nake. HĂ»n dikarin di mijara ku encam jĂȘ re diçe de karmendek diyar bikin.
bipirse - li benda encamĂȘ ye. HĂ»n dikarin di mijara ku encam jĂȘ re diçe de karmendek diyar bikin.
Ji ber vĂȘ yekĂȘ, ew hemĂź bi ajanĂȘn Ăźro re ye!
TĂźma xewnĂȘ
TiĆta dawĂź ku min soz da ku di vĂȘ beĆĂȘ de binivĂźsim ferman e. WekĂź ku berĂȘ hate behs kirin, fermanĂȘn di faustĂȘ de li dora klĂźk pĂȘça ne. Bi rastĂź, faust dema ku mifteya -A destnĂźĆan dike, tenĂȘ fermana meya xwerĂ» bi navbeyna xwe ve girĂȘdide
PiĆtĂź ku ajanĂȘn hatin ragihandin ketin fonksiyonek bi dekoratorĂȘ zĂȘde bike app.commandgazĂźkirina rĂȘbazĂȘ avdan Ń collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Bi vĂź rengĂź, heke em navnĂźĆa fermanan bang bikin, emrĂȘ meya nĂ» dĂȘ tĂȘ de be:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Em dikarin wĂȘ mĂźna her kesĂȘ bikar bĂźnin, ji ber vĂȘ yekĂȘ em xebatkarĂȘ faust ji nĂ» ve bidin destpĂȘkirin Ă» berhevokek bĂȘkĂȘmasĂź ya ewlehiyĂȘ dest pĂȘ bikin:
> faust -A horton.agents start-collect-securitiesDĂȘ paĆĂȘ çi bibe?
Di beĆa paĆĂźn de, wekĂź mĂźnakek ajanĂȘn mayĂź bikar bĂźnin, em ĂȘ mekanĂźzmaya sink ji bo lĂȘgerĂźna tundĂ»tĂ»jiyĂȘ di bihayĂȘn girtina bazirganiya salĂȘ de Ă» destpĂȘkirina krona ajanan de binirxĂźnin.
Ji bo Ăźro her tiĆt e! Spas ji bo xwendinĂȘ :)
PS Di beĆa dawĂźn de ji min hat pirsĂźn kafka faust Ă» tevlihev (). Wusa dixuye ku confluent bi gelek awayan fonksiyoneltir e, lĂȘ rastiyek ev e ku faust ji bo konfluentĂȘ xwedan piĆtgiriyek tam a xerĂźdar nĂźne - ev ji.
Source: www.habr.com
