පටුන
-
II කොටස: නියෝජිතයින් සහ කණ්ඩායම්
අපි මෙතන මොනවද කරන්නේ?
ඉතින්, ඉතින්, දෙවන කොටස. කලින් ලියා ඇති පරිදි, එහි අපි පහත සඳහන් දේ කරන්නෙමු:
-
අපට අවශ්ය අන්ත ලක්ෂ්ය සඳහා ඉල්ලීම් සමඟ aiohttp හි alphavantage සඳහා කුඩා සේවාලාභියෙකු ලියමු.
-
සුරැකුම්පත් පිළිබඳ දත්ත සහ ඒවා පිළිබඳ මෙටා තොරතුරු රැස් කරන නියෝජිතයෙකු නිර්මාණය කරමු.
නමුත්, අපි ව්යාපෘතිය සඳහාම කරන්නේ මෙයයි, සහ වේගවත් පර්යේෂණ අනුව, කෆ්කා වෙතින් ප්රවාහ සිදුවීම් සකසන නියෝජිතයන් ලියන්නේ කෙසේදැයි අපි ඉගෙන ගනිමු, මෙන්ම විධාන ලියන ආකාරය (එතන්න ක්ලික් කරන්න), අපගේ නඩුවේදී - නියෝජිතයා නිරීක්ෂණය කරන මාතෘකාවට අතින් තල්ලු පණිවිඩ සඳහා.
සකස් කිරීම
AlphaVantage සේවාලාභියා
පළමුව, අක්ෂරාංක සඳහා ඉල්ලීම් සඳහා කුඩා aiohttp සේවාදායකයක් ලියන්න.
ස්පොයිලර්
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
ඇත්ත වශයෙන්ම, එයින් සියල්ල පැහැදිලිය:
-
AlphaVantage API ඉතා සරලව හා අලංකාර ලෙස නිර්මාණය කර ඇත, එබැවින් මම ක්රමය හරහා සියලුම ඉල්ලීම් කිරීමට තීරණය කළෙමි
construct_query
එහිදී http ඇමතුමක් ඇත. -
මම සියලුම ක්ෂේත්ර ගෙන එනවා
snake_case
සුවපහසුව සඳහා. -
හොඳයි, අලංකාර සහ තොරතුරු සොයාගැනීමේ ප්රතිදානය සඳහා logger.catch සැරසිලි.
PS config.yml වෙත දේශීයව අක්ෂර ටෝකනය එක් කිරීමට හෝ පරිසර විචල්යය අපනයනය කිරීමට අමතක නොකරන්න HORTON_SERVICE_APIKEY
. අපට ටෝකනයක් ලැබේ
CRUD පන්තිය
සුරැකුම්පත් පිළිබඳ මෙටා තොරතුරු ගබඩා කිරීම සඳහා සුරැකුම්පත් එකතුවක් අප සතුව ඇත.
මගේ මතය අනුව, මෙහි කිසිවක් පැහැදිලි කිරීමට අවශ්ය නැත, සහ මූලික පන්තියම තරමක් සරල ය.
get_app()
යෙදුම් වස්තුවක් සෑදීම සඳහා ශ්රිතයක් එක් කරමු
ස්පොයිලර්
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
දැනට අපට සරලම යෙදුම් නිර්මාණයක් ඇත, ටික වේලාවකට පසුව අපි එය පුළුල් කරන්නෙමු, කෙසේ වෙතත්, ඔබව බලා නොසිටීම සඳහා, මෙහි
ප්රධාන ශරීරය
සුරැකුම්පත් ලැයිස්තුවක් එකතු කිරීම සහ නඩත්තු කිරීම සඳහා නියෝජිතයා
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
එබැවින්, පළමුව අපි වේගවත් යෙදුම් වස්තුව ලබා ගනිමු - එය තරමක් සරල ය. ඊළඟට, අපි අපගේ නියෝජිතයා සඳහා මාතෘකාවක් පැහැදිලිව ප්රකාශ කරමු ... මෙන්න එය කුමක්ද, අභ්යන්තර පරාමිතිය කුමක්ද සහ මෙය වෙනස් ලෙස සකස් කළ හැකි ආකාරය සඳහන් කිරීම වටී.
-
කෆ්කා හි මාතෘකා, අපට නිශ්චිත අර්ථ දැක්වීම දැන ගැනීමට අවශ්ය නම්, එය කියවීමට වඩා හොඳය
අක්රිය. ලේඛනය , හෝ ඔබට කියවිය හැකසංග්රහය රුසියානු භාෂාවෙන් හබ්රේ මත, සෑම දෙයක්ම ඉතා නිවැරදිව පිළිබිඹු වේ :) -
අභ්යන්තර පරාමිතිය , ෆාස්ට් ලේඛනයේ හොඳින් විස්තර කර ඇති අතර, මාතෘකාව කෙලින්ම කේතය තුළ වින්යාස කිරීමට අපට ඉඩ සලසයි, ඇත්ත වශයෙන්ම, මෙයින් අදහස් කරන්නේ ෆාස්ට් සංවර්ධකයින් විසින් සපයන ලද පරාමිතීන්, උදාහරණයක් ලෙස: රඳවා තබා ගැනීම, රඳවා තබා ගැනීමේ ප්රතිපත්තිය (පෙරනිමියෙන් මකා දැමීම, නමුත් ඔබට සැකසිය හැකිය.සංගත ), මාතෘකාවකට කොටස් ගණන (කොටස් කිරීමට, උදාහරණයක් ලෙස, වඩා අඩුගෝලීය වැදගත්කම යෙදුම් වේගවත්). -
පොදුවේ ගත් කල, නියෝජිතයාට ගෝලීය අගයන් සමඟ කළමනාකරණය කළ හැකි මාතෘකාවක් නිර්මාණය කළ හැකිය, කෙසේ වෙතත්, මම සියල්ල පැහැදිලිව ප්රකාශ කිරීමට කැමතියි. ඊට අමතරව, නියෝජිත වෙළඳ දැන්වීමේ මාතෘකාවේ සමහර පරාමිති (උදාහරණයක් ලෙස, කොටස් ගණන හෝ රඳවා ගැනීමේ ප්රතිපත්තිය) වින්යාස කළ නොහැක.
මාතෘකාව අතින් නිර්වචනය නොකර එය කෙබඳු විය හැකිද යන්න මෙන්න:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
හොඳයි, දැන් අපි අපේ නියෝජිතයා කරන්නේ කුමක්දැයි විස්තර කරමු :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
එබැවින්, නියෝජිතයාගේ ආරම්භයේදී, අපි අපගේ සේවාදායකයා හරහා ඉල්ලීම් සඳහා aiohttp සැසියක් විවෘත කරමු. මේ අනුව, සේවකයෙකු ආරම්භ කරන විට, අපගේ නියෝජිතයා දියත් කළ විට, සැසියක් වහාම විවෘත වේ - එකක්, සේවකයා ක්රියාත්මක වන මුළු කාලය සඳහා (හෝ කිහිපයක්, ඔබ පරාමිතිය වෙනස් කරන්නේ නම්
ඊළඟට, අපි ප්රවාහය අනුගමනය කරමු (අපි පණිවිඩය තබමු _
, අපි, මෙම නියෝජිතයා තුළ, අපගේ මාතෘකාවෙන් පණිවිඩවල අන්තර්ගතය ගැන සැලකිල්ලක් නොදක්වන බැවින්, ඒවා වත්මන් ඕෆ්සෙට්හි පවතී නම්, එසේ නොමැති නම් අපගේ චක්රය ඔවුන්ගේ පැමිණීම බලා සිටිනු ඇත. හොඳයි, අපගේ ලූපය තුළ, අපි පණිවිඩයේ රිසිට්පත ලොග් කර, ක්රියාකාරී (get_securities ආපසු ලබා දෙන්නේ පෙරනිමියෙන් සක්රීයයි, සේවාදායක කේතය බලන්න) සුරැකුම්පත් ලැයිස්තුවක් ලබාගෙන එය දත්ත සමුදායට සුරකින්න, එම ටිකර් එක සමඟ ආරක්ෂාවක් තිබේදැයි පරීක්ෂා කර බලන්න. දත්ත සමුදායේ හුවමාරු , තිබේ නම්, එය (කඩදාසි) සරලව යාවත්කාලීන කරනු ලැබේ.
අපි අපේ නිර්මාණය දියත් කරමු!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS විශේෂාංග
අපගේ දියත් කිරීමේ විධානය තුළ, අපි තොරතුරු ලොග් ප්රතිදාන මට්ටම සමඟ යෙදුම් වස්තුව සොයන්නේ කොතැනද සහ එය සමඟ කළ යුතු දේ (කම්කරුවෙකු දියත් කරන්න) ෆාස්ට්ට පැවසුවෙමු. අපට පහත ප්රතිදානය ලැබේ:
ස්පොයිලර්
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id │ horton │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ log │ -stderr- (info) │
│ pid │ 1271262 │
│ hostname │ host-name │
│ platform │ CPython 3.8.2 (Linux x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /path/to/project/horton-data │
│ appdir │ /path/to/project/horton-data/v1 │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...
┌Topic Partition Set─────────┬────────────┐
│ topic │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities │ {0-7} │
│ horton-__assignor-__leader │ {0} │
└────────────────────────────┴────────────┘
ඒකට පන තියනවා!!!
අපි බලමු පාටිෂන් සෙට් එක. අපට පෙනෙන පරිදි, අපි කේතයේ නම් කළ නම සමඟ මාතෘකාවක් නිර්මාණය කර ඇත, පෙරනිමි කොටස් ගණන (8, උපුටා ගන්නා ලදී
හොඳයි, දැන් අපට වෙනත් පර්යන්ත කවුළුවකට ගොස් අපගේ මාතෘකාවට හිස් පණිවිඩයක් යැවිය හැක:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
PS භාවිතා කරයි @
අපි "collect_securities" නම් මාතෘකාවකට පණිවිඩයක් යවන බව පෙන්වමු.
මෙම අවස්ථාවේදී, පණිවිඩය 6 කොටස වෙත ගියේය - ඔබට kafdrop on වෙත යාමෙන් මෙය පරීක්ෂා කළ හැකිය localhost:9000
අපගේ සේවකයා සමඟ ටර්මිනල් කවුළුව වෙත ගිය විට, ලෝගුරු භාවිතයෙන් එවන ලද සතුටු පණිවිඩයක් අපට පෙනෙනු ඇත:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
අපට මොංගෝ (Robo3T හෝ Studio3T භාවිතයෙන්) ගැන සොයා බැලිය හැකි අතර සුරැකුම්පත් දත්ත ගබඩාවේ ඇති බව බලන්න:
මම ප්රකෝටිපතියෙක් නොවේ, එබැවින් අපි පළමු නැරඹීමේ විකල්පය සමඟ සෑහීමට පත් වෙමු.
සතුට සහ ප්රීතිය - පළමු නියෝජිතයා සූදානම් :)
නියෝජිතයා සූදානම්, නව නියෝජිතයාට දීර්ඝායුෂ ලැබේවා!
ඔව් මහත්වරුනි, අපි මෙම ලිපියෙන් සකස් කර ඇති මාර්ගයෙන් 1/3 ක් පමණක් ආවරණය කර ඇත, නමුත් අධෛර්යමත් නොවන්න, මන්ද දැන් එය පහසු වනු ඇත.
එබැවින් දැන් අපට මෙටා තොරතුරු රැස් කර එය එකතු කිරීමේ ලේඛනයකට ඇතුළත් කරන නියෝජිතයෙකු අවශ්ය වේ:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
මෙම නියෝජිතයා නිශ්චිත ආරක්ෂාවක් පිළිබඳ තොරතුරු සකසන බැවින්, අපි පණිවිඩයේ මෙම ආරක්ෂාවේ ටිකර් (සංකේතය) සඳහන් කළ යුතුය. මේ සඳහා ෆාස්ට් ඇත
මෙම අවස්ථාවේදී, අපි යමු
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
ඔබ අනුමාන කර ඇති පරිදි, ෆවුස්ට් පණිවිඩ ක්රමය විස්තර කිරීමට python ආකාරයේ විවරණයක් භාවිතා කරයි, එම නිසා පුස්තකාලය මඟින් සහාය දක්වන අවම අනුවාදය වේ.
අපි නියෝජිතයා වෙත ආපසු යමු, වර්ග සකසා එය එකතු කරමු:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
ඔබට පෙනෙන පරිදි, අපි මාතෘකා ආරම්භක ක්රමයට යෝජනා ක්රමයක් සහිත නව පරාමිතියක් ලබා දෙන්නෙමු - value_type. තවද, සෑම දෙයක්ම එකම යෝජනා ක්රමය අනුගමනය කරයි, එබැවින් වෙනත් කිසිවක් මත රැඳී සිටීමේ තේරුමක් මට නොපෙනේ.
හොඳයි, අවසාන ස්පර්ශය වන්නේ Meta තොරතුරු එකතු කිරීමේ නියෝජිතයාට එකතු කිරීමේ_ආරක්ෂක වෙත ඇමතුමක් එක් කිරීමයි:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
අපි පණිවිඩය සඳහා කලින් ප්රකාශයට පත් කළ යෝජනා ක්රමය භාවිතා කරමු. මෙම අවස්ථාවේ දී, නියෝජිතයාගෙන් ප්රති result ලය එනතෙක් බලා සිටීමට අවශ්ය නොවන බැවින් මම .cast ක්රමය භාවිතා කළෙමි, නමුත් එය සඳහන් කිරීම වටී.
-
cast - එය ප්රතිඵලයක් අපේක්ෂා නොකරන නිසා අවහිර නොකරයි. ඔබට ප්රතිඵලය පණිවිඩයක් ලෙස වෙනත් මාතෘකාවකට යැවිය නොහැක.
-
send - ප්රතිඵලයක් බලාපොරොත්තු නොවන නිසා අවහිර නොකරයි. ප්රති result ලය යන මාතෘකාව තුළ ඔබට නියෝජිතයෙකු සඳහන් කළ හැකිය.
-
අසන්න - ප්රතිඵලයක් එනතෙක් බලා සිටී. ප්රති result ලය යන මාතෘකාව තුළ ඔබට නියෝජිතයෙකු සඳහන් කළ හැකිය.
ඉතින්, අදට නියෝජිතයන් සමඟ එපමණයි!
සිහින කණ්ඩායම
මම මේ කොටසේ ලියන්න පොරොන්දු උන අන්තිම දේ තමයි commands. කලින් සඳහන් කළ පරිදි, Faust හි විධාන යනු ක්ලික් කිරීම වටා එතීමකි. ඇත්ත වශයෙන්ම, Faust සරලව -A යතුර නියම කිරීමේදී අපගේ අභිරුචි විධානය එහි අතුරු මුහුණතට අමුණයි
ප්රකාශිත නියෝජිතයින් පසු
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
මේ අනුව, අපි විධාන ලැයිස්තුව ඇමතුවහොත්, අපගේ නව විධානය එහි ඇත:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
අපට වෙනත් ඕනෑම අයෙකු මෙන් එය භාවිතා කළ හැකිය, එබැවින් අපි වේගවත් සේවකයා නැවත ආරම්භ කර සම්පූර්ණ සුරැකුම්පත් එකතුවක් ආරම්භ කරමු:
> faust -A horton.agents start-collect-securities
ඊළඟට කුමක් සිදුවේද?
ඊළඟ කොටසේදී, ඉතිරි නියෝජිතයින් උදාහරණයක් ලෙස භාවිතා කරමින්, වසර සඳහා වෙළඳාමේ අවසාන මිල ගණන් සහ නියෝජිතයින්ගේ ක්රෝන් දියත් කිරීම්වල අන්ත සෙවීම සඳහා සින්ක් යාන්ත්රණය අපි සලකා බලමු.
අදට එච්චරයි! කියවීමට ස්තූතියි :)
PS පහුගිය කොටස යටතේ මගෙන් ඇහුවා Faust and confluent kafka (
මූලාශ්රය: www.habr.com