Háttérfeladatok a Faustról, II. rész: Ügynökök és csapatok

Háttérfeladatok a Faustról, II. rész: Ügynökök és csapatok

tartalomjegyzék

  1. I. rész: Bevezetés

  2. II. rész: Ügynökök és csapatok

Mit csinálunk itt?

Szóval, szóval, a második rész. Ahogy korábban írtuk, ebben a következőket fogjuk tenni:

  1. Írjunk egy kis klienst az alphavantage számára az aiohttp-n a szükséges végpontok kérésével.

  2. Hozzunk létre egy ügynököt, aki adatokat gyűjt az értékpapírokról és metainformációkat azokról.

De ezt fogjuk megtenni magának a projektnek, és a faust-kutatás szempontjából megtanuljuk, hogyan írjunk ügynököket, amelyek feldolgozzák a kafkából származó stream eseményeket, valamint hogyan írjunk parancsokat (a mi esetünkben kattintson a wrapperre) az ügynök által figyelt témakörhöz küldött kézi push üzenetekhez.

Edzés

AlphaVantage kliens

Először írjunk egy kis aiohttp klienst az alphavantage kérésekhez.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Tulajdonképpen minden világos belőle:

  1. Az AlphaVantage API meglehetősen egyszerű és gyönyörűen megtervezett, ezért úgy döntöttem, hogy minden kérést a módszerrel teszek meg construct_query ahol viszont van egy http hívás.

  2. elhozom az összes mezőt snake_case szükségszerűség miatt.

  3. Nos, a logger.catch dekoráció a gyönyörű és informatív nyomkövetési eredményért.

PS Ne felejtse el helyileg hozzáadni az alphavantage tokent a config.yml fájlhoz, vagy exportálni a környezeti változót HORTON_SERVICE_APIKEY. Kapunk egy tokent itt.

CRUD osztály

Értékpapír-gyűjteményünk lesz az értékpapírokkal kapcsolatos metainformációk tárolására.

adatbázis/security.py

Véleményem szerint itt nem kell semmit magyarázni, és maga az alaposztály meglehetősen egyszerű.

alkalmazás beszerzése()

Adjunk hozzá egy függvényt egy alkalmazásobjektum létrehozásához app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Egyelőre a legegyszerűbb alkalmazáskészítés áll rendelkezésünkre, kicsit később bővítjük, de hogy ne kelljen várakoznia, itt hivatkozások az App-osztályhoz. Azt is tanácsolom, hogy vessen egy pillantást a beállítási osztályra, mivel ez felelős a legtöbb beállításért.

Fő rész

Ügynök az értékpapírok listájának összegyűjtésére és karbantartására

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Tehát először megkapjuk a faust alkalmazás objektumot - ez meglehetősen egyszerű. Ezután kifejezetten deklarálunk egy témát az ügynökünk számára... Itt érdemes megemlíteni, hogy mi az, mi a belső paraméter, és hogyan lehet ezt másképp elrendezni.

  1. Témák kafkában, ha a pontos definíciót akarjuk tudni, érdemes elolvasni ki. dokumentum, vagy olvashatsz absztrakt a Habrén oroszul, ahol szintén elég pontosan tükröződik minden :)

  2. Belső paraméter, a faust doc-ban elég jól le van írva, lehetővé teszi, hogy közvetlenül a kódban állítsuk be a témát, természetesen ez a faust fejlesztők által megadott paramétereket jelenti, pl.: retention, retention policy (alapértelmezés szerint törlés, de beállítható kompakt), partíciók száma témánként (válaszfalakhogy például kevesebbet, mint globális érték alkalmazások faust).

  3. Általánosságban elmondható, hogy az ügynök létrehozhat globális értékekkel kezelt témát, de én szeretek mindent kifejezetten deklarálni. Ezenkívül az ügynökhirdetésben szereplő témakör néhány paramétere (például a partíciók száma vagy a megőrzési szabályzat) nem konfigurálható.

    Így nézhet ki a téma manuális meghatározása nélkül:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nos, most írjuk le, mit fog tenni az ügynökünk :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Tehát az ügynök elején megnyitunk egy aiohttp munkamenetet az ügyfelünkön keresztüli kérésekhez. Így egy dolgozó indításakor, amikor az ügynökünk elindul, azonnal megnyílik egy munkamenet - egy, a dolgozó teljes futási idejére (vagy több, ha megváltoztatja a paramétert egyidejűség alapértelmezett egységgel rendelkező ügynöktől).

Ezután követjük a folyamot (behelyezzük az üzenetet _, mivel mi ebben az ügynökben nem törődünk a témánk üzeneteinek tartalmával, ha léteznek az aktuális eltolásnál, különben a ciklusunk megvárja érkezésüket. Nos, a ciklusunkon belül naplózzuk az üzenet beérkezését, megkapjuk az aktív (a get_securities alapértelmezés szerint csak aktív értékkel tér vissza, lásd az ügyfélkódot) értékpapírok listáját, és elmentjük az adatbázisba, ellenőrizve, hogy van-e ugyanazzal a tickerrel rendelkező értékpapír és Exchange az adatbázisban, ha van, akkor az (a papír) egyszerűen frissül.

Indítsuk el alkotásunkat!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS jellemzők web komponens Faust nem fogom figyelembe venni a cikkekben, ezért kitűztük a megfelelő zászlót.

Indítási parancsunkban megmondtuk a faustnak, hogy hol kell keresni az alkalmazásobjektumot, és mit kell tenni vele (worker indítása) az infonapló kimeneti szintjével. A következő kimenetet kapjuk:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Életben van!!!

Nézzük a partíciókészletet. Amint látjuk, létrejött egy témakör a kódban megadott névvel, a partíciók alapértelmezett számával (8 topic_partitions - alkalmazásobjektum paraméter), mivel témánknak nem adtunk meg egyedi értéket (partíciókon keresztül). A workerben elindított ügynökhöz mind a 8 partíció hozzá van rendelve, mivel ez az egyetlen, de erről a fürtözésről szóló részben lesz még szó.

Nos, most egy másik terminál ablakba léphetünk, és üres üzenetet küldhetünk a témánknak:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS használata @ megmutatjuk, hogy üzenetet küldünk a „collect_securities” nevű témához.

Ebben az esetben az üzenet a 6-os partícióhoz ment – ​​ezt a kafdrop on-ra lépve ellenőrizheti localhost:9000

Munkatársunkkal a terminálablakba lépve egy boldog üzenetet fogunk látni, amelyet loguru használatával küldünk:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Megnézhetjük a mongo-t is (Robo3T vagy Studio3T segítségével), és láthatjuk, hogy az értékpapírok az adatbázisban vannak:

Nem vagyok milliárdos, ezért elégedettek vagyunk az első megtekintési lehetőséggel.

Háttérfeladatok a Faustról, II. rész: Ügynökök és csapatokHáttérfeladatok a Faustról, II. rész: Ügynökök és csapatok

Boldogság és öröm – kész az első ügynök :)

Ügynök kész, éljen az új ügynök!

Igen, uraim, még csak az 1/3-át jártuk be a cikk által előkészített útnak, de ne csüggedjenek, mert most könnyebb lesz.

Tehát most szükségünk van egy ügynökre, amely összegyűjti a metainformációkat és elhelyezi egy gyűjtődokumentumban:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Mivel ez az ügynök egy adott értékpapírral kapcsolatos információkat dolgoz fel, az üzenetben fel kell tüntetnünk ennek a biztonsági értéknek a tickerjét (szimbólumát). Erre a célra faustban vannak Records — osztályok, amelyek deklarálják az üzenetsémát az ügynök témakörben.

Ebben az esetben menjünk tovább records.py és írja le, hogyan kell kinéznie a téma üzenetének:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Ahogy azt sejteni lehetett, a Faust a python típusú annotációt használja az üzenetséma leírására, ezért a könyvtár által támogatott minimális verzió 3.6.

Térjünk vissza az ügynökhöz, állítsuk be a típusokat és adjuk hozzá:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Mint látható, egy új paramétert adunk át egy sémával a téma inicializálási metódusának - value_type. Továbbá minden ugyanazt a sémát követi, így nem látom értelmét annak, hogy mással foglalkozzak.

Nos, az utolsó simítás az, hogy fel kell hívni a metainformáció-gyűjtő ügynököt a collection_securitites-re:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Az üzenethez a korábban bejelentett sémát használjuk. Ebben az esetben a .cast módszert alkalmaztam, mivel nem kell megvárnunk az ügynök eredményét, de érdemes megemlíteni, hogy módja küldj üzenetet a témához:

  1. leadott - nem blokkol, mert nem vár eredményt. Az eredményt nem küldheti el üzenetként másik témába.

  2. küldés - nem blokkol, mert nem vár eredményt. A témakörben megadhat egy ügynököt, amelyhez az eredmény kerül.

  3. kérdezni - eredményre vár. A témakörben megadhat egy ügynököt, amelyhez az eredmény kerül.

Tehát mára ennyi az ügynökökkel!

Álom csapat

Az utolsó dolog, amit ebbe a részben ígértem, a parancsok. Amint korábban említettük, a faust parancsok egy kattintás körüli elemek. Valójában a Faust egyszerűen csatolja az egyéni parancsunkat a felületéhez, amikor megadja az -A billentyűt

Miután a bejelentett ügynökök be ügynökök.py adjunk hozzá egy funkciót egy dekorátorral app.commandmódszernek nevezve öntött у gyűjt_értékpapírokat:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Így, ha meghívjuk a parancsok listáját, az új parancsunk benne lesz:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Használhatjuk, mint bárki más, tehát indítsuk újra a faust workert, és kezdjük el az értékpapírok teljes értékű gyűjtését:

> faust -A horton.agents start-collect-securities

Mi fog ezután történni?

A következő részben a fennmaradó ügynökök példájával a szélsőségek keresésének elsüllyedő mechanizmusát vesszük szemügyre a kereskedés évre vonatkozó záróárfolyamaiban és az ügynökök kronindításában.

Ez minden mára! Köszönöm, hogy elolvasta :)

Ennek a résznek a kódja

Háttérfeladatok a Faustról, II. rész: Ügynökök és csapatok

PS Az utolsó rész alatt faust és összefolyó kafkáról kérdeztek (milyen tulajdonságokkal rendelkezik a confluent?). Úgy tűnik, hogy a confluent több szempontból is funkcionálisabb, de tény, hogy a Faust nem rendelkezik teljes kliens támogatással a confluenthez - ez következik abból, a kliens korlátozások leírása a dokumentumban.

Forrás: will.com

Hozzászólás