Table of contents
- Part II: Agents and Commands
What are we doing here?
So, on to the second part. As written earlier, in it we will do the following:
- Write a small alphavantage client on aiohttp with requests to the endpoints we need.
- Create an agent that collects data on securities and the meta information about them.
That is what we will do for the project itself. As for exploring faust, we will learn how to write agents that process stream events from kafka, and how to write commands (a wrapper over click), which in our case manually push messages to the topic the agent is watching.
Preparation
AlphaVantage Client
First, let's write a small aiohttp client for requests to alphavantage.
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}
        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item
        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"
        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())
            if to_json:
                data = self._format_fields(data)
        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
        data = pd.read_csv(StringIO(data))
        securities = data.to_dict("records")
        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"
            securities[index] = security
        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )
In fact, everything in it is straightforward:
- The AlphaVantage API is simple and well designed, so I decided to route all requests through the _construct_query method, which is where the actual http call happens.
- I convert all field names to snake_case for convenience.
- And the logger.catch decorator gives a nice, informative traceback.
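To make the two data-shaping steps of the client easier to picture, here is a rough standard-library sketch rather than the client itself. Everything in it is an assumption made for illustration: to_snake_case is a simplified stand-in for stringcase.snakecase (edge cases may differ), the csv module replaces pandas, and the sample row is made up.

```python
import csv
import re
from io import StringIO
from typing import Any, Dict, List


def to_snake_case(name: str) -> str:
    """Simplified stand-in for stringcase.snakecase (edge cases may differ)."""
    # Treat dashes, dots and whitespace as word separators.
    name = re.sub(r"[\-.\s]+", "_", name.strip())
    # Insert an underscore before a capital that follows a lowercase letter or digit.
    name = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name)
    return name.lower()


def format_fields(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `data` with every key converted to snake_case."""
    return {to_snake_case(key): value for key, value in data.items()}


def parse_listing(csv_text: str) -> List[Dict[str, Any]]:
    """Turn a CSV listing into a list of dicts with normalized keys."""
    securities = []
    for row in csv.DictReader(StringIO(csv_text)):
        security = format_fields(row)
        security["_type"] = "physical"  # same marker the client adds
        securities.append(security)
    return securities


# Made-up sample data, only shaped like a listing response.
SAMPLE = "symbol,name,exchange,assetType\nABC,Example Corp,NYSE,Stock\n"
print(parse_listing(SAMPLE))
```

The point of normalizing keys in one place is that every caller of the client sees the same field names, whichever endpoint the data came from.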
P.S. Don't forget to add your alphavantage token locally to config.yml, or export the HORTON_SERVICE_APIKEY environment variable. The token itself is obtained from AlphaVantage.
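How the project's config module resolves the key is not shown here, so the following is only a sketch under assumptions: the environment variable wins over the file value, and the "apikey" file key is a hypothetical name. Only HORTON_SERVICE_APIKEY comes from the text.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(
    file_settings: Mapping[str, str],
    environ: Optional[Mapping[str, str]] = None,
) -> str:
    """Pick the API key, letting the environment override the file value."""
    environ = os.environ if environ is None else environ
    # HORTON_SERVICE_APIKEY is the variable named in the text; the
    # "apikey" file key is a hypothetical name used for this sketch.
    key = environ.get("HORTON_SERVICE_APIKEY") or file_settings.get("apikey")
    if not key:
        raise RuntimeError("AlphaVantage API key is not configured")
    return key


print(resolve_api_key({"apikey": "from-file"}, environ={}))
print(resolve_api_key({"apikey": "from-file"}, environ={"HORTON_SERVICE_APIKEY": "from-env"}))
```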
CRUD class
We will have a securities collection that stores the meta information about securities.
In my opinion there is nothing to explain here, and the base class itself is quite simple.
get_app()
Let's add a function that creates the application object:
import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)
For now the application setup is as simple as it gets, and we will extend it a little later. If you would rather not wait, the faust documentation on the App class is the place to look.
Main part
An agent for collecting and storing the list of securities
from typing import AsyncIterable
from faust import StreamT

app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass
So, first we get the faust application object, which is simple enough. Next, we explicitly declare a topic for our agent. Here it is worth explaining what a topic is, what the internal parameter does, and how this could be arranged differently.
- Topics in kafka: for the exact definition it is best to read the official documentation, or you can read the summary on Habr in Russian, where everything is also laid out quite accurately :)
- The internal parameter, described quite well in the faust docs, lets us configure the topic directly in code. Naturally, this covers the parameters the faust developers expose, for example: retention, the retention policy (delete by default, but you can set compact), and the number of partitions per topic (partitions, for instance to use fewer than the global value for faust applications).
- In general, the agent can create a managed topic with the global values on its own, but I like to declare everything explicitly. Besides, some topic parameters (for example, the number of partitions or the retention policy) cannot be configured in the agent declaration.
Here is how it could look without defining the topic manually:
app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass
Well, now let's describe what our agent will actually do :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")
            client = AlphaVantageClient(session, API_KEY)
            securities = await client.get_securities()
            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )
            yield True
So, when the agent starts, we open an aiohttp session for the requests made through our client. This means that when the worker starts and our agent is launched, a session is opened right away: one session for the whole lifetime of the worker (or several, if you change the corresponding agent parameter).
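The "one session for the whole lifetime of the agent" behaviour can be imitated with plain asyncio. This is a sketch and not faust code: FakeSession is a hypothetical stand-in for aiohttp.ClientSession, an asyncio.Queue plays the role of the topic, and None is a stop signal invented so the example terminates.

```python
import asyncio
from typing import AsyncIterator, List


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession that records its lifecycle."""

    def __init__(self, log: List[str]) -> None:
        self._log = log

    async def __aenter__(self) -> "FakeSession":
        self._log.append("session opened")
        return self

    async def __aexit__(self, *exc_info) -> None:
        self._log.append("session closed")


async def stream_from(queue: "asyncio.Queue") -> AsyncIterator[object]:
    """Yield messages as they arrive; None is this sketch's stop signal."""
    while True:
        message = await queue.get()  # suspends here until a message arrives
        if message is None:
            return
        yield message


async def agent(queue: "asyncio.Queue", log: List[str]) -> None:
    # The session is created once, before the loop, and reused for every message.
    async with FakeSession(log):
        async for _ in stream_from(queue):
            log.append("message handled")


async def main() -> List[str]:
    log: List[str] = []
    queue: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(agent(queue, log))
    for message in ("first", "second", None):
        await queue.put(message)
    await worker
    return log


print(asyncio.run(main()))
```

The log shows a single "session opened" entry followed by one "message handled" per message, which is the reason for opening the session outside the loop.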
Next, we iterate over the stream of messages from our topic (we bind each message to _, since in this agent we do not care about its content). If there are messages at the current offset they are processed; otherwise the loop waits for them to arrive. Inside the loop we log that a message was received, fetch the list of active securities (get_securities returns only active ones by default, see the client code) and save it to the database, checking whether a security with the same ticker and exchange already exists there; if it does, it is simply updated.
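The "insert it, or update it if the same ticker and exchange already exist" step is an upsert. The CRUD class is not shown in this part, so the sketch below uses a hypothetical in-memory collection whose update_one only mimics the call shape used by the agent; it is not the project's SecurityCRUD.

```python
import asyncio
from typing import Any, Dict, List


class InMemoryCollection:
    """Hypothetical in-memory stand-in for the securities collection."""

    def __init__(self) -> None:
        self.documents: List[Dict[str, Any]] = []

    async def update_one(
        self, query: Dict[str, Any], data: Dict[str, Any], upsert: bool = False
    ) -> bool:
        """Update the first document matching `query`; insert it when `upsert` is set."""
        for document in self.documents:
            if all(document.get(k) == v for k, v in query.items()):
                document.update(data)
                return True
        if upsert:
            self.documents.append({**query, **data})
            return True
        return False


async def main() -> List[Dict[str, Any]]:
    collection = InMemoryCollection()
    key = {"symbol": "ABC", "exchange": "NYSE"}  # made-up security
    await collection.update_one(key, {**key, "name": "Example Corp"}, upsert=True)
    # Same symbol and exchange: the existing document is updated, not duplicated.
    await collection.update_one(key, {**key, "name": "Example Corporation"}, upsert=True)
    return collection.documents


print(asyncio.run(main()))
```

Running the collection twice for the same listing therefore leaves one document per (symbol, exchange) pair.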
Let's launch our creation!
> docker-compose up -d
... Starting containers ...
> faust -A horton.agents worker --without-web -l info
P.S. The --without-web flag turns off faust's web component.
In the launch command we told faust where to look for the application object and what to do with it (start a worker), with info-level log output. We get the following output:
┌ƒaµS† v1.10.4┬───────────────────────────────────┐
│ id          │ horton                            │
│ transport   │ [URL('kafka://localhost:9092')]   │
│ store       │ memory:                           │
│ log         │ -stderr- (info)                   │
│ pid         │ 1271262                           │
│ hostname    │ host-name                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)      │
│ drivers     │                                   │
│   transport │ aiokafka=1.1.6                    │
│   web       │ aiohttp=3.6.2                     │
│ datadir     │ /path/to/project/horton-data      │
│ appdir      │ /path/to/project/horton-data/v1   │
└─────────────┴───────────────────────────────────┘
... logs, logs, logs ...
┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘
It's alive!!!
Let's look at the partition set. As we can see, the topic was created with the name we gave it in the code and with the default number of partitions (8, the application-wide default).
Well, now we can switch to another terminal window and send an empty message to our topic:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
P.S. The @ prefix tells faust that the target is the agent named collect_securities, so the message ends up in the topic that agent reads from, which here has the same name.
In this case the message went to partition 6; you can check this by opening kafdrop on localhost:9000.
Switching to the terminal window with our worker, we will see the cheerful message logged with loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
We can also look into mongo (using Robo3T or Studio3T) and see that the securities are in the database:
I'm not a millionaire, so we make do with the first viewing option.
Joy and happiness: the first agent is ready :)
The agent is ready, long live the new agent!
Yes, gentlemen, we have covered only a third of the path laid out for this article, but don't lose heart, because from here on it gets easier.
So now we need an agent that collects meta information and puts it into the collection document:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)

@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...
Since this agent will process information about a specific security, the message has to carry that security's ticker (symbol). For this purpose faust has records (faust.Record).
So let's declare one:
import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str
As you may have guessed, faust uses Python type annotations to describe the message schema, which is why the library has a minimum supported Python version.
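The idea of a schema declared through annotations can be tried out without faust. The sketch below uses a dataclass as a stand-in for the record; the class name, the dumps/loads helpers and the type check on load are this sketch's own choices and say nothing about what faust validates by default.

```python
import json
from dataclasses import asdict, dataclass, fields


@dataclass
class CollectSecurityOverviewSketch:
    """Dataclass stand-in for a record; not faust's implementation."""

    symbol: str
    exchange: str

    def dumps(self) -> bytes:
        """Serialize the message to JSON bytes."""
        return json.dumps(asdict(self)).encode()

    @classmethod
    def loads(cls, payload: bytes) -> "CollectSecurityOverviewSketch":
        """Rebuild the message, checking each field against its annotation."""
        raw = json.loads(payload)
        for field in fields(cls):
            if not isinstance(raw.get(field.name), field.type):
                raise TypeError(f"{field.name} must be {field.type.__name__}")
        return cls(**raw)


message = CollectSecurityOverviewSketch(symbol="ABC", exchange="NYSE")  # made-up values
print(CollectSecurityOverviewSketch.loads(message.dumps()))
```

The annotations are the single source of truth here: the same two lines drive construction, serialization and the check on the receiving side.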
Let's return to the agent, set the types and finish it:
collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)

@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )
            client = AlphaVantageClient(session, API_KEY)
            security_overview = await client.get_security_overview(event.symbol)
            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
            yield True
As you can see, we pass a new parameter carrying the schema to the topic initialization method: value_type. Everything else follows the same pattern, so I see no point in dwelling on it.
The final touch is to add a call to the meta information agent inside collect_securities:
....
for security in securities:
    await SecurityCRUD.update_one(
        {
            "symbol": security["symbol"],
            "exchange": security["exchange"],
        },
        security,
        upsert=True,
    )
    await collect_security_overview.cast(
        CollectSecurityOverview(symbol=security["symbol"], exchange=security["exchange"])
    )
....
We use the previously declared schema for the message. In this case I used the .cast method, since we do not need to wait for a result from the agent, but it is worth listing the ways to send a message to a topic:
- cast: does not block, because it does not wait for a result. The result cannot be forwarded to another topic as a message.
- send: does not block, because it does not wait for a result. You can specify an agent whose topic the result will be sent to.
- ask: waits for the result. You can specify an agent whose topic the result will be sent to.
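The difference between "fire and forget" and "wait for the answer" can be shown with a toy agent built on asyncio. MiniAgent and its mailbox are invented for this sketch and are not how faust implements cast or ask; send is left out because forwarding a reply to another topic would need a second agent.

```python
import asyncio
from typing import Any, Awaitable, Callable, Tuple


class MiniAgent:
    """Toy agent with a mailbox; a sketch of the idea, not faust's implementation."""

    def __init__(self, handler: Callable[[Any], Awaitable[Any]]) -> None:
        self._handler = handler
        self._mailbox: asyncio.Queue = asyncio.Queue()

    async def cast(self, value: Any) -> None:
        """Fire and forget: enqueue the message, nobody waits for a result."""
        await self._mailbox.put((value, None))

    async def ask(self, value: Any) -> Any:
        """Enqueue the message and wait until the handler's result comes back."""
        reply = asyncio.get_running_loop().create_future()
        await self._mailbox.put((value, reply))
        return await reply

    async def run(self) -> None:
        while True:
            value, reply = await self._mailbox.get()
            result = await self._handler(value)
            if reply is not None:
                reply.set_result(result)


async def main() -> Tuple[Any, Any]:
    async def double(value: int) -> int:
        return value * 2

    agent = MiniAgent(double)
    runner = asyncio.create_task(agent.run())
    cast_result = await agent.cast(1)  # returns right away, there is no result
    ask_result = await agent.ask(21)   # resolves once the handler has run
    runner.cancel()
    return cast_result, ask_result


print(asyncio.run(main()))
```

For the collection agent, cast is the natural fit: it only needs to trigger one overview job per security and has no use for the boolean each job yields.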
So, that's all on agents for today!
Dream team
The last thing I promised to cover in this part is commands. As mentioned earlier, commands in faust are a wrapper over click. In effect, faust simply attaches our custom command to its own interface when the -A option is given.
After the agents are declared, add a function decorated with app.command that calls cast on collect_securities:
@app.command()
async def start_collect_securities():
    """Collect securities and overview."""
    await collect_securities.cast()
So, if we list the available commands, our new command will be among them:
> faust -A horton.agents --help
....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.
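Note that the function start_collect_securities is listed as start-collect-securities, with its docstring as the help text. A decorator-based registry that produces the same naming can be sketched in a few lines; COMMANDS, command and run_command are hypothetical names, and this is not how faust or click do it internally.

```python
import asyncio
from typing import Awaitable, Callable, Dict

COMMANDS: Dict[str, Callable[[], Awaitable[object]]] = {}


def command(func: Callable[[], Awaitable[object]]) -> Callable[[], Awaitable[object]]:
    """Register a coroutine function under a dash-separated command name."""
    COMMANDS[func.__name__.replace("_", "-")] = func
    return func


@command
async def start_collect_securities() -> str:
    """Collect securities and overview."""
    return "collect_securities triggered"  # placeholder for the real cast() call


def run_command(name: str) -> object:
    """Look up a registered command by its CLI name and run it to completion."""
    return asyncio.run(COMMANDS[name]())


print(sorted(COMMANDS))
print(run_command("start-collect-securities"))
```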
We can use it like any other command, so let's restart the faust worker and kick off a full collection of securities:
> faust -A horton.agents start-collect-securities
What happens next?
In the next part, using the remaining agents as an example, we will look at the sink mechanism for finding extremes in the year's closing prices, and at launching agents on a cron schedule.
That's all for today! Thanks for reading :)
P.S. Under the previous part I was asked about faust and confluent kafka.
Source: www.habr.com