Part II: Agents and Commands
What are we doing here?
So, part two. As mentioned earlier, in this part we will:
- Write a small aiohttp client for alphavantage covering the endpoints we need.
- Create an agent that collects data on securities and their meta information.
That is what we will do for the project itself. In terms of exploring faust, we will learn how to write agents that process a stream of events from kafka, and how to write commands (a wrapper around click), in our case for manually pushing messages to the topic an agent is watching.
Preparation
AlphaVantage Client
First, let's write a small aiohttp client for requests to alphavantage.
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}
        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item
        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"
        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())
            if to_json:
                data = self._format_fields(data)
        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
        data = pd.read_csv(StringIO(data))
        securities = data.to_dict("records")
        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"
            securities[index] = security
        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )
In fact, everything is clear from the code:
- The AlphaVantage API is simple and well designed, so I decided to route all requests through the _construct_query method, which is also where the http call is made.
- I convert all fields to snake_case for convenience.
- And the logger.catch decorator gives nice, informative traceback output.
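As a rough illustration of the first two points, the sketch below mirrors how _construct_query merges query parameters and how response keys end up in snake_case. It uses only the standard library: to_snake_case is a hypothetical stand-in for stringcase.snakecase (whose output may differ on edge cases), and the endpoint URL and the "demo" key are assumed values, not taken from the project's config.

```python
import re
import urllib.parse as urlparse
from typing import Any, Dict

# Assumed endpoint for illustration; the project reads it from horton.config.
API_ENDPOINT = "https://www.alphavantage.co/"


def build_request(function: str, api_key: str, **kwargs: Any) -> Dict[str, Any]:
    """Return the URL and merged query parameters for one API call."""
    base_params = {"datatype": "json", "apikey": api_key}
    return {
        "url": urlparse.urljoin(API_ENDPOINT, "query/"),
        # Later keys win, so the base parameters override any clashing kwargs.
        "params": {"function": function, **kwargs, **base_params},
    }


def to_snake_case(name: str) -> str:
    """Hypothetical stand-in for stringcase.snakecase."""
    # Mark camelCase boundaries: a lowercase letter or digit followed by a capital.
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Collapse runs of anything that is not a letter or digit into one underscore.
    name = re.sub(r"[^0-9A-Za-z]+", "_", name)
    return name.strip("_").lower()


def format_fields(data: Dict[str, Any]) -> Dict[str, Any]:
    """Rename every key of a response dict to snake_case."""
    return {to_snake_case(key): value for key, value in data.items()}
```

Because the base parameters are unpacked last, a caller cannot accidentally override apikey or datatype through keyword arguments.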
P.S. Don't forget to add your alphavantage token locally to config.yml, or export the HORTON_SERVICE_APIKEY environment variable. The token is issued by AlphaVantage.
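How the project actually loads its config is not shown here, so the following is only a sketch of one way to resolve the token. The resolve_api_key helper and the rule that the environment variable takes precedence over the config.yml value are my assumptions; only the variable name HORTON_SERVICE_APIKEY comes from the text above.

```python
import os
from typing import Mapping, Optional

ENV_VAR = "HORTON_SERVICE_APIKEY"


def resolve_api_key(
    config_value: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> str:
    """Return the API key, preferring the environment variable (assumed order)."""
    key = env.get(ENV_VAR) or config_value
    if not key:
        # Fail early instead of sending requests without a token.
        raise RuntimeError(f"Set {ENV_VAR} or add the key to config.yml")
    return key
```

Failing at startup is cheaper than discovering a missing token from an error response inside an agent.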
CRUD class
We will have a securities collection for storing meta information about securities.
In my opinion there is nothing to explain here; the class itself is quite simple.
get_app()
Let's add a function that creates the application object:
import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)
For now, application creation is as simple as it gets; we will extend it a little later. If you do not want to wait, the faust documentation covers the App class in detail.
Main part
An agent for collecting and storing the list of securities
from typing import AsyncIterable

from faust import StreamT

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)


@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass
So, first we get the faust application object, which is simple enough. Then we explicitly declare the topic for our agent. Here it is worth explaining what a topic is, what the internal parameter does, and how this could be arranged differently.
- Topics in kafka: if you want the exact definition, it is best to read the official documentation, or you can read the summary on Habr in Russian, where everything is also laid out quite accurately :)
- The internal parameter, described quite well in the faust docs, lets us configure the topic right in the code. Naturally, this means the parameters exposed by the faust developers, for example: retention, retention policy (delete by default, but you can set compact), and the number of partitions per topic (partitions, for example to use fewer than the application-wide value).
- In general, the agent can create a managed topic with the global values by itself, but I like to declare everything explicitly. Besides, some topic parameters (for example, the number of partitions or the retention policy) cannot be configured in the agent declaration.
Here is what it could look like without defining the topic manually:
app = get_app()


@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass
Well, now let's describe what our agent will do :)
app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)


@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")
            client = AlphaVantageClient(session, API_KEY)
            securities = await client.get_securities()
            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]},
                    security,
                    upsert=True,
                )
            yield True
So, at the start of the agent we open an aiohttp session for requests through our client. This means that when a worker starts and our agent is launched, a session is opened right away: one for the whole lifetime of the worker (or several, if you change the agent's concurrency parameter).
Next, we iterate over the stream of messages from our topic (we bind each message to _, since this agent does not care about its content). If there are messages at the current offset we process them; otherwise the loop waits for them to arrive. Inside the loop we log that a message was received, fetch the list of active securities (get_securities returns only active ones by default, see the client code) and save them to the database. Each security is looked up by ticker and exchange; if it already exists in the database, it is simply updated.
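To make the loop and the upsert step more tangible, here is a toy model that runs without kafka or mongo. It is only a sketch under my own assumptions: an asyncio.Queue stands in for the topic, a dict keyed by (symbol, exchange) stands in for the collection, the None sentinel exists only so the demo terminates, and the sample security is made up.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Tuple

Store = Dict[Tuple[str, str], Dict[str, Any]]


async def stream_from(queue: asyncio.Queue) -> AsyncIterator[Any]:
    """Yield messages as they arrive; a None sentinel ends the demo stream."""
    while True:
        message = await queue.get()  # suspends until a message is available
        if message is None:
            return
        yield message


def upsert(store: Store, security: Dict[str, Any]) -> bool:
    """Insert or update by (symbol, exchange); return True if newly inserted."""
    key = (security["symbol"], security["exchange"])
    created = key not in store
    store.setdefault(key, {}).update(security)
    return created


async def collect(queue: asyncio.Queue, store: Store, fetched: List[Dict[str, Any]]) -> int:
    """Handle each incoming message by upserting the fetched securities."""
    handled = 0
    async for _ in stream_from(queue):  # the message content is ignored
        for security in fetched:
            upsert(store, security)
        handled += 1
    return handled


async def demo() -> Tuple[int, Store]:
    queue: asyncio.Queue = asyncio.Queue()
    store: Store = {}
    fetched = [{"symbol": "IBM", "exchange": "NYSE", "name": "sample"}]
    for message in (b"", b"", None):  # two empty messages, then stop
        queue.put_nowait(message)
    handled = await collect(queue, store, fetched)
    return handled, store
```

Running asyncio.run(demo()) handles two messages but leaves a single document in the store, which is the point of upserting by ticker and exchange.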
Let's launch our creation!
> docker-compose up -d
... Starting containers ...
> faust -A horton.agents worker --without-web -l info
PS Features
In our launch command we told faust where to find the application object and what to do with it (start a worker), with the info log level. We get the following output:
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id │ horton │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ log │ -stderr- (info) │
│ pid │ 1271262 │
│ hostname │ host-name │
│ platform │ CPython 3.8.2 (Linux x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /path/to/project/horton-data │
│ appdir │ /path/to/project/horton-data/v1 │
└─────────────┴───────────────────────────────────────────────────┘
... logs, logs, logs ...
┌Topic Partition Set─────────┬────────────┐
│ topic │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities │ {0-7} │
│ horton-__assignor-__leader │ {0} │
└────────────────────────────┴────────────┘
It's alive!!!
Let's look at the partition set. As we can see, a topic was created with the name we gave in the code and the default number of partitions (8, taken from the application's topic_partitions setting).
Well, now we can switch to another terminal window and send an empty message to our topic:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
P.S. The @ prefix tells faust that the target is the agent named collect_securities; the message is delivered to that agent's topic, which here is also called "collect_securities".
In this case the message went to partition 6. You can check this by opening kafdrop on localhost:9000.
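Why partition 6 and not some other one? Our message had no key, and Kafka clients typically spread keyless messages across partitions, while keyed messages are hashed so that the same key always lands in the same partition. The sketch below is a simplified model of that idea, not the real partitioner: choose_partition is a hypothetical helper, and zlib.crc32 is my stand-in for the murmur2 hash that Kafka's default partitioner uses.

```python
import random
import zlib
from typing import Optional


def choose_partition(key: Optional[bytes], partitions: int = 8) -> int:
    """Pick a partition index in the range 0..partitions-1."""
    if key is None:
        # No key: any partition will do, so pick one at random.
        return random.randrange(partitions)
    # Same key, same hash, same partition (crc32 is a stand-in hash).
    return zlib.crc32(key) % partitions
```

With eight partitions an empty message can land anywhere from 0 to 7, so the partition number will most likely differ on your machine.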
Back in the terminal window with our worker, we will see a cheerful message logged with loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
We can also look into mongo (using Robo3T or Studio3T) and see that the securities are in the database:
I am not a billionaire, so we make do with the first viewing option.
Happiness and joy: the first agent is ready :)
The agent is ready, long live the new agent!
Yes, gentlemen, we have covered only 1/3 of the path planned for this article, but do not despair, because it gets easier from here.
So now we need an agent that collects meta information and stores it in the collection document:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...
Since this agent will process information about a specific security, we need to put the ticker (symbol) of that security into the message. For this purpose faust has Records, classes that declare the schema of a message.
In this case, let's describe what a message in this topic should look like:
import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str
As you may have guessed, faust uses python type annotations to describe the message schema, which is why the minimum version supported by the library is 3.6.
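If annotation-driven schemas are new to you, the standard library offers a close analogy. The sketch below is not how faust.Record is implemented, and its type check is my own addition for illustration; it only shows the general idea of a class whose annotated fields define what goes on the wire as JSON. The dumps and loads names here are methods of this toy class.

```python
import json
from dataclasses import asdict, dataclass, fields


@dataclass
class CollectSecurityOverview:
    symbol: str
    exchange: str

    def dumps(self) -> bytes:
        """Serialize the annotated fields to JSON bytes."""
        return json.dumps(asdict(self)).encode()

    @classmethod
    def loads(cls, raw: bytes) -> "CollectSecurityOverview":
        """Rebuild the message, checking each field against its annotation."""
        payload = json.loads(raw)
        for field in fields(cls):
            if not isinstance(payload.get(field.name), field.type):
                raise TypeError(f"{field.name} must be {field.type.__name__}")
        return cls(**payload)
```

A message round-trips through bytes unchanged, and a payload with a missing or mistyped field is rejected before it reaches the processing code.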
Let's go back to the agent, fill in the types and finish it:
collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )
            client = AlphaVantageClient(session, API_KEY)
            security_overview = await client.get_security_overview(event.symbol)
            await SecurityCRUD.update_one(
                {"symbol": event.symbol, "exchange": event.exchange},
                security_overview,
            )
            yield True
As you can see, we pass a new parameter with the schema, value_type, to the topic initialization method. Everything else follows the same pattern, so I see no reason to dwell on it.
Well, the final touch is to add a call to the meta information agent inside collect_securities:
....
for security in securities:
    await SecurityCRUD.update_one(
        {"symbol": security["symbol"], "exchange": security["exchange"]},
        security,
        upsert=True,
    )
    await collect_security_overview.cast(
        CollectSecurityOverview(symbol=security["symbol"], exchange=security["exchange"])
    )
....
We use the previously declared schema for the message. In this case I used the .cast method, since we do not need to wait for a result from the agent, but it is worth listing the ways to send a message to an agent:
- cast: does not block, because it does not expect a result. The result cannot be sent to another topic as a message.
- send: does not block, because it does not expect a result. You can specify an agent whose topic the result will go to.
- ask: waits for the result. You can specify an agent whose topic the result will go to.
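The difference between fire-and-forget and request-reply is easier to see in a toy model. The sketch below uses only asyncio and is not faust's implementation: ToyAgent is a hypothetical class whose cast and ask methods merely imitate the semantics described in the list above, with doubling a number as a stand-in for real processing.

```python
import asyncio
from typing import Any, List


class ToyAgent:
    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.processed: List[Any] = []

    async def cast(self, value: Any) -> None:
        """Enqueue and return immediately; any result is discarded."""
        await self.queue.put((value, None))

    async def ask(self, value: Any) -> Any:
        """Enqueue, then wait until the agent has produced a result."""
        reply = asyncio.get_running_loop().create_future()
        await self.queue.put((value, reply))
        return await reply

    async def run(self) -> None:
        """Process messages forever, replying only when a reply is expected."""
        while True:
            value, reply = await self.queue.get()
            self.processed.append(value)
            if reply is not None:
                reply.set_result(value * 2)  # stand-in for real processing


async def demo():
    agent = ToyAgent()
    worker = asyncio.create_task(agent.run())
    await agent.cast(1)
    seen_after_cast = list(agent.processed)  # cast returned before processing
    answer = await agent.ask(21)  # suspends until the agent replies
    worker.cancel()
    return seen_after_cast, answer, agent.processed
```

In demo, cast returns while the message is still waiting in the queue, whereas ask only returns after the agent has handled both messages.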
So, that is all about agents for today!
The Dream Team
The last thing I promised to cover in this part is commands. As mentioned earlier, commands in faust are a wrapper around click. In fact, faust simply attaches our custom command to its own interface when the -A option is given.
After the agents are declared, let's add a function decorated with app.command:
@app.command()
async def start_collect_securities():
    """Collect securities and overview."""
    await collect_securities.cast()
So, if we list the available commands, our new command will be among them:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
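Notice that the function start_collect_securities shows up as start-collect-securities, with its docstring as the help text. The toy registry below imitates that behaviour in plain Python. It is a sketch of the general decorator pattern, not the internals of faust or click; COMMANDS, CALLS, command and run_command are names I made up for the illustration.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

COMMANDS: Dict[str, Callable[[], Awaitable[None]]] = {}
CALLS: List[str] = []  # records which commands ran, for demonstration only


def command(func: Callable[[], Awaitable[None]]) -> Callable[[], Awaitable[None]]:
    """Register a coroutine function under a dash-separated command name."""
    COMMANDS[func.__name__.replace("_", "-")] = func
    return func


@command
async def start_collect_securities() -> None:
    """Collect securities and overview."""
    CALLS.append("start-collect-securities")


def run_command(name: str) -> None:
    """Look the command up by its CLI name and run it to completion."""
    asyncio.run(COMMANDS[name]())


def help_lines() -> List[str]:
    """Format one help line per command from its docstring."""
    return [f"{name}  {func.__doc__}" for name, func in sorted(COMMANDS.items())]
```

Calling run_command("start-collect-securities") runs the coroutine, much like invoking the command from the shell does.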
We can use it like any other command, so let's restart the faust worker and kick off a full collection of securities:
> faust -A horton.agents start-collect-securities
What comes next?
In the next part, using the remaining agents as an example, we will look at the sink mechanism for finding extremes in the year's closing prices, and at launching agents on a cron schedule.
That's all for today! Thanks for reading :)
P.S. Under the previous part I was asked about faust and confluent kafka (
Source: www.habr.com