Tarefas de fondo sobre Faust, Parte II: Axentes e equipos

Tarefas de fondo sobre Faust, Parte II: Axentes e equipos

Índice analítico

  1. Parte I: Introdución

  2. Parte II: Axentes e Equipos

Que facemos aquí?

Así, así, a segunda parte. Como se escribiu anteriormente, nel faremos o seguinte:

  1. Escribamos un pequeno cliente para alphaavantage en aiohttp con solicitudes para os puntos finais que necesitamos.

  2. Imos crear un axente que recompilará datos sobre valores e metainformación sobre eles.

Pero isto é o que faremos polo propio proxecto e, en termos de investigación de faust, aprenderemos a escribir axentes que procesen eventos de fluxo de kafka, así como a escribir comandos (clic envoltorio), no noso caso - para mensaxes push manuais ao tema que está a supervisar o axente.

Adestramento

Cliente AlphaVantage

Primeiro, imos escribir un pequeno cliente aiohttp para solicitudes de alfavantage.

alphaavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

De feito, todo queda claro:

  1. A API de AlphaVantage está deseñada de xeito bastante sinxelo e bonito, polo que decidín facer todas as solicitudes a través do método construct_query onde á súa vez hai unha chamada http.

  2. Traio todos os campos a snake_case por comodidade.

  3. Ben, a decoración logger.catch para unha saída de rastrexo fermosa e informativa.

PS Non esquezas engadir o token alphaavantage localmente a config.yml ou exportar a variable de ambiente HORTON_SERVICE_APIKEY. Recibimos unha ficha aquí.

Clase CRUD

Teremos unha colección de valores para almacenar metainformación sobre valores.

database/security.py

Na miña opinión, non hai que explicar nada aquí, e a propia clase base é bastante sinxela.

get_app()

Engademos unha función para crear un obxecto de aplicación aplicación.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Polo de agora teremos a creación de aplicacións máis sinxelas, un pouco máis adiante ampliarémola, non obstante, para non facervos esperar, aquí referencias á clase de aplicación. Tamén che aconsello que botes un ollo á clase de configuración, xa que é responsable da maioría das opcións.

Corpo principal

Axente de recollida e mantemento dunha lista de valores

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Entón, primeiro obtemos o obxecto da aplicación faust: é bastante sinxelo. A continuación, declaramos explícitamente un tema para o noso axente... Aquí paga a pena mencionar o que é, cal é o parámetro interno e como se pode organizar de forma diferente.

  1. Temas en kafka, se queremos saber a definición exacta, é mellor ler apagado. documento, ou podes ler compendio en Habré en ruso, onde tamén se reflicte todo con bastante precisión :)

  2. Parámetro interno, descrito bastante ben no documento de faust, permítenos configurar o tema directamente no código, por suposto, isto significa os parámetros proporcionados polos desenvolvedores de faust, por exemplo: retención, política de retención (por defecto eliminar, pero pode configurar compacto), número de particións por tema (puntuaciónsfacer, por exemplo, menos de importancia global aplicacións faust).

  3. En xeral, o axente pode crear un tema xestionado con valores globais, non obstante, gústame declarar todo de forma explícita. Ademais, algúns parámetros (por exemplo, o número de particións ou a política de retención) do tema no anuncio do axente non se poden configurar.

    Isto é o que pode parecer sen definir manualmente o tema:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ben, agora imos describir o que fará o noso axente :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Entón, ao comezo do axente, abrimos unha sesión aiohttp para solicitudes a través do noso cliente. Así, ao iniciar un traballador, cando se inicie o noso axente, abrirase inmediatamente unha sesión: unha, durante todo o tempo que o traballador estea en execución (ou varias, se cambia o parámetro). concorrencia dun axente cunha unidade predeterminada).

A continuación, seguimos o fluxo (colocamos a mensaxe en _, xa que a nós, neste axente, non nos importa o contido) das mensaxes do noso tema, se existen na compensación actual, se non, o noso ciclo agardará a súa chegada. Pois ben, dentro do noso bucle, rexistramos a recepción da mensaxe, obtemos unha lista de valores activos (get_securities só devolve activos por defecto, consulta o código do cliente) e gardámolo na base de datos, comprobando se hai un valor co mesmo ticker e intercambio na base de datos , se o hai, entón (o papel) simplemente se actualizará.

Imos lanzar a nosa creación!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Características PS compoñente web Non vou considerar fausto nos artigos, polo que poñemos a bandeira adecuada.

No noso comando de lanzamento, dixémoslle a Faust onde buscar o obxecto da aplicación e que facer con el (iniciar un traballador) co nivel de saída do rexistro de información. Obtemos a seguinte saída:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Está vivo!!!

Vexamos o conxunto de particións. Como podemos ver, creouse un tema co nome que designamos no código, o número de particións por defecto (8, tomado de particións_tema - parámetro de obxecto da aplicación), xa que non especificamos un valor individual para o noso tema (a través de particións). O axente iniciado no traballador ten asignadas as 8 particións, xa que é a única, pero isto comentarase con máis detalle na parte sobre a agrupación.

Ben, agora podemos ir a outra xanela de terminal e enviar unha mensaxe baleira ao noso tema:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS usando @ mostramos que estamos enviando unha mensaxe a un tema chamado "coller_valores".

Neste caso, a mensaxe foi á partición 6; podes comprobalo indo a kafdrop on localhost:9000

Indo á xanela do terminal co noso traballador, veremos unha mensaxe feliz enviada usando loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Tamén podemos mirar en mongo (usando Robo3T ou Studio3T) e ver que os títulos están na base de datos:

Non son multimillonario e, polo tanto, conformámonos coa primeira opción de visualización.

Tarefas de fondo sobre Faust, Parte II: Axentes e equiposTarefas de fondo sobre Faust, Parte II: Axentes e equipos

Felicidade e alegría: o primeiro axente está listo :)

Axente listo, viva o novo axente!

Si, señores, só percorremos 1/3 do camiño que prepara este artigo, pero non vos desanimedes, porque agora será máis fácil.

Entón, agora necesitamos un axente que recolla metainformación e a poña nun documento de recollida:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Dado que este axente procesará información sobre unha seguridade específica, necesitamos indicar o ticker (símbolo) desta seguridade na mensaxe. Para este fin en faust hai Rexistros — clases que declaran o esquema de mensaxes no tema do axente.

Neste caso, imos a rexistros.py e describe como debería ser a mensaxe deste tema:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Como poderías ter adiviñado, faust usa a anotación de tipo Python para describir o esquema da mensaxe, polo que a versión mínima admitida pola biblioteca é 3.6.

Volvamos ao axente, establecemos os tipos e engadímolo:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Como podes ver, pasamos un novo parámetro cun esquema ao método de inicialización do tema - value_type. Ademais, todo segue o mesmo esquema, polo que non vexo ningún sentido a determe noutra cousa.

Ben, o toque final é engadir unha chamada ao axente de recollida de metainformación para collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Usamos o esquema anunciado previamente para a mensaxe. Neste caso, usei o método .cast xa que non necesitamos esperar o resultado do axente, pero vale a pena mencionar que xeitos enviar unha mensaxe ao tema:

  1. cast - non bloquea porque non espera un resultado. Non pode enviar o resultado a outro tema como mensaxe.

  2. enviar: non bloquea porque non espera un resultado. Podes especificar un axente no tema ao que irá o resultado.

  3. preguntar - agarda un resultado. Podes especificar un axente no tema ao que irá o resultado.

Entón, iso é todo cos axentes para hoxe!

O Dream Team

O último que prometín escribir nesta parte son os comandos. Como se mencionou anteriormente, os comandos en faust son un envoltorio ao redor do clic. De feito, faust simplemente anexa o noso comando personalizado á súa interface cando especifica a tecla -A

Despois dos axentes anunciados en axentes.py engadir unha función cun decorador aplicación.comandochamando ao método publicar у recoller_valores:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Así, se chamamos á lista de comandos, o noso novo comando estará nela:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Podemos usalo como calquera outra persoa, así que reiniciemos o faust worker e comecemos unha colección completa de valores:

> faust -A horton.agents start-collect-securities

Que pasará despois?

Na seguinte parte, tomando como exemplo os restantes axentes, consideraremos o mecanismo de sumidoiro para buscar extremos nos prezos de peche de negociación do ano e o lanzamento cron dos axentes.

Iso é todo por hoxe! Grazas por ler :)

Código para esta parte

Tarefas de fondo sobre Faust, Parte II: Axentes e equipos

PD Na última parte preguntáronme sobre fausto e kafka confluente (que características ten confluente?). Parece que confluent é máis funcional en moitos aspectos, pero o feito é que faust non ten soporte total para o cliente para confluent; descricións das restricións do cliente no documento.

Fonte: www.habr.com

Engadir un comentario