Attività di fondo nantu à Faust, Part II: Agenti è squadre

Attività di fondo nantu à Faust, Part II: Agenti è squadre

Indice di cuntenutu

  1. Parte I: Introduzione

  2. Parte II: Agenti è squadre

Chì facemu quì ?

Allora, cusì, a seconda parte. Comu scrittu prima, in questu faremu i seguenti:

  1. Scrivemu un picculu cliente per alphaavantage in aiohttp cù richieste per l'endpoints chì avemu bisognu.

  2. Creemu un agentu chì cullighjarà dati nantu à i tituli è meta infurmazione nantu à elli.

Ma, questu hè ciò chì faremu per u prughjettu stessu, è in quantu à a ricerca di faust, avemu da amparà à scrive l'agenti chì processanu l'avvenimenti di flussu da kafka, è ancu cumu scrive cumandamenti (cliccate wrapper), in u nostru casu - per i missaghji push manuali à u tema chì l'agente hè monitoratu.

A preparazione di

Client AlphaVantage

Prima, scrivemu un picculu cliente aiohttp per richieste à alphaavantage.

alphaavantage.py

spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

In fatti, tuttu hè chjaru da questu:

  1. L'API AlphaVantage hè abbastanza simplice è bellu cuncepitu, cusì aghju decisu di fà tutte e dumande attraversu u metudu construct_query induve à u turnu ci hè una chjama http.

  2. Aghju purtatu tutti i campi à snake_case per cunfortu.

  3. Ebbè, a decorazione di logger.catch per una bella è informativa traceback output.

PS Ùn vi scurdate di aghjunghje u token alphaavantage in u locu à config.yml, o esportà a variabile di l'ambiente HORTON_SERVICE_APIKEY. Ricevemu un token ccà.

Classe CRUD

Averemu una cullizzioni di securities per almacenà meta infurmazione nantu à i securities.

database/security.py

In u mo parè, ùn ci hè bisognu di spiegà nunda quì, è a classa di basa stessu hè abbastanza simplice.

get_app()

Aghjunghjite una funzione per creà un oggettu di l'applicazione app.py

spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Per avà averemu a creazione di l'applicazioni più simplice, un pocu dopu l'ampliemu, però, per ùn fà micca aspittà, quì riferimenti à App-class. Vi cunsigliu ancu di piglià un ochju à a classa di paràmetri, postu chì hè rispunsevule per a maiò parte di i paràmetri.

A parte principale

Agente per a cullizzioni è u mantenimentu di una lista di securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Allora, prima avemu l'ughjettu di l'applicazione faust - hè abbastanza simplice. In seguitu, dichjarà esplicitamente un tema per u nostru agentu ... Eccu vale a pena di dì ciò chì hè, quale hè u paràmetru internu è cumu si pò esse disposti in modu diversu.

  1. Temi in kafka, se vulemu sapè a definizione esatta, hè megliu di leghje off. document, o pudete leghje cumpendiu nantu à Habré in Russu, induve tuttu hè ancu riflessu abbastanza precisamente :)

  2. Parametru internu, Descritta abbastanza bè in u faust doc, ci permette di cunfigurà u tema direttamente in u codice, sicuru, questu significa i paràmetri furniti da i sviluppatori di faust, per esempiu: retenzioni, pulitica di retenzioni (per default eliminà, ma pudete stabilisce fundute), numeru di partizioni per tema (partitionsper fà, per esempiu, menu di significatu glubale applicazioni faust).

  3. In generale, l'agente pò creà un tema gestionatu cù i valori glubale, in ogni modu, mi piace à dichjarà tuttu esplicitamente. Inoltre, certi paràmetri (per esempiu, u numeru di partizioni o pulitica di retenzioni) di u tema in l'annunziu di l'agente ùn ponu esse cunfigurati.

    Eccu ciò chì puderia vede senza definisce manualmente u tema:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ebbè, avà descrivimu ciò chì u nostru agente farà :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Allora, à u principiu di l'agente, apremu una sessione aiohttp per e dumande attraversu u nostru cliente. Cusì, quandu principia un travagliadore, quandu u nostru agente hè lanciatu, una sessione serà immediatamente aperta - unu, per tuttu u tempu chì u travagliadore hè in esecuzione (o parechji, se cambiate u paràmetru). cuncurrenza da un agente cù una unità predeterminata).

Dopu, seguitamu u flussu (pusemu u messagiu in _, Siccomu noi, in questu agentu, ùn importa micca u cuntenutu) di i missaghji da u nostru tema, si esistinu à l'offset attuale, altrimenti u nostru ciculu aspittà per a so ghjunta. Ebbè, in u nostru ciclu, registremu a ricezione di u missaghju, uttene una lista di tituli attivi (get_securities ritorna solu attivu per automaticamente, vede u codice di u cliente) è salvemu in a basa di dati, verificate s'ellu ci hè una sicurità cù u listessu ticker è scambià in a basa di dati, se ci hè, allora (u carta) serà solu aghjurnatu.

Lanciamu a nostra creazione !

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Caratteristiche PS cumpunente web Ùn cunsiderà micca faust in l'articuli, cusì avemu stabilitu a bandiera approprita.

In u nostru cumandamentu di lanciamentu, avemu dettu à faust induve circà l'ughjettu di l'applicazione è ciò chì deve fà cun ellu (lanciare un travagliadore) cù u livellu di output di u logu d'infurmazioni. Avemu a seguente output:

spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Hè viva !!!

Fighjemu u settore di partizioni. Comu pudemu vede, un tema hè statu creatu cù u nome chì avemu designatu in u codice, u numeru predeterminatu di partizioni (8, pigliatu da topic_partitions - paràmetru di l'ughjettu di l'applicazione), postu chì ùn avemu micca specificatu un valore individuale per u nostru tema (via partizioni). L'agente lanciatu in u travagliu hè attribuitu tutte e 8 partizioni, postu chì hè l'unicu, ma questu serà discutitu in più detail in a parte di clustering.

Ebbè, avà pudemu andà in una altra finestra di terminal è mandà un missaghju viotu à u nostru tema:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS usendu @ mostramu chì avemu da mandà un missaghju à un tema chjamatu "collect_securities".

In questu casu, u missaghju andò à a partizione 6 - pudete verificà questu andendu in kafdrop on localhost:9000

Andendu à a finestra di u terminal cù u nostru travagliadore, vedemu un missaghju felice mandatu cù loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Pudemu ancu guardà in mongo (usendu Robo3T o Studio3T) è vede chì i tituli sò in a basa di dati:

Ùn sò micca un miliardariu, è per quessa simu cuntentati cù a prima opzione di visualizazione.

Attività di fondo nantu à Faust, Part II: Agenti è squadreAttività di fondo nantu à Faust, Part II: Agenti è squadre

Felicità è gioia - u primu agentu hè prestu :)

Agente prontu, viva u novu agente!

Iè, signori, avemu cupartu solu 1/3 di a strada preparata da questu articulu, ma ùn vi scuraggiate, perchè avà serà più faciule.

Allora avà avemu bisognu di un agente chì recullà meta infurmazione è a mette in un documentu di cullizzioni:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Siccomu questu agentu prucederà l'infurmazioni nantu à una sicurità specifica, avemu bisognu di indicà u ticker (simbulu) di sta sicurità in u messagiu. Per questu scopu in faust ci sò vinile - classi chì dichjaranu u schema di messagiu in u tema di l'agente.

In questu casu, andemu à records.py è descrive ciò chì u missaghju per questu tema deve esse cum'è:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Cum'è avete capitu, faust usa l'annotazione di u tipu python per descriverà u schema di u messagiu, per quessa chì a versione minima supportata da a biblioteca hè 3.6.

Riturnemu à l'agente, stabilisce i tipi è aghjunghje:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Comu pudete vede, passemu un novu paràmetru cù un schema à u metudu di inizializazione di tema - value_type. In più, tuttu seguita u listessu schema, cusì ùn vecu micca u puntu di aspittà nantu à qualcosa altru.

Ebbè, u toccu finali hè di aghjunghje una chjama à l'agente di cullizzioni di meta infurmazione per collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Utilizemu u schema annunziatu prima per u messagiu. In questu casu, aghju utilizatu u metudu .cast postu chì ùn avemu micca bisognu di aspittà u risultatu da l'agente, ma vale a pena dì chì modi mandate un missaghju à u tema:

  1. cast - ùn blucca micca perchè ùn aspetta micca un risultatu. Ùn pudete micca mandà u risultatu à un altru tema cum'è missaghju.

  2. mandate - ùn blucca micca perchè ùn aspetta micca un risultatu. Pudete specificà un agentu in u tema à quale u risultatu andarà.

  3. dumandà - aspetta un risultatu. Pudete specificà un agente in u tema à quale u risultatu andarà.

Allora, questu hè tuttu cù l'agenti per oghje!

A squadra di sognu

L'ultima cosa chì aghju prumessu di scrive in questa parte hè cumandamenti. Comu diciatu prima, i cumandamenti in faust sò un wrapper intornu à cliccà. In fatti, faust attache semplicemente u nostru cumandamentu persunalizatu à a so interfaccia quandu specifica a chjave -A

Dopu à l'agenti annunciati in agenti.py aghjunghje una funzione cù un decoratore app.commandchjamà u metudu u cast у collect_securities:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Cusì, se chjamemu a lista di cumandamenti, u nostru novu cumandamentu serà in questu:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Pudemu usà cum'è qualcunu altru, allora riavvia u faust worker è cuminciamu una cullizzioni cumpleta di securities:

> faust -A horton.agents start-collect-securities

Chì succede dopu ?

In a parti dopu, utilizendu l'agenti rimanenti cum'è un esempiu, cunsideremu u mecanismu di sink per a ricerca di l'estremi in i prezzi di chjusi di cummerciale per l'annu è u cron launch of agents.

Hè tuttu per oghje ! Grazie per a lettura :)

Codice per sta parte

Attività di fondo nantu à Faust, Part II: Agenti è squadre

PS Sottu l'ultima parte m'hà dumandatu nantu à faust è kafka cunfluente (chì caratteristiche hà u cunfluente ?). Sembra chì u cunfluente hè più funziunale in parechje manere, ma u fattu hè chì faust ùn hà micca un supportu tutale di u cliente per cunfluente - questu seguita da descrizzioni di e restrizioni di u cliente in u documentu.

Source: www.habr.com

Add a comment