Mündəricat
II hissə: Agentlər və Komandalar
Bizim burada nə işimiz var?
Beləliklə, ikinci hissə. Daha əvvəl yazıldığı kimi, onda aşağıdakıları edəcəyik:
Bizə lazım olan son nöqtələr üçün sorğularla aiohttp-də alfavantage üçün kiçik bir müştəri yazaq.
Qiymətli kağızlar haqqında məlumat və onlar haqqında meta məlumat toplayacaq agent yaradaq.
Ancaq bu, layihənin özü üçün edəcəyimiz şeydir və faust araşdırması baxımından, biz kafkadan axın hadisələrini emal edən agentləri necə yazmağı, həmçinin əmrləri necə yazmağı (klik sarğı) öyrənəcəyik, bizim vəziyyətimizdə - agentin nəzarət etdiyi mövzuya manuel təkan mesajları üçün.
Təlim
AlphaVantage Müştəri
Əvvəlcə alfavantage sorğuları üçün kiçik bir aiohttp müştəri yazaq.
spoiler
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
Əslində ondan hər şey aydındır:
AlphaVantage API olduqca sadə və gözəl dizayn edilmişdir, ona görə də bütün sorğuları metod vasitəsilə etmək qərarına gəldim
construct_queryburada öz növbəsində http çağırışı var.Mən bütün sahələri gətirirəm
snake_caserahatlıq üçün.Yaxşı, gözəl və informativ izləmə çıxışı üçün logger.catch dekorasiyası.
PS Alfavantage işarəsini yerli olaraq config.yml-ə əlavə etməyi və ya mühit dəyişənini ixrac etməyi unutmayın. HORTON_SERVICE_APIKEY. Token alırıq .
CRUD sinfi
Qiymətli kağızlar haqqında meta məlumat saxlamaq üçün qiymətli kağızlar kolleksiyamız olacaq.
Fikrimcə, burada heç nə izah etməyə ehtiyac yoxdur və baza sinfinin özü olduqca sadədir.
get_app()
Tətbiq obyekti yaratmaq üçün funksiya əlavə edək
spoiler
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
Hələlik bizdə ən sadə proqram yaradılması olacaq, bir az sonra onu genişləndirəcəyik, lakin sizi gözləməmək üçün burada Tətbiq sinfinə. Mən də sizə parametrlər sinfinə nəzər salmağı məsləhət görürəm, çünki o, əksər parametrlərə cavabdehdir.
Əsas hissəsidir
Qiymətli kağızların siyahısının toplanması və aparılması üzrə agent
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
passBeləliklə, əvvəlcə faust tətbiq obyektini alırıq - bu olduqca sadədir. Sonra, biz agentimiz üçün açıq şəkildə bir mövzu elan edirik... Burada onun nə olduğunu, daxili parametrin nə olduğunu və bunun necə fərqli şəkildə təşkil oluna biləcəyini qeyd etməyə dəyər.
Kafkadakı mövzular, dəqiq tərifini bilmək istəyiriksə, oxumaq daha yaxşıdır və ya oxuya bilərsiniz Rus dilində Habré-də, burada hər şey olduqca dəqiq əks olunur :)
, faust doc-da olduqca yaxşı təsvir edilmişdir, mövzunu birbaşa kodda konfiqurasiya etməyə imkan verir, əlbəttə ki, bu, faust tərtibatçıları tərəfindən verilən parametrlər deməkdir, məsələn: saxlama, saxlama siyasəti (standart olaraq silmək, ancaq təyin edə bilərsiniz. ), mövzu üzrə bölmələrin sayı (etmək, məsələn, az etmək proqramlar faust).
Ümumiyyətlə, agent qlobal dəyərlərlə idarə olunan mövzu yarada bilər, lakin mən hər şeyi açıq şəkildə bəyan etməyi xoşlayıram. Bundan əlavə, agent reklamındakı mövzunun bəzi parametrləri (məsələn, bölmələrin sayı və ya saxlama siyasəti) konfiqurasiya edilə bilməz.
Mövzunu əl ilə təyin etmədən belə görünə bilər:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
passYaxşı, indi agentimizin nə edəcəyini təsvir edək :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Beləliklə, agentin başlanğıcında müştərimiz vasitəsilə sorğular üçün aiohttp sessiyası açırıq. Beləliklə, bir işçi işə başladıqda, agentimiz işə salındıqda dərhal bir sessiya açılacaq - bir, işçinin işlədiyi bütün müddət üçün (və ya parametri dəyişdirsəniz, bir neçə). standart vahidi olan agentdən).
Sonra, axını izləyirik (mesajı yerləşdiririk _, çünki biz bu agentdə mövzumuzdan gələn mesajların məzmununa əhəmiyyət vermədiyimiz üçün, əgər onlar cari ofsetdə varsa, əks halda dövrümüz onların gəlişini gözləyəcək. Yaxşı, döngəmizdə mesajın alınmasını qeyd edirik, aktiv (get_securities yalnız default olaraq aktivdir, müştəri koduna baxın) qiymətli kağızların siyahısını əldə edirik və eyni ticker və qiymətli kağızın olub olmadığını yoxlayaraq verilənlər bazasında saxlayırıq. verilənlər bazasında mübadilə , əgər varsa, o (kağız) sadəcə yenilənəcəkdir.
Yaradıcılığımıza başlayaq!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l infoPS Xüsusiyyətləri Məqalələrdə faustu nəzərdən keçirməyəcəyəm, ona görə də müvafiq bayrağı qoyduq.
Başlatma əmrimizdə biz faust-a məlumat jurnalının çıxış səviyyəsi ilə tətbiq obyektini harada axtarmaq və onunla nə etmək lazım olduğunu (işçini işə salmaq) söylədik. Aşağıdakı çıxışı alırıq:
spoiler
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id │ horton │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ log │ -stderr- (info) │
│ pid │ 1271262 │
│ hostname │ host-name │
│ platform │ CPython 3.8.2 (Linux x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /path/to/project/horton-data │
│ appdir │ /path/to/project/horton-data/v1 │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...
┌Topic Partition Set─────────┬────────────┐
│ topic │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities │ {0-7} │
│ horton-__assignor-__leader │ {0} │
└────────────────────────────┴────────────┘ O canlıdır!!!
Bölmə dəstinə baxaq. Gördüyümüz kimi, kodda təyin etdiyimiz adla, standart bölmələrin sayı (8, götürülmüşdür) ilə bir mövzu yaradıldı. - tətbiq obyekti parametri), mövzumuz üçün fərdi dəyər təyin etmədiyimiz üçün (arakəsmələr vasitəsilə). İşçidə işə salınan agentə bütün 8 arakəsmə təyin olunur, çünki bu, yeganədir, lakin bu, klasterləşmə ilə bağlı hissədə daha ətraflı müzakirə olunacaq.
Yaxşı, indi başqa bir terminal pəncərəsinə keçib mövzumuza boş bir mesaj göndərə bilərik:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}PS istifadə edir @ biz “qiymətli kağızları toplamaq” adlı mövzuya mesaj göndərdiyimizi göstəririk.
Bu halda mesaj 6-cı bölməyə getdi - kafdrop-a keçərək bunu yoxlaya bilərsiniz localhost:9000
İşçimizlə terminal pəncərəsinə gedərkən loguru istifadə edərək göndərilən xoşbəxt mesajı görəcəyik:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securitiesBiz həmçinin mongo-ya (Robo3T və ya Studio3T istifadə edərək) baxa bilərik və qiymətli kağızların verilənlər bazasında olduğunu görə bilərik:
Mən milyarder deyiləm və buna görə də birinci baxış seçimi ilə kifayətlənirik.
Xoşbəxtlik və sevinc - ilk agent hazırdır :)
Agent hazır, yaşasın yeni agent!
Bəli, cənablar, biz bu məqalənin hazırladığı yolun yalnız 1/3-ni keçdik, amma ruhdan düşməyin, çünki indi daha asan olacaq.
Beləliklə, indi bizə meta məlumat toplayan və onu kolleksiya sənədinə qoyan agent lazımdır:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...Bu agent konkret qiymətli kağız haqqında məlumatları emal edəcəyi üçün mesajda bu qiymətli kağızın işarəsini (simvolunu) göstərməliyik. Bu məqsədlə faustda var — agent mövzusunda mesaj sxemini elan edən siniflər.
Bu vəziyyətdə, gedək və bu mövzu üçün mesajın necə görünəcəyini təsvir edin:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Təxmin etdiyiniz kimi, faust mesaj sxemini təsvir etmək üçün python tipli annotasiyadan istifadə edir, buna görə də kitabxana tərəfindən dəstəklənən minimum versiya .
Agentə qayıdaq, növləri təyin edək və əlavə edək:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
Gördüyünüz kimi, mövzunun başlatma metoduna sxem ilə yeni bir parametr keçirik - dəyər_tipi. Bundan əlavə, hər şey eyni sxemə uyğundur, buna görə də başqa bir şey üzərində dayanmağın mənasını görmürəm.
Yaxşı, son toxunuş, collect_securitites üçün meta məlumat toplama agentinə zəng əlavə etməkdir:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....Mesaj üçün əvvəlcədən elan edilmiş sxemdən istifadə edirik. Bu halda mən .cast metodundan istifadə etdim, çünki agentdən nəticə gözləməyə ehtiyac yoxdur, lakin qeyd etmək lazımdır ki, mövzuya mesaj göndərin:
tökmə - nəticə gözləmədiyi üçün blok etmir. Nəticəni başqa mövzuya mesaj olaraq göndərə bilməzsiniz.
göndər - nəticə gözləmədiyi üçün blok etmir. Nəticənin gedəcəyi mövzuda agent təyin edə bilərsiniz.
soruşmaq - nəticəni gözləyir. Nəticənin gedəcəyi mövzuda agent təyin edə bilərsiniz.
Beləliklə, bu gün agentlərlə olan hər şey budur!
Xəyal Komandası
Bu hissədə yazmağa söz verdiyim son şey əmrlərdir. Daha əvvəl qeyd edildiyi kimi, faustdakı əmrlər klik ətrafında bir sarğıdır. Əslində, faust sadəcə olaraq -A düyməsini təyin edərkən bizim xüsusi əmrimizi onun interfeysinə əlavə edir
Agentlər içəri girdikdən sonra elan edildi dekorator ilə funksiya əlavə edin app.commandmetodu çağırır tökmə у təhlükəsizlikləri_toplayın:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Beləliklə, əmrlərin siyahısını çağırsaq, yeni əmrimiz onda olacaq:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Biz də hər kəs kimi istifadə edə bilərik, ona görə də gəlin faust işçisini yenidən işə salaq və qiymətli kağızların tam hüquqlu kolleksiyasına başlayaq:
> faust -A horton.agents start-collect-securitiesBundan sonra nə olacaq?
Növbəti hissədə, qalan agentlərdən nümunə kimi istifadə edərək, il üçün ticarətin bağlanış qiymətlərində ifratların axtarışı üçün sink mexanizmini və agentlərin cron işə salınmasını nəzərdən keçirəcəyik.
Bu gün üçün hamısı budur! Oxuduğunuz üçün təşəkkürlər :)
P.S. Son hissənin altında məndən faust və birləşən kafka haqqında soruşdular (). Görünür, birləşən bir çox cəhətdən daha funksionaldır, amma fakt budur ki, faust birləşən üçün tam müştəri dəstəyinə malik deyil - bundan belə çıxır..
Mənbə: www.habr.com
