Tareas previas sobre Fausto, Parte II: Agentes y equipos

Tareas previas sobre Fausto, Parte II: Agentes y equipos

tabla de contenidos

  1. Parte I: Introducción

  2. Parte II: Agentes y Equipos

¿Qué estamos haciendo aquí?

Así, así, la segunda parte. Como está escrito anteriormente, en él haremos lo siguiente:

  1. Escribamos un pequeño cliente para alphavantage en aiohttp con solicitudes para los puntos finales que necesitamos.

  2. Creemos un agente que recopilará datos sobre valores y metainformación sobre ellos.

Pero esto es lo que haremos para el proyecto en sí y, en términos de la investigación de Faust, aprenderemos cómo escribir agentes que procesen eventos de flujo desde Kafka, así como también cómo escribir comandos (haga clic en el contenedor), en nuestro caso: para mensajes push manuales al tema que el agente está monitoreando.

Formación

Cliente AlphaVantage

Primero, escribamos un pequeño cliente aiohttp para solicitudes a alphavantage.

alfavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

En realidad, todo queda claro en ello:

  1. La API AlphaVantage tiene un diseño bastante simple y hermoso, por lo que decidí realizar todas las solicitudes a través del método construct_query donde a su vez hay una llamada http.

  2. Traigo todos los campos a snake_case Por comodidad.

  3. Bueno, la decoración logger.catch para un resultado de rastreo hermoso e informativo.

PD: No olvide agregar el token alphavantage localmente a config.yml o exportar la variable de entorno HORTON_SERVICE_APIKEY. Recibimos una ficha aquí.

clase CRUD

Tendremos una colección de valores para almacenar metainformación sobre valores.

base de datos/seguridad.py

En mi opinión, no es necesario explicar nada aquí y la clase base en sí es bastante simple.

Obtener aplicaciones()

Agreguemos una función para crear un objeto de aplicación en aplicación.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Por ahora tendremos la creación de aplicaciones más sencilla, un poco más adelante la ampliaremos, sin embargo, para no hacerte esperar, aquí referencias a clase de aplicación. También te aconsejo que eches un vistazo a la clase de configuración, ya que es responsable de la mayoría de las configuraciones.

principal

Agente para recopilar y mantener una lista de valores.

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Entonces, primero obtenemos el objeto de aplicación Faust; es bastante simple. A continuación, declaramos explícitamente un tema para nuestro agente... Aquí vale la pena mencionar qué es, cuál es el parámetro interno y cómo se puede organizar de manera diferente.

  1. Temas en kafka, si queremos saber la definición exacta, es mejor leer apagado. documento, o puedes leer abstracto en Habré en ruso, donde todo también se refleja con bastante precisión :)

  2. Parámetro interno, descrito bastante bien en el documento de faust, nos permite configurar el tema directamente en el código, por supuesto, esto significa los parámetros proporcionados por los desarrolladores de faust, por ejemplo: retención, política de retención (de forma predeterminada, eliminar, pero puede configurar compacto), número de particiones por tema (puntajeshacer, por ejemplo, menos de importancia global aplicaciones fausto).

  3. En general, el agente puede crear un tema administrado con valores globales, sin embargo, a mí me gusta declararlo todo explícitamente. Además, algunos parámetros (por ejemplo, el número de particiones o la política de retención) del tema en el anuncio del agente no se pueden configurar.

    Así es como se vería sin definir manualmente el tema:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Bueno, ahora describamos lo que hará nuestro agente :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Entonces, al comienzo del agente, abrimos una sesión aiohttp para solicitudes a través de nuestro cliente. Por lo tanto, al iniciar un trabajador, cuando se inicia nuestro agente, inmediatamente se abrirá una sesión: una, durante todo el tiempo que el trabajador esté en ejecución (o varias, si cambia el parámetro concurrencia de un agente con una unidad predeterminada).

A continuación, seguimos el stream (colocamos el mensaje en _, ya que a nosotros, en este agente, no nos importa el contenido) de los mensajes de nuestro tema, si existen en el desplazamiento actual, de lo contrario nuestro ciclo esperará su llegada. Bueno, dentro de nuestro bucle, registramos la recepción del mensaje, obtenemos una lista de valores activos (get_securities devuelve solo activo de forma predeterminada, consulte el código del cliente) y la guardamos en la base de datos, verificando si hay un valor con el mismo ticker y intercambio en la base de datos, si lo hay, entonces (el documento) simplemente se actualizará.

¡Lanzamos nuestra creación!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Funciones de PS componente web No consideraré a Fausto en los artículos, por eso colocamos la bandera adecuada.

En nuestro comando de inicio, le dijimos a Fausto dónde buscar el objeto de la aplicación y qué hacer con él (iniciar un trabajador) con el nivel de salida del registro de información. Obtenemos el siguiente resultado:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

¡¡¡Está vivo!!!

Veamos el conjunto de particiones. Como podemos ver, se creó un tema con el nombre que designamos en el código, el número de particiones por defecto (8, tomado de particiones_tema - parámetro del objeto de la aplicación), ya que no especificamos un valor individual para nuestro tema (a través de particiones). Al agente iniciado en el trabajador se le asignan las 8 particiones, ya que es la única, pero esto se discutirá con más detalle en la parte sobre agrupación.

Bueno, ahora podemos ir a otra ventana de terminal y enviar un mensaje vacío a nuestro tema:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PD usando @ Mostramos que estamos enviando un mensaje a un tema llamado "collect_securities".

En este caso, el mensaje fue a la partición 6; puede verificar esto yendo a kafdrop en localhost:9000

Al ir a la ventana de la terminal con nuestro trabajador, veremos un mensaje feliz enviado usando loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

También podemos buscar en mongo (usando Robo3T o Studio3T) y ver que los valores están en la base de datos:

No soy multimillonario y, por lo tanto, nos contentamos con la primera opción de visualización.

Tareas previas sobre Fausto, Parte II: Agentes y equiposTareas previas sobre Fausto, Parte II: Agentes y equipos

Felicidad y alegría: el primer agente está listo :)

Agente listo, ¡viva el nuevo agente!

Sí señores, solo hemos recorrido 1/3 del camino preparado por este artículo, pero no se desanimen, porque ahora será más fácil.

Ahora necesitamos un agente que recopile metainformación y la coloque en un documento de recopilación:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Dado que este agente procesará información sobre un valor específico, debemos indicar el ticker (símbolo) de este valor en el mensaje. Para ello en fausto hay Archivos — clases que declaran el esquema de mensaje en el tema del agente.

En este caso, vayamos a registros.py y describa cómo debería verse el mensaje para este tema:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Como habrás adivinado, faust usa la anotación de tipo python para describir el esquema del mensaje, razón por la cual la versión mínima admitida por la biblioteca es 3.6.

Volvamos al agente, configuremos los tipos y agréguelo:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Como puede ver, pasamos un nuevo parámetro con un esquema al método de inicialización del tema: value_type. Además, todo sigue el mismo esquema, por lo que no veo ningún sentido a detenerme en nada más.

Bueno, el toque final es agregar una llamada al agente de recopilación de metainformación para recopilar_seguridades:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Usamos el esquema previamente anunciado para el mensaje. En este caso utilicé el método .cast ya que no necesitamos esperar el resultado del agente, pero vale la pena mencionar que formas de enviar un mensaje al tema:

  1. cast: no bloquea porque no espera un resultado. No puedes enviar el resultado a otro tema como mensaje.

  2. enviar: no bloquea porque no espera un resultado. Puede especificar un agente en el tema al que irá el resultado.

  3. preguntar - espera un resultado. Puede especificar un agente en el tema al que irá el resultado.

¡Eso es todo con los agentes por hoy!

El equipo de ensueño

Lo último que prometí escribir en esta parte son comandos. Como se mencionó anteriormente, los comandos en fausto son una envoltura alrededor del clic. De hecho, fausto simplemente adjunta nuestro comando personalizado a su interfaz al especificar la clave -A

Después de los agentes anunciados en agentes.py agregar una función con un decorador aplicación.comandollamando al método emitir у recoger_valores:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Así, si llamamos a la lista de comandos, nuestro nuevo comando estará en ella:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Podemos usarlo como cualquier otra persona, así que reiniciemos el trabajador Faust y comencemos una colección completa de valores:

> faust -A horton.agents start-collect-securities

¿Qué pasará después?

En la siguiente parte, tomando como ejemplo el resto de agentes, consideraremos el mecanismo sumidero para buscar extremos en los precios de cierre de las operaciones del año y el lanzamiento cron de agentes.

¡Eso es todo por hoy! Gracias por leer :)

Código para esta parte

Tareas previas sobre Fausto, Parte II: Agentes y equipos

PD: En la última parte me preguntaron sobre Fausto y Kafka confluente (¿Qué características tiene confluente?). Parece que confluent es más funcional en muchos sentidos, pero el hecho es que faust no tiene soporte completo para el cliente de confluent; esto se desprende de descripciones de las restricciones del cliente en el documento.

Fuente: habr.com

Añadir un comentario