فهرست
-
دویمه برخه: استازي او ټیمونه
موږ دلته څه کوو؟
نو، نو، دویمه برخه. لکه څنګه چې مخکې لیکل شوي، پدې کې به موږ لاندې کار وکړو:
-
راځئ چې په aiohttp کې د الفاوانټیج لپاره یو کوچنی پیرودونکی ولیکئ د پای ټکي لپاره غوښتنې سره چې موږ ورته اړتیا لرو.
-
راځئ چې یو اجنټ جوړ کړو چې د امنیت او میټا معلوماتو په اړه معلومات راټول کړي.
مګر، دا هغه څه دي چې موږ به پخپله د پروژې لپاره ترسره کړو، او د چټکې څیړنې په شرایطو کې، موږ به د اجنټانو لیکلو څرنګوالی زده کړو چې د کافکا څخه پیښې پروسس کوي، او همدارنګه د امرونو لیکلو څرنګوالی (کلک ریپر)، زموږ په قضیه کې - د لاسي پیغامونو لپاره موضوع ته چې اجنټ یې څارنه کوي.
د چمتو کولو لپاره
د AlphaVantage مراجع
لومړی، راځئ چې د الفاوانټیج غوښتنې لپاره یو کوچنی aiohttp مراجع ولیکئ.
ټوټه ټوټه
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
په حقیقت کې، هرڅه له دې څخه روښانه دي:
-
د AlphaVantage API خورا ساده او ښکلی ډیزاین شوی، نو ما پریکړه وکړه چې ټولې غوښتنې د میتود له لارې ترسره کړم
construct_query
چیرې چې په بدل کې د http کال شتون لري. -
زه ټولې ساحې ته راوړم
snake_case
د آرامۍ لپاره. -
ښه، د ښکلي او معلوماتي ټریس بیک محصول لپاره logger.catch سجاوٹ.
PS مه هیروئ چې په ځایی ډول config.yml ته د الفاوانټاج نښه اضافه کړئ ، یا د چاپیریال متغیر صادر کړئ HORTON_SERVICE_APIKEY
. موږ یوه نښه ترلاسه کوو
د CRUD ټولګي
موږ به د امنیتونو په اړه د میټا معلوماتو ذخیره کولو لپاره د امنیت ټولګه ولرو.
زما په اند، دلته هیڅ شی تشریح کولو ته اړتیا نشته، او د اساس ټولګی پخپله خورا ساده دی.
get_app()
راځئ چې د اپلیکیشن اعتراض رامینځته کولو لپاره یو فنکشن اضافه کړو
ټوټه ټوټه
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
د اوس لپاره به موږ ترټولو ساده غوښتنلیک جوړ کړو، یو څه وروسته به یې پراخه کړو، په هرصورت، د دې لپاره چې تاسو انتظار ونه ساتو، دلته
اصلي برخه
د تضمیناتو لیست راټولولو او ساتلو لپاره اجنټ
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
نو، لومړی موږ د فاسټ غوښتنلیک اعتراض ترلاسه کوو - دا خورا ساده دی. بیا، موږ په ښکاره ډول زموږ د اجنټ لپاره یوه موضوع اعلانوو ... دلته دا د یادولو وړ ده چې دا څه دي، داخلي پیرامیټر څه شی دی او دا څنګه په مختلف ډول تنظیم کیدی شي.
-
په کافکا کې موضوعات، که موږ غواړو دقیق تعریف پوه شو، دا غوره ده چې ولولئ
بند سند ، یا تاسو لوستلی شئټولګه په روسی کې په هابری کې، چیرې چې هرڅه هم په سمه توګه منعکس کیږي :) -
داخلي پارامتر ، په فاسټ ډاک کې خورا ښه بیان شوی ، موږ ته اجازه راکوي چې موضوع مستقیم په کوډ کې تنظیم کړو ، البته ، دا پدې معنی ده چې د فاسټ پراختیا کونکو لخوا چمتو شوي پیرامیټونه ، د مثال په توګه: ساتل ، د ساتلو پالیسي (د ډیفالټ حذف کول ، مګر تاسو کولی شئ تنظیم کړئتړون د هرې موضوع د ویشونو شمیر (نمرې د مثال په توګه، لږ څه کولنړیوال اهمیت غوښتنلیکونه فاسټ). -
په عموم کې، اجنټ کولی شي د نړیوالو ارزښتونو سره اداره شوې موضوع رامینځته کړي، په هرصورت، زه غواړم هر څه په ښکاره توګه اعلان کړم. برسېره پردې، د اجنټ په اعلان کې د موضوع ځینې پیرامیټونه (د بیلګې په توګه، د برخو شمیر یا د ساتلو پالیسي) نشي تنظیم کیدی.
دلته هغه څه دي چې کیدای شي په لاسي ډول د موضوع تعریف کولو پرته ښکاري:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
ښه، اوس راځئ چې تشریح کړو چې زموږ استازی به څه وکړي :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
نو ، د اجنټ په پیل کې ، موږ زموږ د پیرودونکي له لارې غوښتنو لپاره aiohttp سیشن خلاصوو. پدې توګه ، کله چې د کارګر پیل کول ، کله چې زموږ اجنټ پیل شي ، سمدلاسه به یوه ناسته پرانستل شي - یو ، د ټول وخت لپاره چې کارګر روان وي (یا څو ، که تاسو پیرامیټر بدل کړئ
بیا، موږ جریان تعقیب کوو (موږ پیغام په کې ځای په ځای کوو _
، ځکه چې موږ ، پدې اجنټ کې ، زموږ د موضوع څخه د پیغامونو مینځپانګې ته پاملرنه نه کوو ، که دوی په اوسني آفسیټ کې شتون ولري ، که نه نو زموږ دوره به د دوی رارسیدو ته انتظار باسي. ښه، زموږ په لوپ کې دننه، موږ د پیغام رسید لاګ کوو، د فعال لیست ترلاسه کوو (get_securities یوازې د ډیفالټ له مخې فعاله راګرځي، د پیرودونکي کوډ وګورئ) امنیتونه او ډیټابیس ته یې خوندي کړئ، وګورئ چې د ورته ټیکر سره امنیت شتون لري او که نه. په ډیټابیس کې تبادله، که شتون ولري، نو دا (کاغذ) به په ساده ډول تازه شي.
راځئ چې زموږ جوړونه پیل کړو!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
د PS ځانګړتیاوې
زموږ د لانچ کمانډ کې ، موږ فاسټ ته وویل چې چیرې د غوښتنلیک څیز په لټه کې شئ او د دې سره څه وکړئ (کارګر پیل کړئ) د معلوماتو لاګ محصول کچې سره. موږ لاندې محصول ترلاسه کوو:
ټوټه ټوټه
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id │ horton │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ log │ -stderr- (info) │
│ pid │ 1271262 │
│ hostname │ host-name │
│ platform │ CPython 3.8.2 (Linux x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /path/to/project/horton-data │
│ appdir │ /path/to/project/horton-data/v1 │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...
┌Topic Partition Set─────────┬────────────┐
│ topic │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities │ {0-7} │
│ horton-__assignor-__leader │ {0} │
└────────────────────────────┴────────────┘
ژوندی دی!!!
راځئ چې د برخې برخې ته وګورو. لکه څنګه چې موږ لیدلی شو، یوه موضوع د هغه نوم سره جوړه شوې وه چې موږ یې په کوډ کې ډیزاین کړې، د ویشونو اصلي شمیره (8، له دې څخه اخیستل شوي.
ښه، اوس موږ کولی شو بلې ټرمینل کړکۍ ته لاړ شو او زموږ موضوع ته یو خالي پیغام ولیږو:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
PS کارول @
موږ وښیو چې موږ د "collect_securities" په نوم یوې موضوع ته پیغام لیږو.
پدې حالت کې ، پیغام 6 برخې ته لاړ - تاسو کولی شئ دا د کافډراپ آن ته لاړشئ localhost:9000
زموږ د کارګر سره د ترمینل کړکۍ ته ځي، موږ به د خوښۍ پیغام وګورو چې د لوګورو په کارولو سره لیږل شوی:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
موږ کولی شو مونګو ته هم وګورو (د Robo3T یا Studio3T په کارولو سره) او وګورو چې امنیت په ډیټابیس کې دي:
زه ملیاردر نه یم، او له همدې امله موږ د لومړي لید اختیار څخه راضي یو.
خوښۍ او خوښۍ - لومړی اجنټ چمتو دی :)
اجنټ چمتو دی ، ژوندی دې وي نوی اجنټ!
هو ، ښاغلو ، موږ یوازې د دې مقالې لخوا چمتو شوې لاره 1/3 پوښلې ده ، مګر مایوسه مه کوئ ، ځکه چې اوس به اسانه وي.
نو اوس موږ یو اجنټ ته اړتیا لرو چې د میټا معلومات راټول کړي او د راټولولو سند کې یې واچوي:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
څرنګه چې دا اجنټ به د یو ځانګړي امنیت په اړه معلومات پروسس کړي، موږ باید په پیغام کې د دې امنیت ټیکر (سمبول) په ګوته کړو. د دې هدف لپاره په فاسټ کې شتون لري
په دې حالت کې، راځئ چې لاړ شو
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
لکه څنګه چې تاسو اټکل کړی وي، فاسټ د پیغام سکیما تشریح کولو لپاره د python ډول تشریح کاروي، له همدې امله د کتابتون لخوا ملاتړ شوی لږترلږه نسخه ده
راځئ چې اجنټ ته راستون شو، ډولونه یې تنظیم کړئ او اضافه کړئ:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
لکه څنګه چې تاسو لیدلی شئ، موږ د سکیم سره د موضوع پیل کولو میتود ته یو نوی پیرامیټر انتقالوو - value_type. سربیره پردې ، هرڅه ورته سکیم تعقیبوي ، نو زه په بل څه کې د اوسیدو هیڅ ټکی نه ګورم.
ښه، وروستی ټچ د میټا معلوماتو راټولولو اجنټ ته د کلیک_سیکوریټیټس لپاره زنګ اضافه کول دي:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
موږ د پیغام لپاره دمخه اعلان شوی سکیم کاروو. پدې حالت کې ، ما د .cast میتود کارولی ځکه چې موږ اړتیا نلرو د اجنټ پایلې ته انتظار وباسو ، مګر دا د یادونې وړ ده
-
کاسټ - بلاک نه کوي ځکه چې دا د پایلې تمه نلري. تاسو نشئ کولی پایلې بلې موضوع ته د پیغام په توګه واستوئ.
-
لیږل - بلاک نه کوي ځکه چې دا د پایلې تمه نلري. تاسو کولی شئ په موضوع کې یو استازی مشخص کړئ چې پایله به یې لاړ شي.
-
پوښتنه وکړئ - پایلې ته انتظار وکړئ. تاسو کولی شئ په موضوع کې یو استازی مشخص کړئ چې پایله به یې لاړ شي.
نو، دا ټول د نن ورځې لپاره د اجنټانو سره دي!
د خوب ټیم
وروستی شی چې ما په دې برخه کې د لیکلو ژمنه کړې وه حکمونه دي. لکه څنګه چې مخکې یادونه وشوه، په فاسټ کې کمانډونه د کلک شاوخوا یو ریپر دی. په حقیقت کې، فاسټ په ساده ډول زموږ دودیز کمانډ خپل انٹرفیس ته ضمیمه کوي کله چې د -A کیلي مشخص کوي
وروسته اعلان شوي اجنټان په کې
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
په دې توګه، که موږ د قوماندې لیست ووایو، زموږ نوې کمانډ به پدې کې وي:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
موږ کولی شو دا د بل چا په څیر وکاروو، نو راځئ چې فاسټ کارګر بیا پیل کړو او د تضمیناتو بشپړ ټولګه پیل کړو:
> faust -A horton.agents start-collect-securities
بیا به څه کیږي؟
په راتلونکې برخه کې، د مثال په توګه د پاتې اجنټانو په کارولو سره، موږ به د کال لپاره د سوداګرۍ د تړلو نرخونو او د اجنټانو کرون لانچ کې د افراطیت لټون کولو لپاره د سنک میکانیزم په پام کې ونیسو.
دا ټول د نن ورځې لپاره دي! د لوستلو لپاره مننه :)
PS په وروستۍ برخه کې له ما څخه د فاسټ او متضاد کافکا په اړه پوښتنه وشوه (
سرچینه: www.habr.com