Attività in background su Faust, parte II: agenti e team

Attività in background su Faust, parte II: agenti e team

Sommario

  1. Parte I: Introduzione

  2. Parte II: Agenti e Team

Cosa stiamo facendo qui?

Quindi, quindi, la seconda parte. Come scritto in precedenza, in esso faremo quanto segue:

  1. Scriviamo un piccolo client per alphavantage su aiohttp con le richieste per gli endpoint di cui abbiamo bisogno.

  2. Creiamo un agente che raccoglierà dati sui titoli e metainformazioni su di essi.

Ma questo è ciò che faremo per il progetto stesso e, in termini di ricerca Faust, impareremo come scrivere agenti che elaborano eventi di flusso da Kafka, nonché come scrivere comandi (click wrapper), nel nostro caso - per i messaggi push manuali all'argomento che l'agente sta monitorando.

Formazione

Cliente AlphaVantage

Per prima cosa scriviamo un piccolo client aihttp per le richieste ad alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

In realtà da ciò risulta tutto chiaro:

  1. L'API AlphaVantage è progettata in modo semplice e ben progettato, quindi ho deciso di effettuare tutte le richieste tramite questo metodo construct_query dove a sua volta c'è una chiamata http.

  2. Porto tutti i campi a snake_case per comodità.

  3. Bene, la decorazione logger.catch per un output di traceback bello e informativo.

PS Non dimenticare di aggiungere il token alphavantage localmente a config.yml o esportare la variabile di ambiente HORTON_SERVICE_APIKEY. Riceviamo un gettone qui.

Classe CRUD

Avremo una raccolta di titoli per archiviare metainformazioni sui titoli.

database/security.py

Secondo me, non è necessario spiegare nulla qui e la classe base stessa è abbastanza semplice.

ottieni l'applicazione()

Aggiungiamo una funzione per creare un oggetto applicazione app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Per ora avremo la creazione dell'applicazione più semplice, tra poco la amplieremo però, per non farvi aspettare, qui Riferimenti alla classe App. Ti consiglio anche di dare un'occhiata alla classe settings, poiché è responsabile della maggior parte delle impostazioni.

principale

Agente per la raccolta e il mantenimento di un elenco di titoli

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Quindi, per prima cosa otteniamo l'oggetto applicazione Faust: è abbastanza semplice. Successivamente, dichiariamo esplicitamente un argomento per il nostro agente... Qui vale la pena menzionare di cosa si tratta, qual è il parametro interno e come questo può essere organizzato diversamente.

  1. Argomenti in Kafka, se vogliamo conoscere la definizione esatta, è meglio leggere spento. documentooppure puoi leggere compendio su Habré in russo, dove tutto si riflette anche in modo abbastanza accurato :)

  2. Parametro interno, descritto abbastanza bene nel documento Faust, ci permette di configurare l'argomento direttamente nel codice, ovviamente, questo significa i parametri forniti dagli sviluppatori Faust, ad esempio: conservazione, criteri di conservazione (per impostazione predefinita elimina, ma puoi impostare compatto), numero di partizioni per argomento (punteggifare, ad esempio, meno di significato globale applicazioni Faust).

  3. In generale, l'agente può creare un argomento gestito con valori globali, tuttavia preferisco dichiarare tutto in modo esplicito. Inoltre, alcuni parametri (ad esempio, il numero di partizioni o i criteri di conservazione) dell'argomento nell'annuncio dell'agente non possono essere configurati.

    Ecco come potrebbe apparire senza definire manualmente l'argomento:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Bene, ora descriviamo cosa farà il nostro agente :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Quindi, all'inizio dell'agente, apriamo una sessione aihttp per le richieste tramite il nostro client. Pertanto, quando si avvia un lavoratore, quando viene avviato il nostro agente, verrà immediatamente aperta una sessione: una, per tutto il tempo in cui il lavoratore è in esecuzione (o più, se si modifica il parametro concorrenza da un agente con un'unità predefinita).

Successivamente, seguiamo il flusso (inseriamo il messaggio _, poiché noi, in questo agente, non ci preoccupiamo del contenuto) dei messaggi del nostro argomento, se esistono all'offset corrente, altrimenti il ​​nostro ciclo attenderà il loro arrivo. Bene, all'interno del nostro ciclo, registriamo la ricezione del messaggio, otteniamo un elenco di titoli attivi (get_securities restituisce solo attivi per impostazione predefinita, vedi codice cliente) e lo salviamo nel database, controllando se esiste un titolo con lo stesso ticker e scambio nel database, se c'è, allora (il documento) verrà semplicemente aggiornato.

Lanciamo la nostra creazione!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Caratteristiche PS componente web Non prenderò in considerazione Faust negli articoli, quindi impostiamo il flag appropriato.

Nel nostro comando di avvio, abbiamo detto a Faust dove cercare l'oggetto applicazione e cosa farne (avviare un lavoratore) con il livello di output del registro informazioni. Otteniamo il seguente output:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

È viva!!!

Diamo un'occhiata al set di partizioni. Come possiamo vedere, è stato creato un topic con il nome che abbiamo indicato nel codice, il numero predefinito di partizioni (8, preso da topic_partitions - parametro dell'oggetto dell'applicazione), poiché non abbiamo specificato un valore individuale per il nostro argomento (tramite partizioni). All'agente avviato nel lavoratore vengono assegnate tutte le 8 partizioni, poiché è l'unica, ma questo verrà discusso in maggior dettaglio nella parte relativa al clustering.

Bene, ora possiamo andare in un'altra finestra di terminale e inviare un messaggio vuoto al nostro argomento:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS utilizzando @ mostriamo che stiamo inviando un messaggio a un argomento denominato "collect_securities".

In questo caso, il messaggio è andato alla partizione 6: puoi verificarlo andando su kafdrop localhost:9000

Andando alla finestra del terminale con il nostro lavoratore, vedremo un messaggio felice inviato utilizzando loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Possiamo anche esaminare mongo (utilizzando Robo3T o Studio3T) e vedere se i titoli sono nel database:

Non sono un miliardario e quindi ci accontentiamo della prima opzione di visualizzazione.

Attività in background su Faust, parte II: agenti e teamAttività in background su Faust, parte II: agenti e team

Felicità e gioia: il primo agente è pronto :)

Agente pronto, lunga vita al nuovo agente!

Sì signori, abbiamo percorso solo 1/3 del percorso preparato da questo articolo, ma non scoraggiatevi, perché ora sarà più semplice.

Quindi ora abbiamo bisogno di un agente che raccolga meta informazioni e le inserisca in un documento di raccolta:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Poiché questo agente elaborerà le informazioni su un titolo specifico, dobbiamo indicare il ticker (simbolo) di questo titolo nel messaggio. A questo scopo in Faust ci sono Records — classi che dichiarano lo schema del messaggio nell'argomento dell'agente.

In questo caso, andiamo a record.py e descrivi come dovrebbe apparire il messaggio per questo argomento:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Come avrai intuito, Faust utilizza l'annotazione di tipo python per descrivere lo schema del messaggio, motivo per cui la versione minima supportata dalla libreria è 3.6.

Torniamo all'agente, impostiamo i tipi e aggiungiamolo:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Come puoi vedere, passiamo un nuovo parametro con uno schema al metodo di inizializzazione dell'argomento: value_type. Inoltre, tutto segue lo stesso schema, quindi non vedo il motivo di soffermarmi su altro.

Bene, il tocco finale è aggiungere una chiamata all'agente di raccolta delle meta informazioni a collector_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Usiamo lo schema precedentemente annunciato per il messaggio. In questo caso, ho utilizzato il metodo .cast poiché non è necessario attendere il risultato dall'agente, ma vale la pena menzionarlo modi di invia un messaggio al topic:

  1. cast - non blocca perché non prevede un risultato. Non è possibile inviare il risultato a un altro argomento come messaggio.

  2. invia - non blocca perché non prevede un risultato. È possibile specificare un agente nell'argomento a cui verrà inviato il risultato.

  3. chiedi: attende un risultato. È possibile specificare un agente nell'argomento a cui verrà inviato il risultato.

Quindi per oggi è tutto con gli agenti!

La squadra dei sogni

L'ultima cosa che ho promesso di scrivere in questa parte sono i comandi. Come accennato in precedenza, i comandi in Faust sono un wrapper attorno al clic. Infatti, Faust associa semplicemente il nostro comando personalizzato alla sua interfaccia quando specifichi la chiave -A

Dopo che gli agenti annunciati sono entrati agenti.py aggiungi una funzione con un decoratore app.commandchiamando il metodo lanciare у raccogli_titoli:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Pertanto, se chiamiamo l'elenco dei comandi, il nostro nuovo comando sarà al suo interno:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Possiamo usarlo come chiunque altro, quindi riavviamo il Faust Worker e iniziamo una vera e propria raccolta di titoli:

> faust -A horton.agents start-collect-securities

Che cosa succederà dopo?

Nella parte successiva, utilizzando come esempio gli agenti rimanenti, considereremo il meccanismo sink per la ricerca degli estremi nei prezzi di chiusura delle negoziazioni dell'anno e il lancio cron degli agenti.

È tutto per oggi! Grazie per aver letto :)

Codice per questa parte

Attività in background su Faust, parte II: agenti e team

PS Nell'ultima parte mi è stato chiesto di Faust e Confluent Kafka (che caratteristiche ha confluent?). Sembra che confluent sia più funzionale in molti modi, ma il fatto è che Faust non ha il supporto completo del client per confluent - questo deriva da descrizioni delle limitazioni del client nel doc.

Fonte: habr.com

Aggiungi un commento