Lohahevitra misy
Fizarana II: Mpikambana sy Ekipa
Inona no ataontsika eto?
Noho izany, ny ampahany faharoa. Araka ny efa voasoratra teo aloha dia hanao izao manaraka izao izahay:
Andao hanoratra mpanjifa kely ho an'ny alphavantage amin'ny aiohttp miaraka amin'ny fangatahana ireo teboka farany ilaintsika.
Andao hamorona mpandraharaha iray izay hanangona angona momba ny fiarovana sy ny fampahalalana meta momba azy ireo.
Saingy, izao no hataontsika ho an'ny tetikasa mihitsy, ary amin'ny resaka fikarohana haingana, dia hianatra ny fomba hanoratana mpiasa izay mandrindra ny hetsika mivantana avy amin'ny kafka, ary koa ny fomba fanoratana baiko (click wrapper), amin'ity tranga ity - ho an'ny hafatra fanosehana amin'ny tanana amin'ny lohahevitra izay arahin'ny masoivoho.
Fiomanana
AlphaVantage Client
Voalohany, andao hanoratra mpanjifa aiohttp kely ho an'ny fangatahana alphavantage.
mpandringana
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
Raha ny marina, mazava ny zava-drehetra avy amin'izany:
Ny AlphaVantage API dia natao tsotra sy tsara tarehy, noho izany dia nanapa-kevitra ny hanao ny fangatahana rehetra amin'ny alΓ lan'ny fomba aho
construct_queryaiza indray no misy antso http.Ento ny saha rehetra ho
snake_caseho an'ny fanamorana.Eny, ny haingo logger.catch ho an'ny vokatra traceback tsara tarehy sy ahalalana.
PS Aza adino ny manampy ny marika alphavantage eo an-toerana amin'ny config.yml, na manondrana ny fari-piainan'ny tontolo iainana HORTON_SERVICE_APIKEY. Mahazo marika izahay .
kilasy CRUD
Hanana fanangonam-bola hitehirizana fampahalalana meta momba ny antoka isika.
Raha ny hevitro dia tsy ilaina ny manazava na inona na inona eto, ary ny kilasy fototra dia tena tsotra.
get_app()
Andeha isika hanampy asa hamoronana zavatra fampiharana ao
mpandringana
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
Amin'izao fotoana izao dia hanana ny famoronana fampiharana tsotra indrindra isika, aoriana kely dia hanitatra izany izahay, na izany aza, mba tsy hiandry anao, eto mankany amin'ny App-class. Manoro hevitra ihany koa aho hijery ny kilasin'ny fandrindrana, satria izy no tompon'andraikitra amin'ny ankamaroan'ny toe-javatra.
Ny ampahany lehibe indrindra
Agent amin'ny fanangonana sy fikojakojana ny lisitry ny antoka
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
passNoho izany, mahazo ny faust application object aloha isika - tena tsotra. Manaraka izany, manambara mazava lohahevitra iray ho an'ny masoivoho izahay... Eto dia ilaina ny milaza hoe inona izany, inona ny mari-pamantarana anatiny ary ahoana no ahafahana mandamina izany amin'ny fomba hafa.
Ny lohahevitra ao amin'ny kafka, raha tiantsika ny hahafantatra ny famaritana marina, dia tsara kokoa ny mamaky , na azonao vakiana ao amin'ny HabrΓ© amin'ny teny Rosiana, izay hita taratra tsara ihany koa ny zava-drehetra :)
, voafaritra tsara ao amin'ny faust doc, dia mamela antsika hanitsy ny lohahevitra mivantana ao amin'ny code, mazava ho azy, midika izany fa ny mari-pamantarana nomen'ny mpamorona faust, ohatra: fitazonana, politika fitazonana (famafana default, fa azonao atao ny mametraka ), isan'ny fisarahana isaky ny lohahevitra (atao, ohatra, latsaky ny applications faust).
Amin'ny ankapobeny, ny mpandraharaha dia afaka mamorona lohahevitra voatanisa miaraka amin'ny soatoavina manerantany, na izany aza, tiako ny manambara mazava ny zava-drehetra. Fanampin'izay, tsy azo amboarina ny masontsivana sasany (ohatra, ny isan'ny fizarazarana na ny politikan'ny fihazonana) amin'ny lohahevitra ao amin'ny dokam-barotra.
Toy izao ny mety ho endrik'ilay izy raha tsy mamaritra amin'ny tanana ny lohahevitra:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
passEny ary, andeha hofaritanay izay hataon'ny agent-nay :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Noho izany, amin'ny fiandohan'ny mpandraharaha, dia manokatra fivoriana aiohttp izahay ho an'ny fangatahana amin'ny alΓ lan'ny mpanjifanay. Noho izany, rehefa manomboka mpiasa, rehefa atomboka ny mpiasanay, dia hisokatra avy hatrany ny fivoriana iray - iray, mandritra ny fotoana rehetra iasan'ny mpiasa (na maromaro, raha manova ny parameter ianao avy amin'ny masoivoho manana unit default).
Manaraka, manaraka ny stream izahay (apetrakay ny hafatra ao _, satria izahay, ato amin'ity mpandraharaha ity, dia tsy miraharaha ny votoatin'ny) hafatra avy amin'ny lohahevitray, raha misy izy ireo amin'izao fotoana izao, raha tsy izany dia hiandry ny fahatongavany ny tsingerinay. Eny ary, ao anatin'ny tadivavaranay, dia misoratra anarana ny fandraisana ny hafatra, mahazo lisitry ny mavitrika (get_securities dia miverina mavitrika amin'ny alΓ lan'ny default, jereo ny kaody mpanjifa) ary tehirizo ao amin'ny tahiry, manamarina raha misy fiarovana mitovy amin'ny ticker ary mifanakalo amin'ny angon-drakitra , raha misy, dia havaozina fotsiny ilay izy (ny taratasy).
Andao hanomboka ny famoronana!
> docker-compose up -d
... ΠΠ°ΠΏΡΡΠΊ ΠΊΠΎΠ½ΡΠ΅ΠΉΠ½Π΅ΡΠΎΠ² ...
> faust -A horton.agents worker --without-web -l infoPS Features Tsy hodinihintsika ao amin'ny lahatsoratra ny faus, ka nametraka ny saina mety izahay.
Tao amin'ny baikon'ny fandefasanay dia nilaza tamin'i faust izahay hoe aiza no hitadiavana ilay zavatra fampiharana sy ny tokony hatao amin'izany (manomboha mpiasa) miaraka amin'ny haavon'ny famoahana info log. Mahazo ity vokatra manaraka ity izahay:
mpandringana
βΖaΒ΅Sβ v1.10.4β¬ββββββββββββββββββββββββββββββββββββββββββββββββββββ
β id β horton β
β transport β [URL('kafka://localhost:9092')] β
β store β memory: β
β log β -stderr- (info) β
β pid β 1271262 β
β hostname β host-name β
β platform β CPython 3.8.2 (Linux x86_64) β
β drivers β β
β transport β aiokafka=1.1.6 β
β web β aiohttp=3.6.2 β
β datadir β /path/to/project/horton-data β
β appdir β /path/to/project/horton-data/v1 β
βββββββββββββββ΄ββββββββββββββββββββββββββββββββββββββββββββββββββββ
... Π»ΠΎΠ³ΠΈ, Π»ΠΎΠ³ΠΈ, Π»ΠΎΠ³ΠΈ ...
βTopic Partition Setββββββββββ¬βββββββββββββ
β topic β partitions β
ββββββββββββββββββββββββββββββΌβββββββββββββ€
β collect_securities β {0-7} β
β horton-__assignor-__leader β {0} β
ββββββββββββββββββββββββββββββ΄βββββββββββββ Velona io!!!
Andeha hojerentsika ny set partition. Araka ny hitantsika, dia nisy lohahevitra noforonina miaraka amin'ny anarana izay notendrenay ao amin'ny kaody, ny isan'ny partitions default (8, nalaina tamin'ny - Parameter object application), satria tsy nanondro sanda tsirairay ho an'ny lohahevitray izahay (amin'ny alΓ lan'ny fizarazarana). Ny solontena navoaka ao amin'ny mpiasa dia nomena ny fizarana 8 rehetra, satria izy irery ihany, fa horesahina amin'ny antsipiriany bebe kokoa amin'ny ampahany momba ny clustering.
Eny ary, afaka mandeha any amin'ny varavarankely terminal hafa isika ary mandefa hafatra tsy misy dikany amin'ny lohahevitray:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}PS mampiasa @ asehontsika fa mandefa hafatra amin'ny lohahevitra iray antsoina hoe "collect_securities".
Amin'ity tranga ity, ny hafatra dia nandeha tamin'ny fisarahana 6 - azonao atao ny manamarina izany amin'ny alΓ lan'ny kafdrop on localhost:9000
Mandeha any amin'ny varavarankelin'ny terminal miaraka amin'ny mpiasantsika isika dia hahita hafatra mahafaly alefa amin'ny fampiasana loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securitiesAfaka mijery ny mongo ihany koa isika (mampiasa Robo3T na Studio3T) ary mahita fa ao anaty tahiry ny securities:
Tsy miliaridera aho, ary noho izany dia afa-po amin'ny safidy fijerena voalohany izahay.
Fahasambarana sy fifaliana - vonona ny mpandraharaha voalohany :)
Agent vonona, ho ela velona ny agent vaovao!
Eny tompoko, ny 1/3 aminβny lalana nomanina ato aminβity lahatsoratra ity ihany no vitantsika, fa aza kivy fa izao dia ho mora kokoa.
Koa ankehitriny dia mila mpandraharaha iray manangona fampahalalana meta isika ary mametraka izany ao anaty antontan-taratasy fanangonana:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...Koa satria ity mpandraharaha ity dia hikarakara fampahalalana momba ny fiarovana manokana, dia mila manondro ny mari-pamantarana (marika) amin'ity fiarovana ity amin'ny hafatra. Ho an'ity tanjona ity ao faus dia misy β kilasy izay manambara ny rafitra hafatra amin'ny lohahevitra mpandraharaha.
Amin'ity tranga ity, andao ho any ary lazao hoe inona no tokony ho endriky ny hafatra ho an'ity lohahevitra ity:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Araka ny efa noeritreretinao, faust dia mampiasa ny karazana python fanoritsoritana ny schema hafatra, ka izany no mahatonga ny kinova farany ambany tohanan'ny tranomboky. .
Andao hiverina any amin'ny mpandraharaha, apetraho ireo karazana ary ampio:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
Araka ny hitanao dia mandefa paramètre vaovao misy lamina mankany amin'ny fomba fanombohana lohahevitra - value_type. Fanampin'izany, ny zava-drehetra dia manaraka ny tetika mitovy, ka tsy hitako izay tokony hieritreretana zavatra hafa.
Eny, ny fikitihana farany dia ny manampy antso amin'ny mpanangom-baovao meta mba collect_securitites:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....Mampiasa ny tetika nambara teo aloha izahay ho an'ny hafatra. Amin'ity tranga ity dia nampiasa ny fomba .cast aho satria tsy mila miandry ny vokatra avy amin'ny mpandraharaha isika, saingy mendrika ny manamarika fa mandefasa hafatra amin'ny lohahevitra:
cast - tsy manakana satria tsy manantena vokatra. Tsy afaka mandefa ny valiny amin'ny lohahevitra hafa ho hafatra ianao.
mandefa - tsy manakana satria tsy manantena valiny. Azonao atao ny mamaritra mpandraharaha iray amin'ny lohahevitra izay halehan'ny valiny.
manontany - miandry valiny. Azonao atao ny mamaritra mpandraharaha iray amin'ny lohahevitra izay halehan'ny valiny.
Noho izany, izany rehetra izany miaraka amin'ny masoivoho anio!
Ny ekipa nofy
Ny zavatra farany nampanantenaiko hosoratako amin'ity ampahany ity dia baiko. Araka ny voalaza teo aloha, ny baiko ao amin'ny fau dia fonon-tsindry. Raha ny marina, i faust dia ampifandraisina fotsiny amin'ny baiko mahazatra antsika amin'ny seha-pifandraisana rehefa mamaritra ny lakile -A
Taorian'ny nanambaran'ireo masoivoho tao ampio asa miaraka amin'ny haingo app.commandmiantso ny fomba nandatsaka Ρ collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Noho izany, raha miantso ny lisitry ny baiko isika dia ho ao anatiny ny baiko vaovao:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Azontsika ampiasaina toy ny olon-drehetra izany, koa andao atomboka indray ny mpiasa faust ary hanomboka fanangonana tahiry feno:
> faust -A horton.agents start-collect-securitiesInona no hitranga manaraka?
Amin'ny ampahany manaraka, amin'ny fampiasana ireo mpiasa sisa tavela ho ohatra, dia hodinihintsika ny mekanika milentika amin'ny fikatsahana tafahoatra amin'ny vidin'ny fanakatonana ny varotra ho an'ny taona sy ny fandefasana cron ny mpiasa.
Izay ihany ny anio! Misaotra namaky :)
PS Eo ambanin'ny tapany farany dia nanontaniana momba ny faust sy confluent kafka aho (). Toa ny confluent dia miasa kokoa amin'ny lafiny maro, fa ny zava-misy dia ny faust dia tsy manana fanohanan'ny mpanjifa feno amin'ny confluent - izany dia avy amin'ny.
Source: www.habr.com
