Naloge v ozadju pri Faustu, II. del: Agenti in ekipe

Naloge v ozadju pri Faustu, II. del: Agenti in ekipe

Kazalo

  1. I. del: Uvod

  2. Del II: Agenti in ekipe

Kaj počnemo tukaj?

Tako, tako, drugi del. Kot smo že napisali, bomo v njem storili naslednje:

  1. Napišimo majhen odjemalec za alphavantage na aiohttp z zahtevami za končne točke, ki jih potrebujemo.

  2. Ustvarimo agenta, ki bo zbiral podatke o vrednostnih papirjih in meta informacije o njih.

Ampak, to je tisto, kar bomo naredili za sam projekt in v smislu raziskovanja fausta se bomo naučili pisati agente, ki obdelujejo tokovne dogodke iz kafke, pa tudi pisati ukaze (click wrapper), v našem primeru - za ročna potisna sporočila na temo, ki jo spremlja agent.

Izobraževanje

Odjemalec AlphaVantage

Najprej napišimo majhen odjemalec aiohttp za zahteve za alphavantage.

alphavantage.py

Spojler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Pravzaprav je iz tega vse jasno:

  1. AlphaVantage API je precej preprosto in lepo oblikovan, zato sem se odločil, da vse zahteve opravim prek metode construct_query kjer je nato klic http.

  2. Prinesem vsa polja snake_case za udobje.

  3. No, dekoracija logger.catch za čudovit in informativen izpis sledenja.

PS Ne pozabite dodati žetona alphavantage lokalno v config.yml ali izvoziti spremenljivko okolja HORTON_SERVICE_APIKEY. Prejmemo žeton tukaj.

razred CRUD

Imeli bomo zbirko vrednostnih papirjev za shranjevanje metainformacij o vrednostnih papirjih.

baza podatkov/varnost.py

Po mojem mnenju tukaj ni treba ničesar razlagati, sam osnovni razred pa je precej preprost.

get_app()

Dodajmo funkcijo za ustvarjanje objekta aplikacije app.py

Spojler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Zaenkrat bomo imeli najenostavnejšo izdelavo aplikacije, malo kasneje jo bomo razširili, da pa ne boste čakali tukaj reference v App-razred. Svetujem vam tudi, da si ogledate razred nastavitev, saj je odgovoren za večino nastavitev.

Glavni organ

Agent za zbiranje in vzdrževanje seznama vrednostnih papirjev

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Torej, najprej dobimo objekt aplikacije faust - to je precej preprosto. Nato eksplicitno deklariramo temo za našega agenta ... Tukaj je vredno omeniti, kaj to je, kaj je notranji parameter in kako je to mogoče drugače urediti.

  1. Teme v Kafki, če želimo vedeti natančno definicijo, je bolje prebrati izklopljeno. dokument, ali lahko preberete nabornik na Habréju v ruščini, kjer se vse odraža tudi precej natančno :)

  2. Notranji parameter, ki je precej dobro opisan v faust dokumentu, nam omogoča, da temo konfiguriramo neposredno v kodi, seveda to pomeni parametre, ki so jih zagotovili razvijalci faust, na primer: zadrževanje, politika zadrževanja (privzeto izbriši, vendar lahko nastavite kompaktna), število particij na temo (rezultatinarediti, na primer, manj kot globalna vrednost aplikacije faust).

  3. Na splošno lahko agent ustvari upravljano temo z globalnimi vrednostmi, vendar rad vse deklariram izrecno. Poleg tega nekaterih parametrov (na primer števila particij ali pravilnika zadrževanja) teme v oglasu posrednika ni mogoče konfigurirati.

    Takole bi lahko izgledalo brez ročne definicije teme:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

No, zdaj pa opišemo, kaj bo naredil naš agent :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Torej, na začetku agenta odpremo sejo aiohttp za zahteve prek našega odjemalca. Tako se ob zagonu delavca, ko se zažene naš agent, takoj odpre seja - ena, za ves čas, ko se delavec izvaja (ali več, če spremenite parameter sočasnost od agenta s privzeto enoto).

Nato sledimo toku (sporočilo postavimo v _, saj nas v tem agentu ne zanima vsebina) sporočil iz naše teme, če obstajajo na trenutnem odmiku, sicer bo naš cikel počakal na njihov prihod. No, znotraj naše zanke zabeležimo prejem sporočila, pridobimo seznam aktivnih (get_securities privzeto vrne le aktivne, glejte kodo odjemalca) vrednostnih papirjev in ga shranimo v zbirko podatkov ter preverimo, ali obstaja vrednostni papir z isto oznako in izmenjavo v bazi podatkov , če obstaja, potem bo (papir) preprosto posodobljen.

Zaženimo naše ustvarjanje!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Lastnosti PS spletna komponenta Fausta v člankih ne bom upošteval, zato smo postavili ustrezno zastavico.

V našem ukazu za zagon smo Faustu povedali, kje naj išče objekt aplikacije in kaj naj z njim naredi (zažene delavca) z izhodno ravnjo dnevnika informacij. Dobimo naslednji rezultat:

Spojler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Živo je!!!

Poglejmo nabor particij. Kot lahko vidimo, je bila ustvarjena tema z imenom, ki smo ga določili v kodi, privzeto število particij (8, vzeto iz topic_partitions - parameter objekta aplikacije), saj nismo podali posamezne vrednosti za našo temo (preko particij). Zagnanemu agentu v delavcu je dodeljenih vseh 8 particij, saj je edini, a o tem bomo podrobneje razpravljali v delu o združevanju v gruče.

No, zdaj lahko gremo v drugo okno terminala in pošljemo prazno sporočilo naši temi:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

Uporaba PS @ pokažemo, da pošiljamo sporočilo v temo z imenom “collect_securities”.

V tem primeru je sporočilo šlo na particijo 6 - to lahko preverite tako, da obiščete kafdrop localhost:9000

Ko gremo z našim delavcem v okno terminala, bomo videli veselo sporočilo, poslano z uporabo loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Pogledamo lahko tudi v mongo (z uporabo Robo3T ali Studio3T) in vidimo, da so vrednostni papirji v bazi podatkov:

Nisem milijarder, zato smo zadovoljni s prvo možnostjo ogleda.

Naloge v ozadju pri Faustu, II. del: Agenti in ekipeNaloge v ozadju pri Faustu, II. del: Agenti in ekipe

Sreča in veselje - prvi agent je pripravljen :)

Agent pripravljen, naj živi novi agent!

Da, gospodje, prehodili smo le 1/3 poti, ki jo je pripravil ta članek, vendar naj vas ne obupa, saj bo zdaj lažje.

Zdaj potrebujemo posrednika, ki zbira meta informacije in jih vnaša v zbirni dokument:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Ker bo ta agent obdelal informacije o določenem vrednostnem papirju, moramo v sporočilu navesti oznako (simbol) tega vrednostnega papirja. V ta namen so v faustu zapisi — razredi, ki deklarirajo shemo sporočila v temi posrednika.

V tem primeru pojdimo na records.py in opišite, kako naj izgleda sporočilo za to temo:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kot ste morda uganili, Faust uporablja opombo tipa python za opis sporočilne sheme, zato je najmanjša različica, ki jo podpira knjižnica 3.6.

Vrnimo se k agentu, nastavimo vrste in ga dodamo:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kot lahko vidite, posredujemo nov parameter s shemo metodi inicializacije teme - value_type. Nadalje, vse poteka po isti shemi, zato ne vidim smisla, da bi se ukvarjal s čim drugim.

No, zadnji dotik je dodajanje klica agentu za zbiranje meta informacij za collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Za sporočilo uporabljamo predhodno napovedano shemo. V tem primeru sem uporabil metodo .cast, saj nam ni treba čakati na rezultat agenta, vendar velja omeniti, da načine pošlji sporočilo na temo:

  1. cast - ne blokira, ker ne pričakuje rezultata. Rezultata ne morete poslati v drugo temo kot sporočilo.

  2. pošlji - ne blokira, ker ne pričakuje rezultata. V temi lahko določite agenta, ki mu bo šel rezultat.

  3. vprašaj - čaka na rezultat. V temi lahko določite agenta, ki mu bo šel rezultat.

Torej, to je vse z agenti za danes!

Sanjska ekipa

Zadnja stvar, ki sem jo obljubil napisati v tem delu, so ukazi. Kot smo že omenili, so ukazi v faustu ovoj okrog klika. Pravzaprav faust preprosto pripne naš ukaz po meri svojemu vmesniku, ko podaja ključ -A

Po napovedanih agentih v agenti.py dodajte funkcijo z okrasiteljem app.commandklicanje metode lite у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Torej, če pokličemo seznam ukazov, bo naš novi ukaz v njem:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Uporabljamo ga lahko kot kogarkoli drugega, zato znova zaženimo faust workerja in začnimo polno zbiranje vrednostnih papirjev:

> faust -A horton.agents start-collect-securities

Kaj bo potem?

V naslednjem delu bomo na primeru preostalih agentov obravnavali mehanizem ponora za iskanje ekstremov v zaključnih cenah trgovanja za leto in zagon agentov cron.

To je vse za danes! Hvala za branje :)

Koda za ta del

Naloge v ozadju pri Faustu, II. del: Agenti in ekipe

PS Pod zadnjim delom so me spraševali o faustu in sotočni kafki (kakšne lastnosti ima konfluent?). Zdi se, da je confluent bolj funkcionalen v mnogih pogledih, toda dejstvo je, da faust nima popolne podpore odjemalcev za confluent - to izhaja iz opise omejitev strank v dok.

Vir: www.habr.com

Dodaj komentar