Tasques de fons sobre Faust, Part II: Agents i equips

Tasques de fons sobre Faust, Part II: Agents i equips

Taula de continguts

  1. Part I: Introducció

  2. Part II: Agents i equips

Què estem fent aquí?

Així doncs, la segona part. Com s'ha escrit anteriorment, en ell farem el següent:

  1. Escrivim un petit client per alphavantage a aiohttp amb sol·licituds per als punts finals que necessitem.

  2. Creem un agent que recopilarà dades sobre valors i metainformació sobre ells.

Però això és el que farem per al projecte en si, i pel que fa a la recerca de Faust, aprendrem a escriure agents que processen esdeveniments de flux des de kafka, així com a escriure ordres (embolcall de clic), en el nostre cas: per als missatges push manuals al tema que l'agent està supervisant.

Entrenament

Client AlphaVantage

En primer lloc, escrivim un petit client aiohttp per a sol·licituds a alphaavantage.

alphaavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

De fet, tot queda clar:

  1. L'API AlphaVantage està dissenyada de manera senzilla i bonica, així que vaig decidir fer totes les sol·licituds mitjançant el mètode construct_query on al seu torn hi ha una trucada http.

  2. Porto tots els camps a snake_case per comoditat.

  3. Bé, la decoració logger.catch per a una sortida de traçament bonica i informativa.

PS No us oblideu d'afegir localment el testimoni alphaavantage a config.yml o exportar la variable d'entorn HORTON_SERVICE_APIKEY. Rebem un testimoni aquí.

Classe CRUD

Tindrem una col·lecció de valors per emmagatzemar metainformació sobre valors.

database/security.py

Al meu entendre, no cal explicar res aquí, i la classe base en si és bastant senzilla.

get_app()

Afegim una funció per crear un objecte d'aplicació app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

De moment tindrem la creació de l'aplicació més senzilla, una mica més endavant l'ampliarem, però, per no fer-vos esperar, aquí referències a la classe d'aplicacions. També us recomano que feu una ullada a la classe de configuració, ja que és responsable de la majoria de la configuració.

La part principal

Agent de recollida i manteniment d'una llista de valors

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Per tant, primer obtenim l'objecte d'aplicació faust: és bastant senzill. A continuació, declarem explícitament un tema per al nostre agent... Aquí val la pena esmentar què és, quin és el paràmetre intern i com es pot organitzar de manera diferent.

  1. Temes en kafka, si volem saber la definició exacta, millor llegir apagat. document, o pots llegir compendi a Habré en rus, on tot es reflecteix també amb força precisió :)

  2. Paràmetre intern, descrit força bé al document de faust, ens permet configurar el tema directament al codi, és clar, això vol dir els paràmetres proporcionats pels desenvolupadors de faust, per exemple: retenció, política de retenció (per defecte esborra, però podeu configurar compacte), nombre de particions per tema (puntuacionsfer, per exemple, menys de importància global aplicacions faust).

  3. En general, l'agent pot crear un tema gestionat amb valors globals, però m'agrada declarar-ho tot explícitament. A més, no es poden configurar alguns paràmetres (per exemple, el nombre de particions o la política de retenció) del tema de l'anunci de l'agent.

    A continuació es mostra com podria semblar sense definir manualment el tema:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Bé, ara anem a descriure què farà el nostre agent :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Així doncs, a l'inici de l'agent, obrim una sessió aiohttp per a sol·licituds a través del nostre client. Així, quan s'inicia un treballador, quan s'inicia el nostre agent, immediatament s'obrirà una sessió: una, durant tot el temps que el treballador està en execució (o diverses, si canvieu el paràmetre). simultaneïtat d'un agent amb una unitat per defecte).

A continuació, seguim el flux (hi col·loquem el missatge _, ja que a nosaltres, en aquest agent, no ens importa el contingut) dels missatges del nostre tema, si existeixen al desplaçament actual, en cas contrari el nostre cicle esperarà la seva arribada. Bé, dins del nostre bucle, registrem la recepció del missatge, obtenim una llista de valors actius (get_securities retorna només actius per defecte, vegeu el codi del client) i el desem a la base de dades, comprovant si hi ha un títol amb el mateix ticker i intercanvi a la base de dades, si n'hi ha, simplement s'actualitzarà (el paper).

Llencem la nostra creació!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Característiques de PS component web No consideraré faust als articles, així que posem la bandera adequada.

A la nostra comanda de llançament, vam dir a Faust on buscar l'objecte de l'aplicació i què fer-hi (llançar un treballador) amb el nivell de sortida del registre d'informació. Obtenim la següent sortida:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Està viu!!!

Vegem el conjunt de particions. Com podem veure, es va crear un tema amb el nom que hem designat al codi, el nombre de particions per defecte (8, extret de particions_tema - paràmetre d'objecte d'aplicació), ja que no hem especificat un valor individual per al nostre tema (mitjançant particions). A l'agent llançat al treballador se li assignen les 8 particions, ja que és l'única, però això es tractarà amb més detall a la part sobre agrupació.

Bé, ara podem anar a una altra finestra de terminal i enviar un missatge buit al nostre tema:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS utilitzant @ mostrem que estem enviant un missatge a un tema anomenat "collect_securities".

En aquest cas, el missatge va anar a la partició 6; podeu comprovar-ho si aneu a kafdrop on localhost:9000

Anant a la finestra del terminal amb el nostre treballador, veurem un missatge feliç enviat mitjançant loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

També podem mirar a mongo (utilitzant Robo3T o Studio3T) i veure que els valors es troben a la base de dades:

No sóc multimilionari i, per tant, ens conformem amb la primera opció de visualització.

Tasques de fons sobre Faust, Part II: Agents i equipsTasques de fons sobre Faust, Part II: Agents i equips

Felicitat i alegria: el primer agent està preparat :)

Agent preparat, visca el nou agent!

Sí, senyors, només hem recorregut 1/3 del camí que ens ha preparat aquest article, però no us desanimau, perquè ara serà més fàcil.

Per tant, ara necessitem un agent que reculli metainformació i la introdueixi en un document de recollida:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Com que aquest agent processarà informació sobre una seguretat concreta, hem d'indicar el ticker (símbol) d'aquesta seguretat al missatge. Per a això en faust n'hi ha arxius — classes que declaren l'esquema de missatges al tema de l'agent.

En aquest cas, anem a records.py i descriu com hauria de ser el missatge d'aquest tema:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Com haureu endevinat, faust utilitza l'anotació de tipus Python per descriure l'esquema del missatge, per això la versió mínima admesa per la biblioteca és 3.6.

Tornem a l'agent, establim els tipus i l'afegim:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Com podeu veure, passem un nou paràmetre amb un esquema al mètode d'inicialització del tema - value_type. A més, tot segueix el mateix esquema, així que no veig cap sentit a detenir-me en res més.

Bé, el toc final és afegir una trucada a l'agent de recollida d'informació meta a collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Utilitzem l'esquema anunciat anteriorment per al missatge. En aquest cas, he utilitzat el mètode .cast ja que no hem d'esperar el resultat de l'agent, però val la pena esmentar que maneres envia un missatge al tema:

  1. cast: no bloqueja perquè no espera un resultat. No podeu enviar el resultat a un altre tema com a missatge.

  2. enviar - no bloqueja perquè no espera un resultat. Podeu especificar un agent al tema al qual anirà el resultat.

  3. preguntar - espera un resultat. Podeu especificar un agent al tema al qual anirà el resultat.

Així doncs, això és tot amb els agents d'avui!

L'equip dels somnis

L'últim que vaig prometre escriure en aquesta part són les ordres. Com s'ha esmentat anteriorment, les ordres de faust són un embolcall al voltant del clic. De fet, faust simplement adjunta la nostra ordre personalitzada a la seva interfície quan especifica la clau -A

Després dels agents anunciats agents.py afegir una funció amb un decorador app.commandcridant el mètode emetre у cobrar_valors:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Així, si cridem a la llista d'ordres, la nostra nova ordre hi estarà:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Podem utilitzar-lo com qualsevol altra persona, així que reiniciem el treballador de faust i comencem una col·lecció completa de valors:

> faust -A horton.agents start-collect-securities

Què passarà després?

A la següent part, prenent com a exemple la resta d'agents, considerarem el mecanisme d'amortització per a la recerca d'extrems en els preus de tancament de la negociació de l'any i el llançament cron d'agents.

Això és tot per avui! Gràcies per llegir :)

Codi d'aquesta part

Tasques de fons sobre Faust, Part II: Agents i equips

PD A l'última part em van preguntar sobre faust i kafka confluent (quines característiques té confluent?). Sembla que confluent és més funcional en molts aspectes, però el fet és que Faust no té un suport complet per al client per a confluent; això es deu a descripcions de les restriccions del client al document.

Font: www.habr.com

Afegeix comentari