Агуулга
-
II хэсэг: Агентууд ба багууд
Бид энд юу хийж байгаа юм бэ?
Тиймээс, хоёр дахь хэсэг. Өмнө нь бичсэнчлэн, үүнд бид дараахь зүйлийг хийх болно.
-
Бидэнд хэрэгтэй төгсгөлийн цэгүүдийн хүсэлтийг aiohttp дээр alphavantage-д зориулсан жижиг клиент бичье.
-
Үнэт цаасны талаарх мэдээлэл, тэдгээрийн мета мэдээлэл цуглуулах агент байгуулъя.
Гэхдээ энэ бол төслийн хувьд бид хийх зүйл бөгөөд фауст судалгааны хувьд бид кафкагаас урсгалын үйл явдлуудыг боловсруулдаг агентуудыг хэрхэн бичих, түүнчлэн командуудыг хэрхэн бичих талаар сурах болно (боодол дээр дарна уу), манай тохиолдолд - агентын хянаж буй сэдэв рүү гар аргаар түлхэх мессежийн хувьд.
Сургалт
AlphaVantage үйлчлүүлэгч
Эхлээд alphavantage-ийн хүсэлтэд зориулж жижиг aiohttp клиент бичье.
Хорлон сүйтгэгчид
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
Үнэндээ бүх зүйл үүнээс тодорхой байна:
-
AlphaVantage API нь маш энгийн бөгөөд гоёмсог дизайнтай тул би бүх хүсэлтийг энэ аргаар хийхээр шийдсэн
construct_query
хаана нь эргээд http дуудлага байна. -
Би бүх талбайг авчирдаг
snake_case
тав тухтай байдлын төлөө. -
За, сайхан, мэдээлэл сайтай traceback гаралт нь logger.catch чимэглэл.
Жич: config.yml-д альфавантын токеныг дотооддоо нэмэх эсвэл орчны хувьсагчийг экспортлохоо бүү мартаарай. HORTON_SERVICE_APIKEY
. Бид тэмдэг хүлээн авдаг
CRUD анги
Бид үнэт цаасны талаарх мета мэдээллийг хадгалах үнэт цаасны цуглуулгатай болно.
Миний бодлоор энд юу ч тайлбарлах шаардлагагүй бөгөөд үндсэн анги нь өөрөө маш энгийн.
авах_app()
Програмын объект үүсгэх функцийг нэмье
Хорлон сүйтгэгчид
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
Одоохондоо бид хамгийн энгийн програм үүсгэх болно, дараа нь бид үүнийг өргөжүүлэх болно, гэхдээ таныг хүлээхгүйн тулд энд
Үндсэн хэсэг
Үнэт цаасны жагсаалтыг цуглуулах, хөтлөх агент
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
Тиймээс эхлээд бид faust програмын объектыг олж авдаг - энэ нь маш энгийн. Дараа нь бид төлөөлөгчдөө зориулсан сэдвийг тодорхой зарлаж байна ... Энд энэ нь юу болох, дотоод параметр нь юу болох, үүнийг хэрхэн өөрөөр зохион байгуулж болохыг дурдах нь зүйтэй юм.
-
Кафка дахь сэдвүүд, хэрэв бид яг тодорхой тодорхойлолтыг мэдэхийг хүсвэл уншсан нь дээр
унтраах. баримт бичиг , эсвэл та уншиж болноэмхэтгэл Хабре дээр орос хэл дээр, тэнд бүх зүйл маш нарийн тусгагдсан байдаг :) -
Дотоод параметр , faust doc-д маш сайн тайлбарласан нь бидэнд сэдвийг кодонд шууд тохируулах боломжийг олгодог, мэдээжийн хэрэг, энэ нь faust хөгжүүлэгчдийн өгсөн параметрүүдийг хэлнэ, жишээлбэл: хадгалах, хадгалах бодлого (анхдагчаар устгах, гэхдээ та тохируулж болно)компакт ), сэдэв бүрийн хуваалтын тоо (хуваалтууд хийх, жишээ нь, түүнээс багадэлхийн ач холбогдол програмууд faust). -
Ерөнхийдөө агент нь дэлхийн үнэ цэнэ бүхий удирддаг сэдвийг бий болгож чадна, гэхдээ би бүх зүйлийг тодорхой тунхаглах дуртай. Нэмж дурдахад, агентын сурталчилгааны сэдвийн зарим параметрүүдийг (жишээлбэл, хуваалтын тоо эсвэл хадгалах бодлого) тохируулах боломжгүй.
Сэдвийг гараар тодорхойлохгүйгээр иймэрхүү харагдах болно:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
За, одоо манай агент юу хийхийг тайлбарлая :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Тиймээс, агентын эхэнд бид үйлчлүүлэгчээрээ дамжуулан хүсэлт гаргах aiohttp сессийг нээдэг. Тиймээс, ажилтныг эхлүүлэх үед манай агент нээгдэхэд тэр даруй сесс нээгдэнэ - нэг нь тухайн ажилчин ажиллаж байгаа бүх хугацаанд (эсвэл хэд хэдэн, хэрэв та параметрийг өөрчилбөл).
Дараа нь бид урсгалыг дагадаг (бид мессежийг байрлуулна _
, учир нь бид энэ агентийн хувьд сэдэвийнхээ мессежүүдийн агуулгыг анхаарч үздэггүй, хэрэв тэдгээр нь одоогийн офсет дээр байгаа бол, эс тэгвээс бидний мөчлөг тэдний ирэлтийг хүлээх болно. За, манай гогцоонд бид мессеж хүлээн авснаа бүртгэж, идэвхтэй (get_securities нь зөвхөн өгөгдмөлөөр идэвхтэй буцдаг, үйлчлүүлэгчийн кодыг харна уу) үнэт цаасны жагсаалтыг авч, мэдээллийн санд хадгалдаг, ижил тэмдэгтэй үнэт цаас байгаа эсэхийг шалгадаг. мэдээллийн санд солилцох , хэрэв байгаа бол энэ нь (цаасан) зүгээр л шинэчлэгдэх болно.
Бүтээлээ эхлүүлцгээе!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS онцлогууд
Бидний эхлүүлэх команд дээр бид faust-д програмын объектыг хаанаас хайх, түүнтэй юу хийхийг (ажилчин ажиллуулах) мэдээллийн бүртгэлийн гаралтын түвшинд зааж өгсөн. Бид дараах гаралтыг авна.
Хорлон сүйтгэгчид
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id │ horton │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ log │ -stderr- (info) │
│ pid │ 1271262 │
│ hostname │ host-name │
│ platform │ CPython 3.8.2 (Linux x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /path/to/project/horton-data │
│ appdir │ /path/to/project/horton-data/v1 │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...
┌Topic Partition Set─────────┬────────────┐
│ topic │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities │ {0-7} │
│ horton-__assignor-__leader │ {0} │
└────────────────────────────┴────────────┘
Энэ амьд!!!
Хуваалтын багцыг харцгаая. Бидний харж байгаагаар кодонд заасан нэрээр сэдэв үүсгэгдсэн, хуваалтын үндсэн тоо (8,
За, одоо бид өөр терминалын цонх руу орж, сэдэв рүүгээ хоосон мессеж илгээх боломжтой.
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
Жич ашиглаж байна @
Бид "үнэт цаас цуглуулах" сэдэвт мессеж илгээж байгаагаа харуулж байна.
Энэ тохиолдолд мессеж 6-р хуваалт руу очсон - та үүнийг kafdrop дээр очиж шалгаж болно localhost:9000
Ажилчинтайгаа хамт терминалын цонх руу очиход бид loguru ашиглан илгээсэн баяртай мессежийг харах болно.
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Мөн бид mongo-г (Robo3T эсвэл Studio3T ашиглан) үзэж, үнэт цааснууд нь мэдээллийн санд байгааг харж болно:
Би тэрбумтан биш, тиймээс бид анхны үзэх сонголтдоо сэтгэл хангалуун байна.
Аз жаргал, баяр баясгалан - анхны агент бэлэн боллоо :)
Агент бэлэн байна, шинэ агент урт наслаарай!
Тийм ээ, ноёд оо, бид энэ өгүүллээр бэлтгэсэн замын 1/3-ийг л даван туулсан, гэхдээ бүү шантар, учир нь одоо энэ нь илүү хялбар байх болно.
Одоо бидэнд мета мэдээлэл цуглуулж цуглуулах баримт бичигт оруулах агент хэрэгтэй байна.
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Энэ агент нь тодорхой үнэт цаасны талаарх мэдээллийг боловсруулах тул бид зурваст энэ үнэт цаасны тэмдэглэгээг (тэмдэг) зааж өгөх шаардлагатай. Энэ зорилгоор Faust-д байдаг
Энэ тохиолдолд бид тийшээ явцгаая
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Таны таамаглаж байсанчлан faust нь мессежийн схемийг тайлбарлахдаа python төрлийн тэмдэглэгээг ашигладаг тул номын сангийн дэмждэг хамгийн бага хувилбар нь
Агент руу буцаж очоод төрлүүдийг тохируулаад нэмнэ үү:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
Таны харж байгаагаар бид сэдвийг эхлүүлэх аргад схем бүхий шинэ параметрийг дамжуулдаг - үнэ цэнэ_төрөл. Цаашилбал, бүх зүйл ижил схемийн дагуу явагддаг тул би өөр зүйлд анхаарлаа хандуулах нь утгагүй юм.
За, эцсийн мэдрэгчтэй зүйл бол collect_securitites-д мета мэдээлэл цуглуулах агент руу залгах юм:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
Бид мессежийн хувьд өмнө нь зарласан схемийг ашигладаг. Энэ тохиолдолд агентаас үр дүнг хүлээх шаардлагагүй тул би .cast аргыг ашигласан, гэхдээ үүнийг дурдах нь зүйтэй болов уу.
-
цутгах - үр дүнг хүлээхгүй тул блоклодоггүй. Та үр дүнг өөр сэдэв рүү мессеж хэлбэрээр илгээх боломжгүй.
-
илгээх - үр дүн хүлээхгүй тул блоклодоггүй. Та үр дүн гарах сэдвийн төлөөлөгчийг зааж өгч болно.
-
асуух - үр дүнг хүлээж байна. Та үр дүн гарах сэдвийн төлөөлөгчийг зааж өгч болно.
Өнөөдөр энэ бүгд агентуудтай холбоотой!
Мөрөөдлийн баг
Энэ хэсэгт бичнэ гэж амласан сүүлчийн зүйл бол тушаалууд юм. Өмнө дурьдсанчлан, faust дахь командууд нь товшилтыг тойрон эргэлддэг. Үнэн хэрэгтээ faust нь -A товчлуурыг зааж өгөхдөө бидний захиалгат командыг интерфейсдээ хавсаргадаг
Зарлагдсан агентуудын дараа
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Тиймээс, хэрэв бид тушаалуудын жагсаалтыг дуудвал бидний шинэ тушаал үүнд байх болно:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Бид үүнийг бусадтай адил ашиглаж болох тул faust worker-ийг дахин эхлүүлж, үнэт цаасны бүрэн цуглуулгыг эхлүүлцгээе:
> faust -A horton.agents start-collect-securities
Дараа нь юу болох вэ?
Дараагийн хэсэгт үлдэгдэл агентуудыг жишээ болгон ашиглан тухайн жилийн арилжааны хаалтын үнэ болон агентуудыг ажиллуулахад хэт туйлшралыг хайж олох механизмыг авч үзэх болно.
Энэ бол өнөөдрийнх! Уншсанд баярлалаа :)
Жич Сүүлийн хэсгийн доор надаас фауст ба нийлсэн кафкагийн тухай асуусан (
Эх сурвалж: www.habr.com