Tâches de fond sur Faust, Partie II : Agents et équipes

Tâches de fond sur Faust, Partie II : Agents et équipes

table des matières

  1. Partie I : Introduction

  2. Partie II : Agents et équipes

Que faisons-nous ici?

Alors, voilà, la deuxième partie. Comme écrit précédemment, nous y ferons ce qui suit :

  1. Écrivons un petit client pour alphavantage sur aiohttp avec des requêtes pour les points de terminaison dont nous avons besoin.

  2. Créons un agent qui collectera des données sur les titres et des méta-informations sur ceux-ci.

Mais c'est ce que nous ferons pour le projet lui-même, et en termes de recherche Faust, nous apprendrons comment écrire des agents qui traitent les événements de flux de Kafka, ainsi que comment écrire des commandes (clic wrapper), dans notre cas - pour les messages push manuels vers le sujet que l'agent surveille.

Formation

Client AlphaVantage

Tout d'abord, écrivons un petit client aiohttp pour les requêtes adressées à alphavantage.

alphavantage.py

Becquet

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

En fait, tout en ressort clairement :

  1. L'API AlphaVantage est assez simple et magnifiquement conçue, j'ai donc décidé de faire toutes les requêtes via la méthode construct_query où à son tour il y a un appel http.

  2. J'apporte tous les champs à snake_case pour plus de commodité.

  3. Eh bien, la décoration logger.catch pour une sortie de traçage belle et informative.

PS N'oubliez pas d'ajouter le jeton alphavantage localement à config.yml, ou d'exporter la variable d'environnement HORTON_SERVICE_APIKEY. Nous recevons un jeton ici.

Classe CRUD

Nous aurons une collection de titres pour stocker des méta-informations sur les titres.

base de données/sécurité.py

À mon avis, il n'est pas nécessaire d'expliquer quoi que ce soit ici, et la classe de base elle-même est assez simple.

obtenir l'application()

Ajoutons une fonction pour créer un objet application dans app.py

Becquet

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Pour l'instant, nous aurons la création d'application la plus simple, un peu plus tard nous l'étendrons, cependant, pour ne pas vous faire attendre, ici les références à la classe App. Je vous conseille également de jeter un œil à la classe settings, puisqu'elle est responsable de la plupart des paramètres.

Principal

Agent de collecte et de tenue d'une liste de titres

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Donc, nous obtenons d’abord le faux objet d’application - c’est assez simple. Ensuite, nous déclarons explicitement un sujet pour notre agent... Ici, il convient de mentionner de quoi il s'agit, quel est le paramètre interne et comment cela peut être organisé différemment.

  1. Sujets en kafka, si on veut connaître la définition exacte, il vaut mieux lire désactivé. document, ou vous pouvez lire synopsis sur Habré en russe, où tout se reflète également assez précisément :)

  2. Paramètre interne, assez bien décrit dans la doc faust, permet de configurer le sujet directement dans le code, bien sûr, cela signifie les paramètres fournis par les développeurs faust, par exemple : rétention, politique de rétention (par défaut supprimer, mais vous pouvez définir compact), nombre de partitions par sujet (partitionsfaire, par exemple, moins de valeur globale applications fausses).

  3. En général, l'agent peut créer un sujet géré avec des valeurs globales, cependant, j'aime tout déclarer explicitement. De plus, certains paramètres (par exemple, le nombre de partitions ou la stratégie de rétention) du sujet dans la publication de l'agent ne peuvent pas être configurés.

    Voici à quoi cela pourrait ressembler sans définir manuellement le sujet :

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Eh bien, décrivons maintenant ce que fera notre agent :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Ainsi, au début de l'agent, nous ouvrons une session aiohttp pour les requêtes via notre client. Ainsi, au démarrage d'un Worker, au lancement de notre agent, une session sera immédiatement ouverte - une, pour toute la durée d'exécution du Worker (ou plusieurs, si vous modifiez le paramètre concurrence d'un agent avec une unité par défaut).

Ensuite, on suit le flux (on place le message dans _, puisque nous, dans cet agent, ne nous soucions pas du contenu) des messages de notre sujet, s'ils existent au décalage actuel, sinon notre cycle attendra leur arrivée. Eh bien, dans notre boucle, nous enregistrons la réception du message, obtenons une liste des titres actifs (get_securities ne renvoie que les titres actifs par défaut, voir le code client) et l'enregistrons dans la base de données, vérifiant s'il existe un titre avec le même ticker et échange dans la base de données, s'il y en a, alors il (le papier) sera simplement mis à jour.

Lançons notre création !

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Fonctionnalités PS composant Web Je ne considérerai pas Faust dans les articles, nous définissons donc le drapeau approprié.

Dans notre commande de lancement, nous avons indiqué à Faust où rechercher l'objet d'application et quoi en faire (lancer un travailleur) avec le niveau de sortie du journal d'informations. Nous obtenons le résultat suivant :

Becquet

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

C'est vivant!!!

Regardons l'ensemble de partitions. Comme on peut le voir, un sujet a été créé avec le nom que nous avons désigné dans le code, le nombre de partitions par défaut (8, tiré de sujet_partitions - paramètre d'objet d'application), puisque nous n'avons pas spécifié de valeur individuelle pour notre sujet (via les partitions). L'agent lancé dans le travailleur se voit attribuer les 8 partitions, puisqu'il est la seule, mais cela sera discuté plus en détail dans la partie sur le clustering.

Eh bien, nous pouvons maintenant accéder à une autre fenêtre de terminal et envoyer un message vide à notre sujet :

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS utilisant @ nous montrons que nous envoyons un message à un sujet nommé « collect_securities ».

Dans ce cas, le message est allé à la partition 6 - vous pouvez le vérifier en allant sur kafdrop sur localhost:9000

En allant à la fenêtre du terminal avec notre travailleur, nous verrons un message joyeux envoyé en utilisant loguru :

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

On peut aussi regarder dans mongo (en utilisant Robo3T ou Studio3T) et voir que les titres sont dans la base de données :

Je ne suis pas milliardaire et nous nous contentons donc de la première option de visualisation.

Tâches de fond sur Faust, Partie II : Agents et équipesTâches de fond sur Faust, Partie II : Agents et équipes

Bonheur et joie - le premier agent est prêt :)

Agent prêt, vive le nouvel agent !

Oui, messieurs, nous n'avons parcouru que 1/3 du chemin préparé par cet article, mais ne vous découragez pas, car désormais ce sera plus facile.

Nous avons donc maintenant besoin d'un agent qui collecte des méta-informations et les met dans un document de collecte :

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Puisque cet agent traitera des informations sur un titre spécifique, nous devons indiquer le ticker (symbole) de ce titre dans le message. A cet effet, il y a en Faut Enregistrements — classes qui déclarent le schéma de message dans la rubrique de l'agent.

Dans ce cas, allons à enregistrements.py et décrivez à quoi devrait ressembler le message pour ce sujet :

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Comme vous l'avez peut-être deviné, Faust utilise l'annotation de type Python pour décrire le schéma du message, c'est pourquoi la version minimale prise en charge par la bibliothèque est 3.6.

Revenons à l'agent, définissons les types et ajoutons-le :

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Comme vous pouvez le voir, nous transmettons un nouveau paramètre avec un schéma à la méthode d'initialisation du sujet - value_type. De plus, tout suit le même schéma, donc je ne vois pas l’intérêt de m’attarder sur autre chose.

Eh bien, la touche finale consiste à ajouter un appel à l'agent de collecte de méta-informations à collect_securitites :

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Nous utilisons le schéma annoncé précédemment pour le message. Dans ce cas, j'ai utilisé la méthode .cast puisque nous n'avons pas besoin d'attendre le résultat de l'agent, mais il convient de mentionner que façons de envoyer un message sur le sujet :

  1. cast - ne bloque pas car il n'attend pas de résultat. Vous ne pouvez pas envoyer le résultat à un autre sujet sous forme de message.

  2. envoyer - ne bloque pas car il n'attend pas de résultat. Vous pouvez spécifier un agent dans le sujet auquel le résultat sera envoyé.

  3. demander - attend un résultat. Vous pouvez spécifier un agent dans le sujet auquel le résultat sera envoyé.

Voilà, c'est tout avec les agents pour aujourd'hui !

L'équipe de rêve

La dernière chose que j'ai promis d'écrire dans cette partie, ce sont les commandes. Comme mentionné précédemment, les commandes de Faust enveloppent le clic. En fait, Faust attache simplement notre commande personnalisée à son interface lors de la spécification de la clé -A

Après les agents annoncés dans agents.py ajouter une fonction avec un décorateur app.commandeappeler la méthode en fonte у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Ainsi, si nous appelons la liste des commandes, notre nouvelle commande sera dedans :

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Nous pouvons l’utiliser comme n’importe qui d’autre, alors redémarrons le Faust Worker et commençons une collection complète de titres :

> faust -A horton.agents start-collect-securities

Qu'est-ce qui va se passer?

Dans la partie suivante, en utilisant les agents restants comme exemple, nous examinerons le mécanisme de puits pour rechercher les extrêmes dans les prix de clôture de l'année et le lancement cron des agents.

C'est tout pour aujourd'hui! Merci d'avoir lu :)

Code pour cette pièce

Tâches de fond sur Faust, Partie II : Agents et équipes

PS Dans la dernière partie, on m'a posé des questions sur Faust et Confluent Kafka (Quelles sont les fonctionnalités de Confluent ?). Il semble que Confluent soit plus fonctionnel à bien des égards, mais le fait est que Faust ne dispose pas d'un support client complet pour Confluent - cela découle de descriptions des restrictions client dans la doc.

Source: habr.com

Ajouter un commentaire