Szóval, szóval, a második rész. Ahogy korábban írtuk, ebben a következőket fogjuk tenni:
Írjunk egy kis klienst az alphavantage számára az aiohttp-n a szükséges végpontok kérésével.
Hozzunk létre egy ügynököt, aki adatokat gyűjt az értékpapírokról és metainformációkat azokról.
De ezt fogjuk megtenni magának a projektnek, és a faust-kutatás szempontjából megtanuljuk, hogyan írjunk ügynököket, amelyek feldolgozzák a kafkából származó stream eseményeket, valamint hogyan írjunk parancsokat (a mi esetünkben kattintson a wrapperre) az ügynök által figyelt témakörhöz küldött kézi push üzenetekhez.
Edzés
AlphaVantage kliens
Először írjunk egy kis aiohttp klienst az alphavantage kérésekhez.
Az AlphaVantage API meglehetősen egyszerű és gyönyörűen megtervezett, ezért úgy döntöttem, hogy minden kérést a módszerrel teszek meg construct_query ahol viszont van egy http hívás.
elhozom az összes mezőt snake_case szükségszerűség miatt.
Nos, a logger.catch dekoráció a gyönyörű és informatív nyomkövetési eredményért.
PS Ne felejtse el helyileg hozzáadni az alphavantage tokent a config.yml fájlhoz, vagy exportálni a környezeti változót HORTON_SERVICE_APIKEY. Kapunk egy tokent itt.
CRUD osztály
Értékpapír-gyűjteményünk lesz az értékpapírokkal kapcsolatos metainformációk tárolására.
Egyelőre a legegyszerűbb alkalmazáskészítés áll rendelkezésünkre, kicsit később bővítjük, de hogy ne kelljen várakoznia, itt hivatkozások az App-osztályhoz. Azt is tanácsolom, hogy vessen egy pillantást a beállítási osztályra, mivel ez felelős a legtöbb beállításért.
Fő rész
Ügynök az értékpapírok listájának összegyűjtésére és karbantartására
Tehát először megkapjuk a faust alkalmazás objektumot - ez meglehetősen egyszerű. Ezután kifejezetten deklarálunk egy témát az ügynökünk számára... Itt érdemes megemlíteni, hogy mi az, mi a belső paraméter, és hogyan lehet ezt másképp elrendezni.
Témák kafkában, ha a pontos definíciót akarjuk tudni, érdemes elolvasni ki. dokumentum, vagy olvashatsz absztrakt a Habrén oroszul, ahol szintén elég pontosan tükröződik minden :)
Belső paraméter, a faust doc-ban elég jól le van írva, lehetővé teszi, hogy közvetlenül a kódban állítsuk be a témát, természetesen ez a faust fejlesztők által megadott paramétereket jelenti, pl.: retention, retention policy (alapértelmezés szerint törlés, de beállítható kompakt), partíciók száma témánként (válaszfalakhogy például kevesebbet, mint globális érték alkalmazások faust).
Általánosságban elmondható, hogy az ügynök létrehozhat globális értékekkel kezelt témát, de én szeretek mindent kifejezetten deklarálni. Ezenkívül az ügynökhirdetésben szereplő témakör néhány paramétere (például a partíciók száma vagy a megőrzési szabályzat) nem konfigurálható.
Így nézhet ki a téma manuális meghatározása nélkül:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Tehát az ügynök elején megnyitunk egy aiohttp munkamenetet az ügyfelünkön keresztüli kérésekhez. Így egy dolgozó indításakor, amikor az ügynökünk elindul, azonnal megnyílik egy munkamenet - egy, a dolgozó teljes futási idejére (vagy több, ha megváltoztatja a paramétert egyidejűség alapértelmezett egységgel rendelkező ügynöktől).
Ezután követjük a folyamot (behelyezzük az üzenetet _, mivel mi ebben az ügynökben nem törődünk a témánk üzeneteinek tartalmával, ha léteznek az aktuális eltolásnál, különben a ciklusunk megvárja érkezésüket. Nos, a ciklusunkon belül naplózzuk az üzenet beérkezését, megkapjuk az aktív (a get_securities alapértelmezés szerint csak aktív értékkel tér vissza, lásd az ügyfélkódot) értékpapírok listáját, és elmentjük az adatbázisba, ellenőrizve, hogy van-e ugyanazzal a tickerrel rendelkező értékpapír és Exchange az adatbázisban, ha van, akkor az (a papír) egyszerűen frissül.
Indítsuk el alkotásunkat!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS jellemzők web komponens Faust nem fogom figyelembe venni a cikkekben, ezért kitűztük a megfelelő zászlót.
Indítási parancsunkban megmondtuk a faustnak, hogy hol kell keresni az alkalmazásobjektumot, és mit kell tenni vele (worker indítása) az infonapló kimeneti szintjével. A következő kimenetet kapjuk:
Nézzük a partíciókészletet. Amint látjuk, létrejött egy témakör a kódban megadott névvel, a partíciók alapértelmezett számával (8 topic_partitions - alkalmazásobjektum paraméter), mivel témánknak nem adtunk meg egyedi értéket (partíciókon keresztül). A workerben elindított ügynökhöz mind a 8 partíció hozzá van rendelve, mivel ez az egyetlen, de erről a fürtözésről szóló részben lesz még szó.
Nos, most egy másik terminál ablakba léphetünk, és üres üzenetet küldhetünk a témánknak:
PS használata @ megmutatjuk, hogy üzenetet küldünk a „collect_securities” nevű témához.
Ebben az esetben az üzenet a 6-os partícióhoz ment – ezt a kafdrop on-ra lépve ellenőrizheti localhost:9000
Munkatársunkkal a terminálablakba lépve egy boldog üzenetet fogunk látni, amelyet loguru használatával küldünk:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Megnézhetjük a mongo-t is (Robo3T vagy Studio3T segítségével), és láthatjuk, hogy az értékpapírok az adatbázisban vannak:
Nem vagyok milliárdos, ezért elégedettek vagyunk az első megtekintési lehetőséggel.
Boldogság és öröm – kész az első ügynök :)
Ügynök kész, éljen az új ügynök!
Igen, uraim, még csak az 1/3-át jártuk be a cikk által előkészített útnak, de ne csüggedjenek, mert most könnyebb lesz.
Tehát most szükségünk van egy ügynökre, amely összegyűjti a metainformációkat és elhelyezi egy gyűjtődokumentumban:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Mivel ez az ügynök egy adott értékpapírral kapcsolatos információkat dolgoz fel, az üzenetben fel kell tüntetnünk ennek a biztonsági értéknek a tickerjét (szimbólumát). Erre a célra faustban vannak Records — osztályok, amelyek deklarálják az üzenetsémát az ügynök témakörben.
Ebben az esetben menjünk tovább records.pyés írja le, hogyan kell kinéznie a téma üzenetének:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Ahogy azt sejteni lehetett, a Faust a python típusú annotációt használja az üzenetséma leírására, ezért a könyvtár által támogatott minimális verzió 3.6.
Térjünk vissza az ügynökhöz, állítsuk be a típusokat és adjuk hozzá:
Mint látható, egy új paramétert adunk át egy sémával a téma inicializálási metódusának - value_type. Továbbá minden ugyanazt a sémát követi, így nem látom értelmét annak, hogy mással foglalkozzak.
Nos, az utolsó simítás az, hogy fel kell hívni a metainformáció-gyűjtő ügynököt a collection_securitites-re:
Az üzenethez a korábban bejelentett sémát használjuk. Ebben az esetben a .cast módszert alkalmaztam, mivel nem kell megvárnunk az ügynök eredményét, de érdemes megemlíteni, hogy módja küldj üzenetet a témához:
leadott - nem blokkol, mert nem vár eredményt. Az eredményt nem küldheti el üzenetként másik témába.
küldés - nem blokkol, mert nem vár eredményt. A témakörben megadhat egy ügynököt, amelyhez az eredmény kerül.
kérdezni - eredményre vár. A témakörben megadhat egy ügynököt, amelyhez az eredmény kerül.
Tehát mára ennyi az ügynökökkel!
Álom csapat
Az utolsó dolog, amit ebbe a részben ígértem, a parancsok. Amint korábban említettük, a faust parancsok egy kattintás körüli elemek. Valójában a Faust egyszerűen csatolja az egyéni parancsunkat a felületéhez, amikor megadja az -A billentyűt
Miután a bejelentett ügynökök be ügynökök.py adjunk hozzá egy funkciót egy dekorátorral app.commandmódszernek nevezve öntött у gyűjt_értékpapírokat:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Így, ha meghívjuk a parancsok listáját, az új parancsunk benne lesz:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Használhatjuk, mint bárki más, tehát indítsuk újra a faust workert, és kezdjük el az értékpapírok teljes értékű gyűjtését:
> faust -A horton.agents start-collect-securities
Mi fog ezután történni?
A következő részben a fennmaradó ügynökök példájával a szélsőségek keresésének elsüllyedő mechanizmusát vesszük szemügyre a kereskedés évre vonatkozó záróárfolyamaiban és az ügynökök kronindításában.