tartalomjegyzék
-
II. rĂ©sz: Ăgynökök Ă©s csapatok
Mit csinĂĄlunk itt?
SzĂłval, szĂłval, a mĂĄsodik rĂ©sz. Ahogy korĂĄbban Ărtuk, ebben a következĆket fogjuk tenni:
-
Ărjunk egy kis klienst az alphavantage szĂĄmĂĄra az aiohttp-n a szĂŒksĂ©ges vĂ©gpontok kĂ©rĂ©sĂ©vel.
-
Hozzunk lĂ©tre egy ĂŒgynököt, aki adatokat gyƱjt az Ă©rtĂ©kpapĂrokrĂłl Ă©s metainformĂĄciĂłkat azokrĂłl.
De ezt fogjuk megtenni magĂĄnak a projektnek, Ă©s a faust-kutatĂĄs szempontjĂĄbĂłl megtanuljuk, hogyan Ărjunk ĂŒgynököket, amelyek feldolgozzĂĄk a kafkĂĄbĂłl szĂĄrmazĂł stream esemĂ©nyeket, valamint hogyan Ărjunk parancsokat (a mi esetĂŒnkben kattintson a wrapperre) az ĂŒgynök ĂĄltal figyelt tĂ©makörhöz kĂŒldött kĂ©zi push ĂŒzenetekhez.
Edzés
AlphaVantage kliens
ElĆször Ărjunk egy kis aiohttp klienst az alphavantage kĂ©rĂ©sekhez.
Spoiler
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
TulajdonkĂ©ppen minden vilĂĄgos belĆle:
-
Az AlphaVantage API meglehetĆsen egyszerƱ Ă©s gyönyörƱen megtervezett, ezĂ©rt Ășgy döntöttem, hogy minden kĂ©rĂ©st a mĂłdszerrel teszek meg
construct_query
ahol viszont van egy http hĂvĂĄs. -
elhozom az összes mezĆt
snake_case
szĂŒksĂ©gszerƱsĂ©g miatt. -
Nos, a logger.catch dekorĂĄciĂł a gyönyörƱ Ă©s informatĂv nyomkövetĂ©si eredmĂ©nyĂ©rt.
PS Ne felejtse el helyileg hozzåadni az alphavantage tokent a config.yml fåjlhoz, vagy exportålni a környezeti våltozót HORTON_SERVICE_APIKEY
. Kapunk egy tokent
CRUD osztĂĄly
ĂrtĂ©kpapĂr-gyƱjtemĂ©nyĂŒnk lesz az Ă©rtĂ©kpapĂrokkal kapcsolatos metainformĂĄciĂłk tĂĄrolĂĄsĂĄra.
VĂ©lemĂ©nyem szerint itt nem kell semmit magyarĂĄzni, Ă©s maga az alaposztĂĄly meglehetĆsen egyszerƱ.
alkalmazås beszerzése()
Adjunk hozzĂĄ egy fĂŒggvĂ©nyt egy alkalmazĂĄsobjektum lĂ©trehozĂĄsĂĄhoz
Spoiler
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
EgyelĆre a legegyszerƱbb alkalmazĂĄskĂ©szĂtĂ©s ĂĄll rendelkezĂ©sĂŒnkre, kicsit kĂ©sĆbb bĆvĂtjĂŒk, de hogy ne kelljen vĂĄrakoznia, itt
FĆ rĂ©sz
Ăgynök az Ă©rtĂ©kpapĂrok listĂĄjĂĄnak összegyƱjtĂ©sĂ©re Ă©s karbantartĂĄsĂĄra
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
TehĂĄt elĆször megkapjuk a faust alkalmazĂĄs objektumot - ez meglehetĆsen egyszerƱ. EzutĂĄn kifejezetten deklarĂĄlunk egy tĂ©mĂĄt az ĂŒgynökĂŒnk szĂĄmĂĄra... Itt Ă©rdemes megemlĂteni, hogy mi az, mi a belsĆ paramĂ©ter, Ă©s hogyan lehet ezt mĂĄskĂ©pp elrendezni.
-
TĂ©mĂĄk kafkĂĄban, ha a pontos definĂciĂłt akarjuk tudni, Ă©rdemes elolvasni
ki. dokumentum , vagy olvashatszabsztrakt a HabrĂ©n oroszul, ahol szintĂ©n elĂ©g pontosan tĂŒkrözĆdik minden :) -
BelsĆ paramĂ©ter , a faust doc-ban elĂ©g jĂłl le van Ărva, lehetĆvĂ© teszi, hogy közvetlenĂŒl a kĂłdban ĂĄllĂtsuk be a tĂ©mĂĄt, termĂ©szetesen ez a faust fejlesztĆk ĂĄltal megadott paramĂ©tereket jelenti, pl.: retention, retention policy (alapĂ©rtelmezĂ©s szerint törlĂ©s, de beĂĄllĂthatĂłkompakt ), partĂciĂłk szĂĄma tĂ©mĂĄnkĂ©nt (vĂĄlaszfalak hogy pĂ©ldĂĄul kevesebbet, mintglobĂĄlis Ă©rtĂ©k alkalmazĂĄsok faust). -
ĂltalĂĄnossĂĄgban elmondhatĂł, hogy az ĂŒgynök lĂ©trehozhat globĂĄlis Ă©rtĂ©kekkel kezelt tĂ©mĂĄt, de Ă©n szeretek mindent kifejezetten deklarĂĄlni. EzenkĂvĂŒl az ĂŒgynökhirdetĂ©sben szereplĆ tĂ©makör nĂ©hĂĄny paramĂ©tere (pĂ©ldĂĄul a partĂciĂłk szĂĄma vagy a megĆrzĂ©si szabĂĄlyzat) nem konfigurĂĄlhatĂł.
Ăgy nĂ©zhet ki a tĂ©ma manuĂĄlis meghatĂĄrozĂĄsa nĂ©lkĂŒl:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
pass
Nos, most Ărjuk le, mit fog tenni az ĂŒgynökĂŒnk :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
TehĂĄt az ĂŒgynök elejĂ©n megnyitunk egy aiohttp munkamenetet az ĂŒgyfelĂŒnkön keresztĂŒli kĂ©rĂ©sekhez. Ăgy egy dolgozĂł indĂtĂĄsakor, amikor az ĂŒgynökĂŒnk elindul, azonnal megnyĂlik egy munkamenet - egy, a dolgozĂł teljes futĂĄsi idejĂ©re (vagy több, ha megvĂĄltoztatja a paramĂ©tert
EzutĂĄn követjĂŒk a folyamot (behelyezzĂŒk az ĂŒzenetet _
, mivel mi ebben az ĂŒgynökben nem törĆdĂŒnk a tĂ©mĂĄnk ĂŒzeneteinek tartalmĂĄval, ha lĂ©teznek az aktuĂĄlis eltolĂĄsnĂĄl, kĂŒlönben a ciklusunk megvĂĄrja Ă©rkezĂ©sĂŒket. Nos, a ciklusunkon belĂŒl naplĂłzzuk az ĂŒzenet beĂ©rkezĂ©sĂ©t, megkapjuk az aktĂv (a get_securities alapĂ©rtelmezĂ©s szerint csak aktĂv Ă©rtĂ©kkel tĂ©r vissza, lĂĄsd az ĂŒgyfĂ©lkĂłdot) Ă©rtĂ©kpapĂrok listĂĄjĂĄt, Ă©s elmentjĂŒk az adatbĂĄzisba, ellenĆrizve, hogy van-e ugyanazzal a tickerrel rendelkezĆ Ă©rtĂ©kpapĂr Ă©s Exchange az adatbĂĄzisban, ha van, akkor az (a papĂr) egyszerƱen frissĂŒl.
IndĂtsuk el alkotĂĄsunkat!
> docker-compose up -d
... ĐĐ°ĐżŃŃĐș ĐșĐŸĐœŃĐ”ĐčĐœĐ”ŃĐŸĐČ ...
> faust -A horton.agents worker --without-web -l info
PS jellemzĆk
IndĂtĂĄsi parancsunkban megmondtuk a faustnak, hogy hol kell keresni az alkalmazĂĄsobjektumot, Ă©s mit kell tenni vele (worker indĂtĂĄsa) az infonaplĂł kimeneti szintjĂ©vel. A következĆ kimenetet kapjuk:
Spoiler
âÆa”Sâ v1.10.4âŹââââââââââââââââââââââââââââââââââââââââââââââââââââ
â id â horton â
â transport â [URL('kafka://localhost:9092')] â
â store â memory: â
â log â -stderr- (info) â
â pid â 1271262 â
â hostname â host-name â
â platform â CPython 3.8.2 (Linux x86_64) â
â drivers â â
â transport â aiokafka=1.1.6 â
â web â aiohttp=3.6.2 â
â datadir â /path/to/project/horton-data â
â appdir â /path/to/project/horton-data/v1 â
âââââââââââââââŽââââââââââââââââââââââââââââââââââââââââââââââââââââ
... Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž ...
âTopic Partition SetââââââââââŹâââââââââââââ
â topic â partitions â
ââââââââââââââââââââââââââââââŒâââââââââââââ€
â collect_securities â {0-7} â
â horton-__assignor-__leader â {0} â
ââââââââââââââââââââââââââââââŽâââââââââââââ
Ăletben van!!!
NĂ©zzĂŒk a partĂciĂłkĂ©szletet. Amint lĂĄtjuk, lĂ©trejött egy tĂ©makör a kĂłdban megadott nĂ©vvel, a partĂciĂłk alapĂ©rtelmezett szĂĄmĂĄval (8
Nos, most egy mĂĄsik terminĂĄl ablakba lĂ©phetĂŒnk, Ă©s ĂŒres ĂŒzenetet kĂŒldhetĂŒnk a tĂ©mĂĄnknak:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}
PS hasznĂĄlata @
megmutatjuk, hogy ĂŒzenetet kĂŒldĂŒnk a âcollect_securitiesâ nevƱ tĂ©mĂĄhoz.
Ebben az esetben az ĂŒzenet a 6-os partĂciĂłhoz ment â ââezt a kafdrop on-ra lĂ©pve ellenĆrizheti localhost:9000
MunkatĂĄrsunkkal a terminĂĄlablakba lĂ©pve egy boldog ĂŒzenetet fogunk lĂĄtni, amelyet loguru hasznĂĄlatĂĄval kĂŒldĂŒnk:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
MegnĂ©zhetjĂŒk a mongo-t is (Robo3T vagy Studio3T segĂtsĂ©gĂ©vel), Ă©s lĂĄthatjuk, hogy az Ă©rtĂ©kpapĂrok az adatbĂĄzisban vannak:
Nem vagyok milliĂĄrdos, ezĂ©rt elĂ©gedettek vagyunk az elsĆ megtekintĂ©si lehetĆsĂ©ggel.
BoldogsĂĄg Ă©s öröm â kĂ©sz az elsĆ ĂŒgynök :)
Ăgynök kĂ©sz, Ă©ljen az Ășj ĂŒgynök!
Igen, uraim, mĂ©g csak az 1/3-ĂĄt jĂĄrtuk be a cikk ĂĄltal elĆkĂ©szĂtett Ăștnak, de ne csĂŒggedjenek, mert most könnyebb lesz.
TehĂĄt most szĂŒksĂ©gĂŒnk van egy ĂŒgynökre, amely összegyƱjti a metainformĂĄciĂłkat Ă©s elhelyezi egy gyƱjtĆdokumentumban:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Mivel ez az ĂŒgynök egy adott Ă©rtĂ©kpapĂrral kapcsolatos informĂĄciĂłkat dolgoz fel, az ĂŒzenetben fel kell tĂŒntetnĂŒnk ennek a biztonsĂĄgi Ă©rtĂ©knek a tickerjĂ©t (szimbĂłlumĂĄt). Erre a cĂ©lra faustban vannak
Ebben az esetben menjĂŒnk tovĂĄbb
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Ahogy azt sejteni lehetett, a Faust a python tĂpusĂș annotĂĄciĂłt hasznĂĄlja az ĂŒzenetsĂ©ma leĂrĂĄsĂĄra, ezĂ©rt a könyvtĂĄr ĂĄltal tĂĄmogatott minimĂĄlis verziĂł
TĂ©rjĂŒnk vissza az ĂŒgynökhöz, ĂĄllĂtsuk be a tĂpusokat Ă©s adjuk hozzĂĄ:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
Mint lĂĄthatĂł, egy Ășj paramĂ©tert adunk ĂĄt egy sĂ©mĂĄval a tĂ©ma inicializĂĄlĂĄsi metĂłdusĂĄnak - value_type. TovĂĄbbĂĄ minden ugyanazt a sĂ©mĂĄt követi, Ăgy nem lĂĄtom Ă©rtelmĂ©t annak, hogy mĂĄssal foglalkozzak.
Nos, az utolsĂł simĂtĂĄs az, hogy fel kell hĂvni a metainformĂĄciĂł-gyƱjtĆ ĂŒgynököt a collection_securitites-re:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....
Az ĂŒzenethez a korĂĄbban bejelentett sĂ©mĂĄt hasznĂĄljuk. Ebben az esetben a .cast mĂłdszert alkalmaztam, mivel nem kell megvĂĄrnunk az ĂŒgynök eredmĂ©nyĂ©t, de Ă©rdemes megemlĂteni, hogy
-
leadott - nem blokkol, mert nem vĂĄr eredmĂ©nyt. Az eredmĂ©nyt nem kĂŒldheti el ĂŒzenetkĂ©nt mĂĄsik tĂ©mĂĄba.
-
kĂŒldĂ©s - nem blokkol, mert nem vĂĄr eredmĂ©nyt. A tĂ©makörben megadhat egy ĂŒgynököt, amelyhez az eredmĂ©ny kerĂŒl.
-
kĂ©rdezni - eredmĂ©nyre vĂĄr. A tĂ©makörben megadhat egy ĂŒgynököt, amelyhez az eredmĂ©ny kerĂŒl.
TehĂĄt mĂĄra ennyi az ĂŒgynökökkel!
Ălom csapat
Az utolsĂł dolog, amit ebbe a rĂ©szben ĂgĂ©rtem, a parancsok. Amint korĂĄbban emlĂtettĂŒk, a faust parancsok egy kattintĂĄs körĂŒli elemek. ValĂłjĂĄban a Faust egyszerƱen csatolja az egyĂ©ni parancsunkat a felĂŒletĂ©hez, amikor megadja az -A billentyƱt
MiutĂĄn a bejelentett ĂŒgynökök be
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Ăgy, ha meghĂvjuk a parancsok listĂĄjĂĄt, az Ășj parancsunk benne lesz:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
HasznĂĄlhatjuk, mint bĂĄrki mĂĄs, tehĂĄt indĂtsuk Ășjra a faust workert, Ă©s kezdjĂŒk el az Ă©rtĂ©kpapĂrok teljes Ă©rtĂ©kƱ gyƱjtĂ©sĂ©t:
> faust -A horton.agents start-collect-securities
Mi fog ezutån történni?
A következĆ rĂ©szben a fennmaradĂł ĂŒgynökök pĂ©ldĂĄjĂĄval a szĂ©lsĆsĂ©gek keresĂ©sĂ©nek elsĂŒllyedĆ mechanizmusĂĄt vesszĂŒk szemĂŒgyre a kereskedĂ©s Ă©vre vonatkozĂł zĂĄróårfolyamaiban Ă©s az ĂŒgynökök kronindĂtĂĄsĂĄban.
Ez minden måra! Köszönöm, hogy elolvasta :)
PS Az utolsó rész alatt faust és összefolyó kafkåról kérdeztek (
ForrĂĄs: will.com