ማውጫ
-
ክፍል II: ወኪሎች እና ቡድኖች
እዚህ ምን እየሰራን ነው?
ስለዚህ, ሁለተኛው ክፍል. ቀደም ሲል እንደተፃፈው, በውስጡ የሚከተሉትን እናደርጋለን.
-
የምንፈልጋቸውን የመጨረሻ ነጥቦችን በመጠየቅ በaiohttp ላይ ለፊደል የሚሆን ትንሽ ደንበኛን እንፃፍ።
-
በእነሱ ላይ የደህንነት እና የሜታ መረጃን የሚሰበስብ ወኪል እንፍጠር።
ግን ይህ እኛ ለፕሮጀክቱ ራሱ የምናደርገው ነው ፣ እና ከተሳሳተ ምርምር አንፃር ፣ ክስተቶችን ከካፍ የሚያስተላልፉ ወኪሎችን እንዴት እንደሚጽፉ ፣ እንዲሁም ትዕዛዞችን እንዴት እንደሚጽፉ (ጠቅታ መጠቅለያ) ፣ በእኛ ሁኔታ - እንማራለን ። ወኪሉ ወደሚከታተለው ርዕስ በእጅ የሚገፋ መልእክት።
ዝግጅት
የአልፋቫንቴጅ ደንበኛ
በመጀመሪያ፣ ለፊደል ጥያቄዎች ትንሽ የaiohttp ደንበኛ እንፃፍ።
አጣሚ
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
በእውነቱ ፣ ሁሉም ነገር ከእሱ ግልፅ ነው-
-
የአልፋቫንቴጅ ኤፒአይ ቀላል እና በሚያምር ሁኔታ የተነደፈ ነው፣ ስለዚህ ሁሉንም ጥያቄዎች በዘዴ ለማቅረብ ወሰንኩ።
construct_query
በተራው ደግሞ http ጥሪ አለ. -
ሁሉንም መስኮች አመጣለሁ
snake_case
ለምቾት። -
እንግዲህ፣ logger.catch ጌጥ ለቆንጆ እና መረጃ ሰጪ የመከታተያ ውጤት።
PS ወደ config.yml የሆሄያት ቶከንን በአገር ውስጥ ማከል ወይም የአካባቢ ተለዋዋጭ ወደ ውጭ መላክን አይርሱ HORTON_SERVICE_APIKEY
. ማስመሰያ እንቀበላለን
CRUD ክፍል
ስለ ደህንነቶች ሜታ መረጃን ለማከማቸት የዋስትናዎች ስብስብ ይኖረናል።
በእኔ አስተያየት, እዚህ ምንም ነገር ማብራራት አያስፈልግም, እና የመሠረት ክፍሉ ራሱ በጣም ቀላል ነው.
አግኙ()
ውስጥ የመተግበሪያ ነገር ለመፍጠር ተግባር እንጨምር
አጣሚ
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
ለአሁኑ በጣም ቀላል የሆነውን የመተግበሪያ ፈጠራ ይኖረናል ፣ ትንሽ ቆይተን እናሰፋዋለን ፣ ግን እርስዎን እየጠበቁ እንዳንቆይ ፣ እዚህ
ዋናው ክፍል
የዋስትናዎች ዝርዝርን ለመሰብሰብ እና ለማቆየት ወኪል
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
ስለዚህ ፣ በመጀመሪያ ፈጣን መተግበሪያን እናገኛለን - በጣም ቀላል ነው። በመቀጠል, ለተወካያችን አንድ ርዕስ በግልፅ እናውጃለን ... እዚህ ምን እንደሆነ, የውስጥ መለኪያው ምን እንደሆነ እና ይህ በተለየ መንገድ እንዴት እንደሚስተካከል መጥቀስ ተገቢ ነው.
-
በካፍካ ውስጥ ያሉ ርዕሶች, ትክክለኛውን ፍቺ ማወቅ ከፈለግን ማንበብ የተሻለ ነው
ጠፍቷል ሰነድ , ወይም ማንበብ ይችላሉማጠቃለያ ሁሉም ነገር በትክክል በሚንጸባረቅበት በሩሲያኛ ሀበሬ ላይ :) -
መለኪያ ውስጣዊ , በፋስት ዶክ ውስጥ በደንብ የተገለፀው, ርዕሱን በቀጥታ በኮዱ ውስጥ እንድናዋቅር ያስችለናል, በእርግጥ ይህ ማለት በፋስት ገንቢዎች የቀረቡትን መለኪያዎች ለምሳሌ ማቆየት, ማቆየት ፖሊሲ (በነባሪ ሰርዝ, ነገር ግን ማቀናበር ይችላሉ).የተጠጋጋ በአንድ ርዕስ ክፍልፋዮች ብዛት (ክፋዮች ማድረግ, ለምሳሌ, ያነሰዓለም አቀፋዊ ጠቀሜታ መተግበሪያዎች ፈጣን)። -
በአጠቃላይ ተወካዩ በአለምአቀፍ እሴቶች የሚተዳደር ርዕስ መፍጠር ይችላል, ሆኖም ግን, ሁሉንም ነገር በግልፅ ማወጅ እፈልጋለሁ. በተጨማሪም፣ በወኪል ማስታወቂያ ውስጥ ያለው ርዕስ አንዳንድ መለኪያዎች (ለምሳሌ፣ የክፍሎች ብዛት ወይም የማቆያ ፖሊሲ) ሊዋቀሩ አይችሉም።
ርዕሱን በእጅ ሳይገልጹ ምን ሊመስል እንደሚችል እነሆ፡-
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
ደህና፣ አሁን ወኪላችን ምን እንደሚያደርግ እንገልፃለን :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
ስለዚህ፣ በወኪሉ መጀመሪያ ላይ፣ በደንበኛችን በኩል ለሚቀርቡ ጥያቄዎች የ aiohttp ክፍለ ጊዜ እንከፍተዋለን። ስለዚህ ሰራተኛ ሲጀመር ወኪላችን ሲጀመር አንድ ክፍለ ጊዜ ወዲያውኑ ይከፈታል - አንድ ፣ ሰራተኛው በሚሰራበት ጊዜ ሁሉ (ወይም ብዙ ፣ መለኪያውን ከቀየሩ)
በመቀጠል ዥረቱን እንከተላለን (መልእክቱን እናስቀምጣለን _
እኛ በዚህ ወኪል ውስጥ ስለ ይዘቱ ግድ ስለሌለው) ከርዕሰ ጉዳያችን የሚመጡ መልዕክቶች አሁን ባለው ማካካሻ ላይ ካሉ ፣ አለበለዚያ ዑደታችን እስኪደርሱ ድረስ ይጠብቃል። ደህና፣ በእኛ ሉፕ ውስጥ፣ የመልእክቱን ደረሰኝ ውስጥ እናስገባለን፣ የገባሪዎችን ዝርዝር አግኝተናል (get_securities በነባሪ ገቢር ብቻ ነው፣ የደንበኛ ኮድን ይመልከቱ) ደህንነቶች እና ወደ ዳታቤዝ እናስቀምጠዋለን፣ ተመሳሳይ ምልክት ያለው ደህንነት መኖሩን እናረጋግጣለን። በመረጃ ቋቱ ውስጥ መለዋወጥ , ካለ, ከዚያም (ወረቀቱ) በቀላሉ ይዘምናል.
ፈጠራችንን እንጀምር!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS ባህሪያት
በእኛ የማስጀመሪያ ትእዛዝ፣ የመተግበሪያውን ነገር የት መፈለግ እንዳለብን እና ምን ማድረግ እንዳለብን (ሰራተኛ ማስጀመር) በመረጃ ምዝግብ ማስታወሻ ውፅዓት ደረጃ ነግረናል። የሚከተለውን ውጤት እናገኛለን:
አጣሚ
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id │ horton │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ log │ -stderr- (info) │
│ pid │ 1271262 │
│ hostname │ host-name │
│ platform │ CPython 3.8.2 (Linux x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /path/to/project/horton-data │
│ appdir │ /path/to/project/horton-data/v1 │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...
┌Topic Partition Set─────────┬────────────┐
│ topic │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities │ {0-7} │
│ horton-__assignor-__leader │ {0} │
└────────────────────────────┴────────────┘
ህያው ነው!!!
የክፍልፋይ ስብስብን እንመልከት. እንደምናየው፣ በኮዱ ውስጥ የመረጥነውን ስም፣ ነባሪ የክፍሎች ቁጥር ያለው ርዕስ ተፈጠረ (8፣ ከ የተወሰደ
ደህና፣ አሁን ወደ ሌላ ተርሚናል መስኮት ሄደን ባዶ መልእክት ወደ ርዕሳችን መላክ እንችላለን፡-
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
ፒኤስ በመጠቀም @
"የስብስብ_ደህንነቶች" ወደተባለ ርዕስ መልእክት እየላክን መሆኑን እናሳያለን።
በዚህ አጋጣሚ መልእክቱ ወደ ክፍል 6 ሄዷል - ወደ kafdrop በመሄድ ይህንን ማረጋገጥ ይችላሉ localhost:9000
ከሰራተኞቻችን ጋር ወደ ተርሚናል መስኮት ስንሄድ ሎጉሩ በመጠቀም የተላከ አስደሳች መልእክት እናያለን፡-
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
እንዲሁም ወደ ሞንጎ (Robo3T ወይም Studio3T በመጠቀም) መመልከት እንችላለን እና ዋስትናዎቹ በመረጃ ቋቱ ውስጥ እንዳሉ ማየት እንችላለን፡
እኔ ቢሊየነር አይደለሁም, እና ስለዚህ በመጀመሪያው የመመልከቻ አማራጭ ረክተናል.
ደስታ እና ደስታ - የመጀመሪያው ወኪል ዝግጁ ነው :)
ወኪል ተዘጋጅቷል፣ አዲሱ ወኪል ይድረስ!
አዎን, ክቡራን, በዚህ ጽሑፍ የተዘጋጀውን መንገድ 1/3 ብቻ ሸፍነናል, ነገር ግን ተስፋ አትቁረጥ, ምክንያቱም አሁን ቀላል ይሆናል.
ስለዚህ አሁን ሜታ መረጃን የሚሰበስብ እና ወደ መሰብሰቢያ ሰነድ የሚያስቀምጥ ወኪል እንፈልጋለን፡-
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
ይህ ወኪል ስለ አንድ የተወሰነ ደህንነት መረጃ ስለሚያስኬድ፣ በመልእክቱ ውስጥ የዚህን ደህንነት ምልክት (ምልክት) መጠቆም አለብን። ለዚህ ዓላማ በፋስት ውስጥ አሉ
በዚህ ሁኔታ, ወደ እንሂድ
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
እርስዎ እንደገመቱት ፋስት የመልእክቱን እቅድ ለመግለፅ የpython አይነት ማብራሪያን ይጠቀማል፣ ለዚህም ነው በቤተ-መጽሐፍት የሚደገፈው አነስተኛው ስሪት የሆነው።
ወደ ወኪሉ እንመለስ፣ ዓይነቶችን እናዘጋጃለን እና እንጨምረው፡-
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
እንደሚመለከቱት ፣ አዲስ ግቤት ከእቅድ ጋር ወደ ርዕስ ማስጀመሪያ ዘዴ - value_type እናስተላልፋለን። በተጨማሪም, ሁሉም ነገር አንድ አይነት እቅድ ይከተላል, ስለዚህ በሌላ ነገር ላይ ለመኖር ምንም ፋይዳ አይታየኝም.
ደህና፣ የመጨረሻው ንክኪ ወደ ሜታ መረጃ መሰብሰቢያ ወኪል ጥሪን ወደ መሰብሰብ_ሴኩሪቲስ ማከል ነው።
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
ለመልእክቱ ቀደም ሲል የታወጀውን እቅድ እንጠቀማለን. በዚህ አጋጣሚ፣ ከተወካዩ የተገኘውን ውጤት መጠበቅ ስለማያስፈልገን የ .cast ዘዴን ተጠቀምኩ፣ ነገር ግን ያንን መጥቀስ ተገቢ ነው።
-
ውሰድ - ውጤትን ስለማይጠብቅ አያግድም. ውጤቱን እንደ መልእክት ወደ ሌላ ርዕስ መላክ አይችሉም።
-
መላክ - ውጤቱን ስለማይጠብቅ አያግድም. ውጤቱ በሚሄድበት ርዕስ ውስጥ ወኪልን መግለጽ ይችላሉ.
-
ይጠይቁ - ውጤቱን ይጠብቃል. ውጤቱ በሚሄድበት ርዕስ ውስጥ ወኪልን መግለጽ ይችላሉ.
ስለዚህ፣ ለዛሬ በወኪሎች ያ ብቻ ነው!
የህልም ቡድን
በዚህ ክፍል ለመጻፍ ቃል የገባሁት የመጨረሻው ነገር ትዕዛዞችን ነው። ቀደም ሲል እንደተገለፀው በፋስት ውስጥ ያሉ ትዕዛዞች በጠቅታ ዙሪያ መጠቅለያ ናቸው። እንደውም ፋስት የ -A ቁልፍን ሲገልፅ በቀላሉ ብጁ ትዕዛዛችንን ከበይነገጽ ጋር ያያይዘዋል
ከታወቁት ወኪሎች በኋላ
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
ስለዚህ፣ የትዕዛዞቹን ዝርዝር ከጠራን፣ አዲሱ ትዕዛዛችን በውስጡ ይሆናል።
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
እንደማንኛውም ሰው ልንጠቀምበት እንችላለን፣ስለዚህ ፈጣን ሰራተኛውን እንደገና እናስጀምር እና የተሟላ የደህንነት ስብስብ እንጀምር፡-
> faust -A horton.agents start-collect-securities
ቀጥሎ ምን ይሆናል?
በሚቀጥለው ክፍል የቀሩትን ወኪሎች እንደ ምሳሌ በመጠቀም በዓመቱ የግብይት ዋጋ መዝጊያ ላይ ጽንፍ የመፈለግ ዘዴን እና የወኪሎችን ክሮን ማስጀመርን እንመለከታለን።
ለዛሬ ያ ብቻ ነው! ስላነበቡ እናመሰግናለን :)
PS በመጨረሻው ክፍል ስር ስለ ፋስት እና ስለ ካፍካ ተጠየቅ (
ምንጭ: hab.com