Úlohy na pozadí Fausta, Časť II: Agenti a tímy

Úlohy na pozadí Fausta, Časť II: Agenti a tímy

obsah

  1. Časť I: Úvod

  2. Časť II: Agenti a tímy

čo tu robíme?

Takže druhá časť. Ako bolo napísané vyššie, v ňom urobíme nasledovné:

  1. Napíšme malého klienta pre alphavantage na aiohttp s požiadavkami na koncové body, ktoré potrebujeme.

  2. Vytvorme agenta, ktorý bude zbierať údaje o cenných papieroch a metainformácie o nich.

Ale to je to, čo urobíme pre samotný projekt a v rámci faustovho výskumu sa naučíme písať agentov, ktorí spracovávajú streamované udalosti z kafka, ako aj písať príkazy (click wrapper), v našom prípade - pre manuálne push správy na tému, ktorú agent monitoruje.

Tréning

Klient AlphaVantage

Najprv napíšme malého klienta aiohttp pre požiadavky na alphavantage.

alphavantage.py

Spojler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

V skutočnosti je z toho všetko jasné:

  1. AlphaVantage API je celkom jednoducho a krásne navrhnuté, takže som sa rozhodol všetky požiadavky vykonať prostredníctvom tejto metódy construct_query kde zasa prebieha volanie http.

  2. Prinášam všetky polia snake_case pre pohodlie.

  3. Dekorácia logger.catch pre krásny a informatívny výstup sledovania.

PS Nezabudnite pridať token alphavantage lokálne do config.yml alebo exportovať premennú prostredia HORTON_SERVICE_APIKEY. Dostávame token tu.

trieda CRUD

Budeme mať zbierku cenných papierov na ukladanie meta informácií o cenných papieroch.

database/security.py

Podľa mňa tu netreba nič vysvetľovať a samotná základná trieda je celkom jednoduchá.

get_app()

Pridajme funkciu na vytvorenie objektu aplikácie v app.py

Spojler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Zatiaľ budeme mať najjednoduchšiu tvorbu aplikácie, o niečo neskôr ju rozšírime, aby ste však nenechali čakať, tu referencie do triedy aplikácií. Odporúčam vám tiež pozrieť sa na triedu nastavení, pretože je zodpovedná za väčšinu nastavení.

Hlavná časť

Agent pre zber a vedenie zoznamu cenných papierov

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Najprv teda získame objekt aplikácie faust - je to celkom jednoduché. Ďalej explicitne deklarujeme tému pre nášho agenta... Tu stojí za zmienku, čo to je, aký je interný parameter a ako sa to dá zariadiť inak.

  1. Témy v kafke, ak chceme poznať presnú definíciu, je lepšie si ju prečítať vypnuté. dokument, alebo si môžete prečítať abstraktné na Habré v ruštine, kde je všetko aj celkom presne odzrkadlené :)

  2. Interný parameter, celkom dobre popísaný vo faust doc, nám umožňuje nakonfigurovať tému priamo v kóde, samozrejme to znamená parametre poskytnuté vývojármi faust, napr.: retencia, retenčná politika (štandardne mazať, ale môžete kompaktné), počet oddielov na tému (skórerobiť napríklad menej ako celosvetový význam aplikácie faust).

  3. Vo všeobecnosti môže agent vytvoriť spravovanú tému s globálnymi hodnotami, rád však všetko deklarujem explicitne. Okrem toho niektoré parametre (napríklad počet oddielov alebo politika uchovávania) témy v reklame agenta nemožno nakonfigurovať.

    Takto by to mohlo vyzerať bez manuálneho definovania témy:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

No a teraz si popíšme, čo náš agent urobí :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Takže na začiatku agenta otvoríme reláciu aiohttp pre požiadavky prostredníctvom nášho klienta. Pri spustení pracovníka sa teda pri spustení nášho agenta okamžite otvorí relácia – jedna, po celú dobu spustenia pracovníka (alebo niekoľko, ak zmeníte parameter súbežnosť od agenta s predvolenou jednotkou).

Ďalej sledujeme prúd (správu umiestnime do _, keďže nám v tomto agentovi nezáleží na obsahu) správ z našej témy, ak existujú pri aktuálnom posune, inak náš cyklus počká na ich príchod. Vo vnútri našej slučky zaprotokolujeme prijatie správy, získame zoznam aktívnych (štandardne sa get_securities vracia iba aktívny, pozri kód klienta) cenných papierov a uložíme ho do databázy, pričom skontrolujeme, či existuje cenný papier s rovnakým tickerom a výmena v databáze, ak existuje, potom sa (papier) jednoducho aktualizuje.

Poďme spustiť našu tvorbu!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Vlastnosti PS webový komponent V článkoch nebudem uvažovať nad faustom, preto nastavujeme príslušný príznak.

V našom príkaze na spustenie sme faustovi povedali, kde má hľadať objekt aplikácie a čo s ním robiť (spustiť pracovníka) s úrovňou výstupu info logu. Získame nasledujúci výstup:

Spojler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Je nažive!!!

Pozrime sa na sadu oddielov. Ako vidíme, bola vytvorená téma s názvom, ktorý sme určili v kóde, predvoleným počtom oddielov (8, prevzaté z topic_partitions - parameter aplikačného objektu), keďže sme nešpecifikovali individuálnu hodnotu pre našu tému (cez oddiely). Spustený agent v robotovi má priradených všetkých 8 oddielov, keďže je jediný, ale o tom bude podrobnejšie popísané v časti o klastrovaní.

Teraz môžeme prejsť do iného okna terminálu a poslať prázdnu správu našej téme:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

Použitie PS @ ukazujeme, že posielame správu téme s názvom „collect_securities“.

V tomto prípade sa správa dostala na oddiel 6 - môžete to skontrolovať tak, že prejdete na kafdrop on localhost:9000

Keď prejdeme do okna terminálu s naším pracovníkom, uvidíme šťastnú správu odoslanú pomocou loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Môžeme sa tiež pozrieť do mongo (pomocou Robo3T alebo Studio3T) a vidieť, že cenné papiere sú v databáze:

Nie som miliardár, a preto sme spokojní s prvou možnosťou zobrazenia.

Úlohy na pozadí Fausta, Časť II: Agenti a tímyÚlohy na pozadí Fausta, Časť II: Agenti a tímy

Šťastie a radosť - prvý agent je pripravený :)

Agent pripravený, nech žije nový agent!

Áno, páni, prešli sme len 1/3 cesta pripraveného týmto článkom, ale nenechajte sa odradiť, pretože teraz to bude jednoduchšie.

Takže teraz potrebujeme agenta, ktorý zbiera meta informácie a vkladá ich do zberného dokumentu:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Keďže tento agent bude spracovávať informácie o konkrétnom cennom papieri, musíme v správe uviesť ticker (symbol) tohto zabezpečenia. Na tento účel vo faust existujú Evidencia — triedy, ktoré deklarujú schému správ v téme agenta.

V tomto prípade poďme na záznamy.py a popíšte, ako by mala vyzerať správa pre túto tému:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Ako ste možno uhádli, faust používa anotáciu typu python na popis schémy správ, a preto je minimálna verzia podporovaná knižnicou 3.6.

Vráťme sa k agentovi, nastavte typy a pridajte ho:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Ako vidíte, metóde inicializácie témy odovzdávame nový parameter so schémou - typ_hodnoty. Ďalej sa všetko riadi rovnakou schémou, takže nevidím dôvod zaoberať sa niečím iným.

Posledným krokom je pridanie hovoru agentovi zhromažďovania meta informácií na collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Pre správu používame predtým oznámenú schému. V tomto prípade som použil metódu .cast, pretože nemusíme čakať na výsledok od agenta, ale stojí za zmienku, že spôsoby poslať správu k téme:

  1. obsadenie - neblokuje, pretože neočakáva výsledok. Výsledok nemôžete poslať do inej témy ako správu.

  2. odoslať - neblokuje, pretože neočakáva výsledok. V téme môžete určiť agenta, ku ktorému sa dostane výsledok.

  3. opýtať sa - čaká na výsledok. V téme môžete určiť agenta, ku ktorému sa dostane výsledok.

Takže, to je na dnes všetko s agentmi!

Tím snov

Posledná vec, ktorú som sľúbil napísať v tejto časti, sú príkazy. Ako už bolo spomenuté, príkazy vo fauste sú obalom okolo kliknutia. V skutočnosti faust pri zadávaní klávesu -A jednoducho pripojí náš vlastný príkaz k svojmu rozhraniu

Po ohlásených agentoch v agenti.py pridať funkciu s dekoratérom app.commandvolanie metódy obsadenie у zbierať_cenné papiere:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Ak teda zavoláme zoznam príkazov, náš nový príkaz v ňom bude:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Môžeme ho použiť ako ktokoľvek iný, takže reštartujme faust workera a začnime s plnohodnotnou zbierkou cenných papierov:

> faust -A horton.agents start-collect-securities

čo bude ďalej?

V ďalšej časti sa na príklade zvyšných agentov pozrieme na sink mechanizmus na hľadanie extrémov v záverečných cenách obchodovania na daný rok a spustenie cronu agentov.

To je na dnes všetko! Vďaka za prečítanie :)

Kód pre túto časť

Úlohy na pozadí Fausta, Časť II: Agenti a tímy

PS Pod poslednou časťou sa ma pýtali na faust a splývajúcu kafku (aké vlastnosti má confluent?). Zdá sa, že confluent je v mnohých smeroch funkčnejší, ale faktom je, že faust nemá plnú klientsku podporu pre confluent - vyplýva to z popisy klientských obmedzení v doc.

Zdroj: hab.com

Pridať komentár