Úkoly na pozadí Fausta, část II: Agenti a týmy

Úkoly na pozadí Fausta, část II: Agenti a týmy

obsah

  1. Část I: Úvod

  2. Část II: Agenti a týmy

co tu děláme?

Takže druhá část. Jak bylo napsáno dříve, v něm provedeme následující:

  1. Pojďme napsat malého klienta pro alphavantage na aiohttp s požadavky na koncové body, které potřebujeme.

  2. Vytvořme agenta, který bude sbírat data o cenných papírech a metainformace o nich.

Ale to je to, co uděláme pro samotný projekt a z hlediska faustového výzkumu se naučíme psát agenty, kteří zpracovávají streamové události z kafka, a také jak psát příkazy (click wrapper), v našem případě - pro ruční push zprávy k tématu, které agent sleduje.

Trénink

Klient AlphaVantage

Nejprve napíšeme malého klienta aiohttp pro požadavky na alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Ve skutečnosti je z něj vše jasné:

  1. AlphaVantage API je poměrně jednoduše a krásně navrženo, takže jsem se rozhodl všechny požadavky provádět prostřednictvím této metody construct_query kde je zase volání http.

  2. Přináším všechna pole snake_case pro pohodlí.

  3. Dekorace logger.catch pro krásný a informativní výstup zpětného sledování.

PS Nezapomeňte přidat token alphavantage lokálně do config.yml nebo exportovat proměnnou prostředí HORTON_SERVICE_APIKEY. Dostáváme žeton zde.

třída CRUD

Budeme mít sbírku cenných papírů k ukládání metainformací o cenných papírech.

databáze/security.py

Dle mého názoru zde není potřeba nic vysvětlovat a samotná základní třída je vcelku jednoduchá.

get_app()

Přidejme funkci pro vytvoření aplikačního objektu v app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Prozatím budeme mít nejjednodušší tvorbu aplikace, o něco později ji rozšíříme, nicméně abyste nečekali, zde Reference do třídy aplikací. Také vám doporučuji podívat se na třídu nastavení, protože je zodpovědná za většinu nastavení.

Hlavní část

Zmocněnec pro shromažďování a vedení seznamu cenných papírů

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nejprve tedy získáme objekt aplikace faust - je to docela jednoduché. Dále výslovně deklarujeme téma pro našeho agenta... Zde stojí za zmínku, co to je, jaký je interní parametr a jak to lze zařídit jinak.

  1. Témata v kafce, pokud chceme znát přesnou definici, je lepší si ji přečíst vypnuto. dokument, nebo si můžete přečíst abstraktní na Habré v ruštině, kde se také vše odráží celkem přesně :)

  2. Parametr interní, celkem dobře popsaný ve faust doc, nám umožňuje konfigurovat téma přímo v kódu, samozřejmě to znamená parametry poskytnuté vývojáři faust, např.: retence, retenční politika (ve výchozím nastavení smazat, ale lze nastavit kompaktní), počet oddílů na téma (skóredělat například méně než globální význam aplikace faust).

  3. Obecně platí, že agent může vytvořit spravované téma s globálními hodnotami, nicméně rád vše deklaruji explicitně. Navíc některé parametry (například počet oddílů nebo zásady uchovávání) tématu v inzerci agenta nelze konfigurovat.

    Zde je návod, jak by to mohlo vypadat bez ručního definování tématu:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

No a teď si popišme, co náš agent udělá :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Na začátku agenta tedy otevřeme relaci aiohttp pro požadavky prostřednictvím našeho klienta. Při spouštění pracovníka se tedy při spuštění našeho agenta okamžitě otevře relace – jedna, po celou dobu běhu pracovníka (nebo několik, pokud změníte parametr souběžnost od agenta s výchozí jednotkou).

Dále sledujeme stream (umístíme zprávu do _, jelikož nás v tomto agentu nezajímá obsah) zpráv z našeho tématu, pokud existují v aktuálním offsetu, jinak náš cyklus počká na jejich příchod. No, uvnitř naší smyčky zaprotokolujeme příjem zprávy, získáme seznam aktivních (get_securities vrací pouze aktivní ve výchozím nastavení, viz kód klienta) cenných papírů a uložíme jej do databáze, zkontrolujeme, zda existuje cenný papír se stejným tickerem a výměna v databázi, pokud existuje, pak se (papír) jednoduše aktualizuje.

Pusťme se do našeho výtvoru!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Vlastnosti PS webová komponenta Nebudu v článcích uvažovat o faustovi, proto nastavíme příslušný příznak.

V našem příkazu ke spuštění jsme faustovi řekli, kde hledat objekt aplikace a co s ním dělat (spustit pracovníka) s výstupní úrovní info logu. Získáme následující výstup:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Je to živé!!!

Podívejme se na sadu oddílů. Jak vidíme, bylo vytvořeno téma s názvem, který jsme určili v kódu, výchozím počtem oddílů (8, převzato z téma_oddíly - parametr aplikačního objektu), protože jsme nezadali individuální hodnotu pro naše téma (přes oddíly). Spuštěnému agentovi v workeru je přiřazeno všech 8 oddílů, protože je jediný, ale o tom bude podrobněji pojednáno v části o shlukování.

Nyní můžeme přejít do jiného okna terminálu a odeslat prázdnou zprávu do našeho tématu:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

Použití PS @ ukazujeme, že posíláme zprávu tématu s názvem „collect_securities“.

V tomto případě se zpráva přesunula na oddíl 6 - můžete to zkontrolovat přechodem na kafdrop on localhost:9000

Když přejdeme do okna terminálu s naším pracovníkem, uvidíme šťastnou zprávu odeslanou pomocí loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Můžeme se také podívat do mongo (pomocí Robo3T nebo Studio3T) a zjistit, že cenné papíry jsou v databázi:

Nejsem miliardář, a proto se spokojíme s první možností sledování.

Úkoly na pozadí Fausta, část II: Agenti a týmyÚkoly na pozadí Fausta, část II: Agenti a týmy

Štěstí a radost - první agent je připraven :)

Agent připraven, ať žije nový agent!

Ano, pánové, prošli jsme pouze 1/3 cesty připravené tímto článkem, ale nenechte se odradit, protože teď to bude jednodušší.

Nyní tedy potřebujeme agenta, který shromažďuje meta informace a vkládá je do sbírkového dokumentu:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Protože tento agent bude zpracovávat informace o konkrétním zabezpečení, musíme ve zprávě uvést ticker (symbol) tohoto zabezpečení. K tomuto účelu ve faust existují Evidence — třídy, které deklarují schéma zpráv v tématu agenta.

V tomto případě pojďme na záznamy.py a popište, jak by zpráva pro toto téma měla vypadat:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Jak jste možná uhodli, faust používá k popisu schématu zprávy anotaci typu python, a proto je minimální verze podporovaná knihovnou 3.6.

Vraťme se k agentovi, nastavte typy a přidejte jej:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Jak vidíte, předáme metodě inicializace tématu nový parametr se schématem - typ_hodnoty. Dále se vše řídí stejným schématem, takže nevidím smysl zabývat se něčím jiným.

Posledním krokem je přidání volání agenta pro shromažďování meta informací, aby collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Pro zprávu používáme dříve oznámené schéma. V tomto případě jsem použil metodu .cast, protože nemusíme čekat na výsledek od agenta, ale stojí za zmínku, že způsoby poslat zprávu k tématu:

  1. cast - neblokuje, protože neočekává výsledek. Výsledek nemůžete odeslat do jiného tématu jako zprávu.

  2. odeslat - neblokuje, protože neočekává výsledek. V tématu můžete určit agenta, ke kterému bude výsledek převeden.

  3. zeptat se - čeká na výsledek. V tématu můžete určit agenta, ke kterému bude výsledek převeden.

Tak, to je pro dnešek s agenty vše!

Tým snů

Poslední věc, kterou jsem slíbil napsat v tomto díle, jsou příkazy. Jak již bylo zmíněno dříve, příkazy ve faust jsou obalem kolem kliknutí. Ve skutečnosti faust jednoduše připojí náš vlastní příkaz ke svému rozhraní při zadání klávesy -A

Po ohlášení agentů v agenti.py přidat funkci s dekoratérem app.commandvolání metody obsazení у sbírat_ cenné papíry:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Pokud tedy zavoláme seznam příkazů, náš nový příkaz v něm bude:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Můžeme to použít jako kdokoli jiný, takže restartujme faust worker a začněme plnohodnotnou sbírku cenných papírů:

> faust -A horton.agents start-collect-securities

Co se stane příště?

V další části se na příkladu zbývajících agentů podíváme na sink mechanismus pro hledání extrémů v závěrečných cenách obchodování pro daný rok a spuštění cronu agentů.

To je pro dnešek vše! Děkuji za přečtení :)

Kód pro tuto část

Úkoly na pozadí Fausta, část II: Agenti a týmy

PS V minulém díle se mě ptali na faust a splývající kafku (jaké vlastnosti má confluent?). Zdá se, že confluent je v mnoha ohledech funkčnější, ale faktem je, že faust nemá plnou klientskou podporu pro confluent - vyplývá to z popisy klientských omezení v doc.

Zdroj: www.habr.com

Přidat komentář