HĂĄttĂ©rfeladatok a FaustrĂłl, II. rĂ©sz: Ügynökök Ă©s csapatok

HĂĄttĂ©rfeladatok a FaustrĂłl, II. rĂ©sz: Ügynökök Ă©s csapatok

tartalomjegyzék

  1. I. rész: Bevezetés

  2. II. rĂ©sz: Ügynökök Ă©s csapatok

Mit csinĂĄlunk itt?

SzĂłval, szĂłval, a mĂĄsodik rĂ©sz. Ahogy korĂĄbban Ă­rtuk, ebben a következƑket fogjuk tenni:

  1. Írjunk egy kis klienst az alphavantage szĂĄmĂĄra az aiohttp-n a szĂŒksĂ©ges vĂ©gpontok kĂ©rĂ©sĂ©vel.

  2. Hozzunk lĂ©tre egy ĂŒgynököt, aki adatokat gyƱjt az Ă©rtĂ©kpapĂ­rokrĂłl Ă©s metainformĂĄciĂłkat azokrĂłl.

De ezt fogjuk megtenni magĂĄnak a projektnek, Ă©s a faust-kutatĂĄs szempontjĂĄbĂłl megtanuljuk, hogyan Ă­rjunk ĂŒgynököket, amelyek feldolgozzĂĄk a kafkĂĄbĂłl szĂĄrmazĂł stream esemĂ©nyeket, valamint hogyan Ă­rjunk parancsokat (a mi esetĂŒnkben kattintson a wrapperre) az ĂŒgynök ĂĄltal figyelt tĂ©makörhöz kĂŒldött kĂ©zi push ĂŒzenetekhez.

Edzés

AlphaVantage kliens

ElƑször Ă­rjunk egy kis aiohttp klienst az alphavantage kĂ©rĂ©sekhez.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

TulajdonkĂ©ppen minden vilĂĄgos belƑle:

  1. Az AlphaVantage API meglehetƑsen egyszerƱ Ă©s gyönyörƱen megtervezett, ezĂ©rt Ășgy döntöttem, hogy minden kĂ©rĂ©st a mĂłdszerrel teszek meg construct_query ahol viszont van egy http hĂ­vĂĄs.

  2. elhozom az összes mezƑt snake_case szĂŒksĂ©gszerƱsĂ©g miatt.

  3. Nos, a logger.catch dekoråció a gyönyörƱ és informatív nyomkövetési eredményért.

PS Ne felejtse el helyileg hozzåadni az alphavantage tokent a config.yml fåjlhoz, vagy exportålni a környezeti våltozót HORTON_SERVICE_APIKEY. Kapunk egy tokent itt.

CRUD osztĂĄly

ÉrtĂ©kpapĂ­r-gyƱjtemĂ©nyĂŒnk lesz az Ă©rtĂ©kpapĂ­rokkal kapcsolatos metainformĂĄciĂłk tĂĄrolĂĄsĂĄra.

adatbĂĄzis/security.py

VĂ©lemĂ©nyem szerint itt nem kell semmit magyarĂĄzni, Ă©s maga az alaposztĂĄly meglehetƑsen egyszerƱ.

alkalmazås beszerzése()

Adjunk hozzĂĄ egy fĂŒggvĂ©nyt egy alkalmazĂĄsobjektum lĂ©trehozĂĄsĂĄhoz app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

EgyelƑre a legegyszerƱbb alkalmazĂĄskĂ©szĂ­tĂ©s ĂĄll rendelkezĂ©sĂŒnkre, kicsit kĂ©sƑbb bƑvĂ­tjĂŒk, de hogy ne kelljen vĂĄrakoznia, itt hivatkozĂĄsok az App-osztĂĄlyhoz. Azt is tanĂĄcsolom, hogy vessen egy pillantĂĄst a beĂĄllĂ­tĂĄsi osztĂĄlyra, mivel ez felelƑs a legtöbb beĂĄllĂ­tĂĄsĂ©rt.

FƑ rĂ©sz

Ügynök az Ă©rtĂ©kpapĂ­rok listĂĄjĂĄnak összegyƱjtĂ©sĂ©re Ă©s karbantartĂĄsĂĄra

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

TehĂĄt elƑször megkapjuk a faust alkalmazĂĄs objektumot - ez meglehetƑsen egyszerƱ. EzutĂĄn kifejezetten deklarĂĄlunk egy tĂ©mĂĄt az ĂŒgynökĂŒnk szĂĄmĂĄra... Itt Ă©rdemes megemlĂ­teni, hogy mi az, mi a belsƑ paramĂ©ter, Ă©s hogyan lehet ezt mĂĄskĂ©pp elrendezni.

  1. TĂ©mĂĄk kafkĂĄban, ha a pontos definĂ­ciĂłt akarjuk tudni, Ă©rdemes elolvasni ki. dokumentum, vagy olvashatsz absztrakt a HabrĂ©n oroszul, ahol szintĂ©n elĂ©g pontosan tĂŒkrözƑdik minden :)

  2. BelsƑ paramĂ©ter, a faust doc-ban elĂ©g jĂłl le van Ă­rva, lehetƑvĂ© teszi, hogy közvetlenĂŒl a kĂłdban ĂĄllĂ­tsuk be a tĂ©mĂĄt, termĂ©szetesen ez a faust fejlesztƑk ĂĄltal megadott paramĂ©tereket jelenti, pl.: retention, retention policy (alapĂ©rtelmezĂ©s szerint törlĂ©s, de beĂĄllĂ­thatĂł kompakt), partĂ­ciĂłk szĂĄma tĂ©mĂĄnkĂ©nt (vĂĄlaszfalakhogy pĂ©ldĂĄul kevesebbet, mint globĂĄlis Ă©rtĂ©k alkalmazĂĄsok faust).

  3. ÁltalĂĄnossĂĄgban elmondhatĂł, hogy az ĂŒgynök lĂ©trehozhat globĂĄlis Ă©rtĂ©kekkel kezelt tĂ©mĂĄt, de Ă©n szeretek mindent kifejezetten deklarĂĄlni. EzenkĂ­vĂŒl az ĂŒgynökhirdetĂ©sben szereplƑ tĂ©makör nĂ©hĂĄny paramĂ©tere (pĂ©ldĂĄul a partĂ­ciĂłk szĂĄma vagy a megƑrzĂ©si szabĂĄlyzat) nem konfigurĂĄlhatĂł.

    Így nĂ©zhet ki a tĂ©ma manuĂĄlis meghatĂĄrozĂĄsa nĂ©lkĂŒl:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nos, most Ă­rjuk le, mit fog tenni az ĂŒgynökĂŒnk :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

TehĂĄt az ĂŒgynök elejĂ©n megnyitunk egy aiohttp munkamenetet az ĂŒgyfelĂŒnkön keresztĂŒli kĂ©rĂ©sekhez. Így egy dolgozĂł indĂ­tĂĄsakor, amikor az ĂŒgynökĂŒnk elindul, azonnal megnyĂ­lik egy munkamenet - egy, a dolgozĂł teljes futĂĄsi idejĂ©re (vagy több, ha megvĂĄltoztatja a paramĂ©tert egyidejƱsĂ©g alapĂ©rtelmezett egysĂ©ggel rendelkezƑ ĂŒgynöktƑl).

EzutĂĄn követjĂŒk a folyamot (behelyezzĂŒk az ĂŒzenetet _, mivel mi ebben az ĂŒgynökben nem törƑdĂŒnk a tĂ©mĂĄnk ĂŒzeneteinek tartalmĂĄval, ha lĂ©teznek az aktuĂĄlis eltolĂĄsnĂĄl, kĂŒlönben a ciklusunk megvĂĄrja Ă©rkezĂ©sĂŒket. Nos, a ciklusunkon belĂŒl naplĂłzzuk az ĂŒzenet beĂ©rkezĂ©sĂ©t, megkapjuk az aktĂ­v (a get_securities alapĂ©rtelmezĂ©s szerint csak aktĂ­v Ă©rtĂ©kkel tĂ©r vissza, lĂĄsd az ĂŒgyfĂ©lkĂłdot) Ă©rtĂ©kpapĂ­rok listĂĄjĂĄt, Ă©s elmentjĂŒk az adatbĂĄzisba, ellenƑrizve, hogy van-e ugyanazzal a tickerrel rendelkezƑ Ă©rtĂ©kpapĂ­r Ă©s Exchange az adatbĂĄzisban, ha van, akkor az (a papĂ­r) egyszerƱen frissĂŒl.

IndĂ­tsuk el alkotĂĄsunkat!

> docker-compose up -d
... ЗапусĐș ĐșĐŸĐœŃ‚Đ”ĐčĐœĐ”Ń€ĐŸĐČ ...
> faust -A horton.agents worker --without-web -l info

PS jellemzƑk web komponens Faust nem fogom figyelembe venni a cikkekben, ezĂ©rt kitƱztĂŒk a megfelelƑ zĂĄszlĂłt.

IndĂ­tĂĄsi parancsunkban megmondtuk a faustnak, hogy hol kell keresni az alkalmazĂĄsobjektumot, Ă©s mit kell tenni vele (worker indĂ­tĂĄsa) az infonaplĂł kimeneti szintjĂ©vel. A következƑ kimenetet kapjuk:

Spoiler

ڟa”S† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┮───────────────────────────────────────────────────┘
... Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┌─────────────
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┮────────────┘ 

Életben van!!!

NĂ©zzĂŒk a partĂ­ciĂłkĂ©szletet. Amint lĂĄtjuk, lĂ©trejött egy tĂ©makör a kĂłdban megadott nĂ©vvel, a partĂ­ciĂłk alapĂ©rtelmezett szĂĄmĂĄval (8 topic_partitions - alkalmazĂĄsobjektum paramĂ©ter), mivel tĂ©mĂĄnknak nem adtunk meg egyedi Ă©rtĂ©ket (partĂ­ciĂłkon keresztĂŒl). A workerben elindĂ­tott ĂŒgynökhöz mind a 8 partĂ­ciĂł hozzĂĄ van rendelve, mivel ez az egyetlen, de errƑl a fĂŒrtözĂ©srƑl szĂłlĂł rĂ©szben lesz mĂ©g szĂł.

Nos, most egy mĂĄsik terminĂĄl ablakba lĂ©phetĂŒnk, Ă©s ĂŒres ĂŒzenetet kĂŒldhetĂŒnk a tĂ©mĂĄnknak:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS hasznĂĄlata @ megmutatjuk, hogy ĂŒzenetet kĂŒldĂŒnk a „collect_securities” nevƱ tĂ©mĂĄhoz.

Ebben az esetben az ĂŒzenet a 6-os partĂ­ciĂłhoz ment – ​​ezt a kafdrop on-ra lĂ©pve ellenƑrizheti localhost:9000

MunkatĂĄrsunkkal a terminĂĄlablakba lĂ©pve egy boldog ĂŒzenetet fogunk lĂĄtni, amelyet loguru hasznĂĄlatĂĄval kĂŒldĂŒnk:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

MegnĂ©zhetjĂŒk a mongo-t is (Robo3T vagy Studio3T segĂ­tsĂ©gĂ©vel), Ă©s lĂĄthatjuk, hogy az Ă©rtĂ©kpapĂ­rok az adatbĂĄzisban vannak:

Nem vagyok milliĂĄrdos, ezĂ©rt elĂ©gedettek vagyunk az elsƑ megtekintĂ©si lehetƑsĂ©ggel.

HĂĄttĂ©rfeladatok a FaustrĂłl, II. rĂ©sz: Ügynökök Ă©s csapatokHĂĄttĂ©rfeladatok a FaustrĂłl, II. rĂ©sz: Ügynökök Ă©s csapatok

BoldogsĂĄg Ă©s öröm – kĂ©sz az elsƑ ĂŒgynök :)

Ügynök kĂ©sz, Ă©ljen az Ășj ĂŒgynök!

Igen, uraim, mĂ©g csak az 1/3-ĂĄt jĂĄrtuk be a cikk ĂĄltal elƑkĂ©szĂ­tett Ăștnak, de ne csĂŒggedjenek, mert most könnyebb lesz.

TehĂĄt most szĂŒksĂ©gĂŒnk van egy ĂŒgynökre, amely összegyƱjti a metainformĂĄciĂłkat Ă©s elhelyezi egy gyƱjtƑdokumentumban:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Mivel ez az ĂŒgynök egy adott Ă©rtĂ©kpapĂ­rral kapcsolatos informĂĄciĂłkat dolgoz fel, az ĂŒzenetben fel kell tĂŒntetnĂŒnk ennek a biztonsĂĄgi Ă©rtĂ©knek a tickerjĂ©t (szimbĂłlumĂĄt). Erre a cĂ©lra faustban vannak Records — osztĂĄlyok, amelyek deklarĂĄljĂĄk az ĂŒzenetsĂ©mĂĄt az ĂŒgynök tĂ©makörben.

Ebben az esetben menjĂŒnk tovĂĄbb records.py Ă©s Ă­rja le, hogyan kell kinĂ©znie a tĂ©ma ĂŒzenetĂ©nek:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Ahogy azt sejteni lehetett, a Faust a python tĂ­pusĂș annotĂĄciĂłt hasznĂĄlja az ĂŒzenetsĂ©ma leĂ­rĂĄsĂĄra, ezĂ©rt a könyvtĂĄr ĂĄltal tĂĄmogatott minimĂĄlis verziĂł 3.6.

TĂ©rjĂŒnk vissza az ĂŒgynökhöz, ĂĄllĂ­tsuk be a tĂ­pusokat Ă©s adjuk hozzĂĄ:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Mint lĂĄthatĂł, egy Ășj paramĂ©tert adunk ĂĄt egy sĂ©mĂĄval a tĂ©ma inicializĂĄlĂĄsi metĂłdusĂĄnak - value_type. TovĂĄbbĂĄ minden ugyanazt a sĂ©mĂĄt követi, Ă­gy nem lĂĄtom Ă©rtelmĂ©t annak, hogy mĂĄssal foglalkozzak.

Nos, az utolsĂł simĂ­tĂĄs az, hogy fel kell hĂ­vni a metainformĂĄciĂł-gyƱjtƑ ĂŒgynököt a collection_securitites-re:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Az ĂŒzenethez a korĂĄbban bejelentett sĂ©mĂĄt hasznĂĄljuk. Ebben az esetben a .cast mĂłdszert alkalmaztam, mivel nem kell megvĂĄrnunk az ĂŒgynök eredmĂ©nyĂ©t, de Ă©rdemes megemlĂ­teni, hogy mĂłdja kĂŒldj ĂŒzenetet a tĂ©mĂĄhoz:

  1. leadott - nem blokkol, mert nem vĂĄr eredmĂ©nyt. Az eredmĂ©nyt nem kĂŒldheti el ĂŒzenetkĂ©nt mĂĄsik tĂ©mĂĄba.

  2. kĂŒldĂ©s - nem blokkol, mert nem vĂĄr eredmĂ©nyt. A tĂ©makörben megadhat egy ĂŒgynököt, amelyhez az eredmĂ©ny kerĂŒl.

  3. kĂ©rdezni - eredmĂ©nyre vĂĄr. A tĂ©makörben megadhat egy ĂŒgynököt, amelyhez az eredmĂ©ny kerĂŒl.

TehĂĄt mĂĄra ennyi az ĂŒgynökökkel!

Álom csapat

Az utolsĂł dolog, amit ebbe a rĂ©szben Ă­gĂ©rtem, a parancsok. Amint korĂĄbban emlĂ­tettĂŒk, a faust parancsok egy kattintĂĄs körĂŒli elemek. ValĂłjĂĄban a Faust egyszerƱen csatolja az egyĂ©ni parancsunkat a felĂŒletĂ©hez, amikor megadja az -A billentyƱt

MiutĂĄn a bejelentett ĂŒgynökök be ĂŒgynökök.py adjunk hozzĂĄ egy funkciĂłt egy dekorĂĄtorral app.commandmĂłdszernek nevezve öntött у gyƱjt_Ă©rtĂ©kpapĂ­rokat:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Így, ha meghĂ­vjuk a parancsok listĂĄjĂĄt, az Ășj parancsunk benne lesz:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

HasznĂĄlhatjuk, mint bĂĄrki mĂĄs, tehĂĄt indĂ­tsuk Ășjra a faust workert, Ă©s kezdjĂŒk el az Ă©rtĂ©kpapĂ­rok teljes Ă©rtĂ©kƱ gyƱjtĂ©sĂ©t:

> faust -A horton.agents start-collect-securities

Mi fog ezutån történni?

A következƑ rĂ©szben a fennmaradĂł ĂŒgynökök pĂ©ldĂĄjĂĄval a szĂ©lsƑsĂ©gek keresĂ©sĂ©nek elsĂŒllyedƑ mechanizmusĂĄt vesszĂŒk szemĂŒgyre a kereskedĂ©s Ă©vre vonatkozĂł zĂĄróårfolyamaiban Ă©s az ĂŒgynökök kronindĂ­tĂĄsĂĄban.

Ez minden måra! Köszönöm, hogy elolvasta :)

Ennek a résznek a kódja

HĂĄttĂ©rfeladatok a FaustrĂłl, II. rĂ©sz: Ügynökök Ă©s csapatok

PS Az utolsĂł rĂ©sz alatt faust Ă©s összefolyĂł kafkĂĄrĂłl kĂ©rdeztek (milyen tulajdonsĂĄgokkal rendelkezik a confluent?). Úgy tƱnik, hogy a confluent több szempontbĂłl is funkcionĂĄlisabb, de tĂ©ny, hogy a Faust nem rendelkezik teljes kliens tĂĄmogatĂĄssal a confluenthez - ez következik abbĂłl, a kliens korlĂĄtozĂĄsok leĂ­rĂĄsa a dokumentumban.

ForrĂĄs: will.com

HozzĂĄszĂłlĂĄs