Zamkatimu
-
Gawo II: Othandizira ndi Magulu
Titani pano?
Kotero, kotero, gawo lachiwiri. Monga tanenera kale, mmenemo tidzachita zotsatirazi:
-
Tiyeni tilembe makasitomala ang'onoang'ono a alphavantage pa aiohttp ndi zopempha zomaliza zomwe tikufuna.
-
Tiyeni tipange wothandizira yemwe angasonkhanitse zidziwitso zachitetezo ndi chidziwitso cha meta pa iwo.
Koma, izi ndi zomwe tidzachitire pulojekitiyo yokha, ndipo ponena za kafukufuku wofulumira, tiphunzira momwe tingalembere antchito omwe amayendetsa zochitika kuchokera ku kafka, komanso momwe angalembere malamulo (dinani wrapper), kwa ife - kwa mauthenga okankhira pamanja pamutu womwe wothandizira akuyang'anira.
Kukonzekera
AlphaVantage Client
Choyamba, tiyeni tilembe kakasitomala kakang'ono ka aiohttp pazofunsira ku alphavantage.
zoyambilira
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
Kwenikweni, zonse zimamveka bwino:
-
AlphaVantage API ndi yosavuta komanso yokongola, kotero ndidaganiza zopempha zonse kudzera munjirayi
construct_query
pomwe palinso foni ya http. -
Ndimabweretsa minda yonse
snake_case
kuti zitheke. -
Chabwino, chokongoletsera cha logger.catch cha kutulutsa kokongola komanso kodziwitsa za traceback.
PS Musaiwale kuwonjezera chizindikiro cha alphavantage kwanuko ku config.yml, kapena kutumiza zosintha zachilengedwe. HORTON_SERVICE_APIKEY
. Ife timalandira chizindikiro
Gawo la CRUD
Tidzakhala ndi zosunga zotetezedwa kuti tisunge zambiri zachitetezo cha meta.
Malingaliro anga, palibe chifukwa chofotokozera chilichonse apa, ndipo kalasi yoyambira yokha ndiyosavuta.
get_app()
Tiyeni tiwonjezere ntchito yopangira chinthu chogwiritsira ntchito
zoyambilira
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
Pakadali pano tikhala ndi zopanga zosavuta kwambiri, pakapita nthawi tidzakulitsa, komabe, kuti tisakudikireni, apa
Thupi lalikulu
Wothandizira kusonkhanitsa ndi kusunga mndandanda wa zotetezedwa
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
Chifukwa chake, choyamba timapeza chinthu chosavuta kugwiritsa ntchito - ndichosavuta. Kenaka, timalengeza momveka bwino mutu wa wothandizira wathu ... Apa ndikuyenera kutchula zomwe ziri, zomwe zili mkati mwake ndi momwe izi zingakonzedwere mosiyana.
-
Mitu mu kafka, ngati tikufuna kudziwa tanthauzo lenileni, ndi bwino kuwerenga
kuzimitsa. chikalata , kapena mukhoza kuwerengaphatikiza pa Habré mu Chirasha, pomwe zonse zimawonekeranso molondola :) -
Parameter mkati , yofotokozedwa bwino mu faust doc, imatilola kuti tikonze mutuwo mwachindunji mu code, ndithudi, izi zikutanthauza magawo omwe amaperekedwa ndi omanga amphamvu, mwachitsanzo: kusunga, kusunga ndondomeko (mwachisawawa kuchotsa, koma mukhoza kukhazikitsayaying'ono ), chiwerengero cha magawo pa mutu uliwonse (zambiri kuchita, mwachitsanzo, zochepa kuposakufunikira kwapadziko lonse ntchito mwachangu). -
Kawirikawiri, wothandizira akhoza kupanga mutu woyendetsedwa ndi makhalidwe apadziko lonse, komabe, ndimakonda kulengeza zonse momveka bwino. Kuphatikiza apo, magawo ena (mwachitsanzo, kuchuluka kwa magawo kapena mfundo zosungira) za mutu womwe uli muzotsatsa za wothandizira sungathe kukhazikitsidwa.
Izi ndi momwe zingawonekere popanda kufotokozera mutuwo pamanja:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
Chabwino, tsopano tiyeni tifotokoze zomwe wothandizira wathu adzachita :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Chifukwa chake, kumayambiriro kwa wothandizira, timatsegula gawo la aiohttp pazopempha kudzera mwa kasitomala wathu. Chifukwa chake, poyambitsa wogwira ntchito, wothandizila akayamba, gawo lidzatsegulidwa nthawi yomweyo - imodzi, nthawi yonse yomwe wogwira ntchitoyo akugwira ntchito (kapena angapo, ngati musintha magawo.
Kenako, timatsatira mtsinje (timayika uthengawo _
, popeza ife, mwa wothandizira uyu, sitisamala za zomwe zili) za mauthenga ochokera pamutu wathu, ngati zilipo pakalipano, mwinamwake kuzungulira kwathu kudzadikirira kufika kwawo. Chabwino, mkati mwa kuzungulira kwathu, timalemba chiphaso cha uthengawo, pezani mndandanda wazomwe zimagwira (get_securities returns only active by default, onani kasitomala code) zotetezedwa ndikuzisunga ku database, kuyang'ana ngati pali chitetezo chokhala ndi ticker yomweyo ndi kusinthana mu database , ngati ilipo, ndiye (pepala) lidzasinthidwa.
Tiyeni tiyambitse chilengedwe chathu!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Features
Mu lamulo lathu loyambitsa, tidauza faust komwe angayang'ane chinthucho ndi choti tichite nacho (kuyambitsa wogwira ntchito) ndi mulingo wotulutsa chidziwitso. Timapeza zotsatira zotsatirazi:
zoyambilira
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id │ horton │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ log │ -stderr- (info) │
│ pid │ 1271262 │
│ hostname │ host-name │
│ platform │ CPython 3.8.2 (Linux x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /path/to/project/horton-data │
│ appdir │ /path/to/project/horton-data/v1 │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...
┌Topic Partition Set─────────┬────────────┐
│ topic │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities │ {0-7} │
│ horton-__assignor-__leader │ {0} │
└────────────────────────────┴────────────┘
Ndi moyo!!!
Tiyeni tiwone gawo la magawo. Monga tikuonera, mutu unapangidwa ndi dzina lomwe tidasankha mu code, chiwerengero chosasinthika cha magawo (8, otengedwa kuchokera
Chabwino, tsopano titha kupita pawindo lina lakutsogolo ndikutumiza uthenga wopanda kanthu pamutu wathu:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
PS kugwiritsa ntchito @
tikuwonetsa kuti tikutumiza uthenga kumutu wotchedwa "collect_securities".
Pankhaniyi, uthenga udapita ku gawo 6 - mutha kuyang'ana izi popita ku kafdrop on localhost:9000
Kupita pawindo la terminal ndi wogwira ntchito, tiwona uthenga wosangalatsa womwe watumizidwa pogwiritsa ntchito loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Titha kuyang'ananso mongo (pogwiritsa ntchito Robo3T kapena Studio3T) ndikuwona kuti zotetezedwa zili m'nkhokwe:
Sindine bilionea, chifukwa chake ndife okhutira ndi njira yoyamba yowonera.
Chimwemwe ndi chisangalalo - wothandizira woyamba ali wokonzeka :)
Wokonzeka, khalani ndi moyo wautali wothandizira watsopano!
Inde, abambo, tangophimba 1/3 ya njira yomwe yakonzedwa ndi nkhaniyi, koma musataye mtima, chifukwa tsopano zidzakhala zosavuta.
Chifukwa chake tsopano tikufuna wothandizira yemwe amasonkhanitsa zambiri za meta ndikuziyika muzolemba zosonkhanitsira:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Popeza wothandizirayo akonza zambiri zokhudza chitetezo china, tiyenera kusonyeza chizindikiro (chizindikiro) cha chitetezo ichi mu uthengawo. Kwa cholinga ichi mu faust alipo
Pankhaniyi, tiyeni tipite
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Monga momwe mungaganizire, faust amagwiritsa ntchito mawu amtundu wa python pofotokoza schema ya uthenga, ndichifukwa chake mtundu wochepera wothandizidwa ndi laibulale ndi.
Tiyeni tibwerere kwa wothandizira, ikani mitundu ndikuwonjezera:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
Monga mukuwonera, timadutsa gawo latsopano ndi chiwembu kunjira yoyambira mutu - value_type. Kuphatikiza apo, zonse zimatsata dongosolo lomwelo, kotero sindikuwona kufunikira kokhala pa china chilichonse.
Chabwino, kukhudza komaliza ndikuwonjezera kuyimba kwa meta wosonkhanitsa zidziwitso kuti atole_securitites:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
Timagwiritsa ntchito chiwembu chomwe chidalengezedwa kale ku uthengawo. Pachifukwa ichi, ndinagwiritsa ntchito njira ya .cast popeza sitiyenera kuyembekezera zotsatira kuchokera kwa wothandizira, koma ndi bwino kutchula izi.
-
kuponya - sikuletsa chifukwa sikuyembekezera zotsatira. Simungathe kutumiza zotsatira ku mutu wina ngati uthenga.
-
kutumiza - sikuletsa chifukwa sikuyembekezera zotsatira. Mutha kufotokozera wothandizira pamutu womwe zotsatira zake zidzapita.
-
funsani - akuyembekezera zotsatira. Mutha kufotokozera wothandizira pamutu womwe zotsatira zake zidzapita.
Kotero, ndizo zonse ndi othandizira lero!
The Dream Team
Chinthu chomaliza chimene ndinalonjeza kuti ndidzalemba m’gawoli ndi malamulo. Monga tanenera kale, malamulo mu faust ndi wrapper kuzungulira. M'malo mwake, faust amangophatikizira lamulo lathu pamawonekedwe ake pofotokoza -A kiyi
Pambuyo polengeza ma agents mu
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Chifukwa chake, ngati tiitana mndandanda wamalamulo, lamulo lathu latsopano likhala momwemo:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Titha kuzigwiritsa ntchito ngati wina aliyense, ndiye tiyeni tiyambitsenso wogwira ntchitoyo ndikuyamba kusonkhanitsa zotetezedwa:
> faust -A horton.agents start-collect-securities
Nanga n’ciani cidzacitika pambuyo pake?
Mu gawo lotsatira, pogwiritsa ntchito othandizira otsala monga mwachitsanzo, tiwona njira yozama yofufuzira monyanyira pamitengo yotseka yamalonda pachaka komanso kukhazikitsidwa kwa othandizira.
Ndizo zonse lero! Zikomo powerenga :)
PS Pansi pa gawo lomaliza ndidafunsidwa za faust and confluent kafka (
Source: www.habr.com