Fono užduotys apie Faustą, II dalis: Agentai ir komandos

Fono užduotys apie Faustą, II dalis: Agentai ir komandos

Turinys

  1. I dalis: Įvadas

  2. II dalis: Agentai ir komandos

Ką mes čia veikiame?

Taigi, antra dalis. Kaip parašyta anksčiau, jame atliksime šiuos veiksmus:

  1. Parašykime nedidelį alfavantage klientą aiohttp su užklausomis dėl mums reikalingų galinių taškų.

  2. Sukurkime agentą, kuris rinks duomenis apie vertybinius popierius ir meta informaciją apie juos.

Bet tai mes padarysime pačiam projektui, o kalbant apie „Fausto“ tyrimą, išmoksime rašyti agentus, kurie apdoroja srautinius įvykius iš kafka, taip pat kaip rašyti komandas (spustelėkite įpakavimą), mūsų atveju - rankiniu būdu išsiųstiems pranešimams į temą, kurią agentas stebi.

Mokymai

„AlphaVantage“ klientas

Pirmiausia parašykime nedidelį aiohttp klientą, skirtą alfavantage užklausoms.

alfavantage.py

Spoileris

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Tiesą sakant, viskas iš to aišku:

  1. „AlphaVantage“ API yra gana paprasta ir gražiai sukurta, todėl nusprendžiau visus prašymus pateikti naudojant metodą construct_query kur savo ruožtu yra http skambutis.

  2. Atvežu visus laukus snake_case patogumui.

  3. Na, logger.catch puošmena gražiam ir informatyviam atsekimui.

PS Nepamirškite į config.yml pridėti alfavantage prieigos rakto arba eksportuoti aplinkos kintamąjį HORTON_SERVICE_APIKEY. Mes gauname žetoną čia.

CRUD klasė

Turėsime vertybinių popierių kolekciją, kurioje saugosime metainformaciją apie vertybinius popierius.

duomenų bazė/security.py

Mano nuomone, čia nereikia nieko aiškinti, o pati bazinė klasė yra gana paprasta.

get_app ()

Pridėkime funkciją, skirtą programos objektui sukurti app.py

Spoileris

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Kol kas turėsime paprasčiausią aplikacijų kūrimą, kiek vėliau ją išplėsime, tačiau kad nereikėtų laukti, čia nuorodos į App-klasę. Taip pat patariu pažvelgti į nustatymų klasę, nes ji atsakinga už daugumą nustatymų.

Pagrindinė dalis

Vertybinių popierių sąrašo rinkimo ir tvarkymo agentas

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Taigi, pirmiausia gauname „faust“ programos objektą - tai gana paprasta. Toliau mes aiškiai deklaruojame temą mūsų agentui... Čia verta paminėti, kas tai yra, koks yra vidinis parametras ir kaip tai galima kitaip išdėstyti.

  1. Temos kafka, jei norime sužinoti tikslų apibrėžimą, geriau perskaityti išjungti. dokumentas, arba galite skaityti sąvadas ant Habré rusų kalba, kur viskas taip pat gana tiksliai atspindėta :)

  2. Vidinis parametras, gana gerai aprašyta faust dokumente, leidžia temą sukonfigūruoti tiesiai kode, žinoma, tai reiškia fausto kūrėjų pateiktus parametrus, pvz.: retention, retention policy (pagal numatytuosius nustatymus ištrinti, bet galite nustatyti kompaktiškas), skyrių skaičius vienoje temoje (daugybėpadaryti, pavyzdžiui, mažiau nei pasaulinės reikšmės programos faust).

  3. Apskritai agentas gali sukurti valdomą temą su globaliomis vertybėmis, tačiau man patinka viską deklaruoti aiškiai. Be to, negalima konfigūruoti kai kurių agento skelbimo temos parametrų (pavyzdžiui, skaidinių skaičiaus arba saugojimo politikos).

    Štai kaip tai gali atrodyti rankiniu būdu neapibrėžus temos:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Na, o dabar aprašykime, ką veiks mūsų agentas :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Taigi agento pradžioje atidarome aiohttp seansą užklausoms per savo klientą. Taigi paleidus darbuotoją, paleidus mūsų agentą, iš karto bus atidaryta sesija – viena, visam darbuotojo veikimo laikui (arba kelioms, jei pakeisite parametrą sutapimas iš agento su numatytuoju vienetu).

Toliau sekame srautą (įdedame pranešimą _, nes mums, šiam agentui, nerūpi pranešimų iš mūsų temos turinys, jei jie egzistuoja dabartiniame poslinkyje, kitaip mūsų ciklas lauks jų atvykimo. Na, savo ciklo viduje registruojame pranešimo gavimą, gauname aktyvių (get_securities grąžina tik aktyvius pagal nutylėjimą, žr. kliento kodą) vertybinių popierių sąrašą ir išsaugome jį duomenų bazėje, patikriname, ar yra vertybinių popierių su tuo pačiu žymekliu ir mainai duomenų bazėje , jei yra, tada jis (popierius) bus tiesiog atnaujintas.

Pradėkime savo kūrybą!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS funkcijos žiniatinklio komponentas Fausto straipsniuose nesvarstysiu, todėl iškeliame atitinkamą vėliavėlę.

Savo paleidimo komandoje mes nurodėme faust, kur ieškoti programos objekto ir ką su juo daryti (paleisti darbuotoją) su informacijos žurnalo išvesties lygiu. Gauname tokią išvestį:

Spoileris

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Tai gyva!!!

Pažiūrėkime į skaidinių rinkinį. Kaip matome, buvo sukurta tema su pavadinimu, kurį nurodėme kode, numatytuoju skaidinių skaičiumi (8, paimti iš temos_skyriai - programos objekto parametras), nes nenurodėme individualios mūsų temos reikšmės (per skaidinius). Darbuotoje paleistam agentui priskiriami visi 8 skirsniai, nes jis yra vienintelis, tačiau tai bus išsamiau aptarta dalyje apie klasterizavimą.

Na, dabar galime pereiti į kitą terminalo langą ir išsiųsti tuščią žinutę į mūsų temą:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS naudojimas @ parodome, kad siunčiame pranešimą tema, pavadinta „surinkti_vertybinius popierius“.

Šiuo atveju pranešimas nukeliavo į 6 skaidinį – tai galite patikrinti apsilankę kafdrop on localhost:9000

Eidami į terminalo langą su savo darbuotoju, pamatysime laimingą pranešimą, išsiųstą naudojant loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Taip pat galime pažvelgti į mongo (naudojant Robo3T arba Studio3T) ir pamatyti, kad vertybiniai popieriai yra duomenų bazėje:

Nesu milijardierius, todėl esame patenkinti pirmuoju žiūrėjimo variantu.

Fono užduotys apie Faustą, II dalis: Agentai ir komandosFono užduotys apie Faustą, II dalis: Agentai ir komandos

Laimė ir džiaugsmas - pirmasis agentas pasiruošęs :)

Agentas pasiruošęs, tegyvuoja naujasis agentas!

Taip, ponai, įveikėme tik 1/3 šio straipsnio paruošto kelio, bet nenusiminkite, nes dabar bus lengviau.

Taigi dabar mums reikia agento, kuris renka meta informaciją ir įkelia ją į rinkimo dokumentą:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Kadangi šis agentas apdoros informaciją apie konkretų vertybinį popierių, pranešime turime nurodyti šio saugumo žymeklį (simbolį). Tam tikslui fauste yra Įrašai — klasės, deklaruojančios pranešimų schemą agento temoje.

Šiuo atveju pereikime prie records.py ir aprašykite, kaip turėtų atrodyti šios temos pranešimas:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kaip jau galėjote atspėti, „faust“ naudoja „python“ tipo anotaciją pranešimo schemai apibūdinti, todėl minimali bibliotekos palaikoma versija yra 3.6.

Grįžkime prie agento, nustatykime tipus ir pridėkime:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kaip matote, temos inicijavimo metodui perduodame naują parametrą su schema - vertės_tipas. Be to, viskas vyksta pagal tą pačią schemą, todėl nematau prasmės leistis į ką nors kita.

Na, paskutinis prisilietimas yra pridėti iškvietimą metainformacijos rinkimo agentui į collection_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Pranešimui naudojame anksčiau paskelbtą schemą. Šiuo atveju naudojau .cast metodą, nes nereikia laukti rezultato iš agento, tačiau verta paminėti, kad būdais siųsti žinutę į temą:

  1. cast - neblokuoja, nes nesitiki rezultato. Negalite siųsti rezultato į kitą temą kaip žinutės.

  2. siųsti – neblokuoja, nes nesitiki rezultato. Temoje galite nurodyti agentą, į kurį bus nukreiptas rezultatas.

  3. paklausti - laukia rezultato. Temoje galite nurodyti agentą, į kurį bus nukreiptas rezultatas.

Taigi, su agentais šiandien viskas!

Svajonių komanda

Paskutinis dalykas, kurį pažadėjau parašyti šioje dalyje, yra komandos. Kaip minėta anksčiau, „faust“ komandos yra paspaudimas. Tiesą sakant, Faustas tiesiog prideda mūsų pasirinktinę komandą prie savo sąsajos, kai nurodo klavišą -A

Paskelbtiems agentams įėjus agentai.py pridėti funkciją su dekoratoriumi app.commandvadindamas metodą mesti у rinkti_vertybinius popierius:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Taigi, jei iškviesime komandų sąrašą, jame bus mūsų nauja komanda:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Galime jį naudoti kaip bet kas kitas, todėl iš naujo paleiskite „faust worker“ ir pradėkime visavertį vertybinių popierių rinkimą:

> faust -A horton.agents start-collect-securities

Kas bus toliau?

Kitoje dalyje, kaip pavyzdį naudodamiesi likusiais agentais, apžvelgsime nugrimzdimo mechanizmą ieškant kraštutinumų prekybos metų uždarymo kainose ir agentų kronų paleidime.

Tai viskas siandienai! Ačiū, kad skaitėte :)

Šios dalies kodas

Fono užduotys apie Faustą, II dalis: Agentai ir komandos

PS Paskutinėje dalyje manęs paklausė apie faust and confluent kafka (kokias savybes turi confluentas?). Atrodo, kad „confluent“ daugeliu atžvilgių yra funkcionalesnis, tačiau faktas yra tas, kad „Faust“ neturi visiško „confluento“ klientų palaikymo – tai išplaukia iš kliento apribojimų aprašymai doc.

Šaltinis: www.habr.com

Добавить комментарий