Hintergrundaufgaben zu Faust, Teil II: Agenten und Teams

Hintergrundaufgaben zu Faust, Teil II: Agenten und Teams

Inhaltsverzeichnis

  1. Teil I: Einführung

  2. Teil II: Agenten und Teams

Was machen wir hier?

So, so, der zweite Teil. Wie bereits geschrieben, werden wir darin Folgendes tun:

  1. Schreiben wir einen kleinen Client für Alphavantage auf aiohttp mit Anfragen für die Endpunkte, die wir benötigen.

  2. Erstellen wir einen Agenten, der Daten zu Wertpapieren und Metainformationen dazu sammelt.

Aber genau das werden wir für das Projekt selbst tun, und im Hinblick auf die Faust-Forschung werden wir lernen, wie man Agenten schreibt, die Stream-Ereignisse von Kafka verarbeiten, sowie wie man Befehle schreibt (Click-Wrapper), in unserem Fall – für manuelle Push-Nachrichten an das Thema, das der Agent überwacht.

Training

AlphaVantage-Client

Schreiben wir zunächst einen kleinen aiohttp-Client für Anfragen an Alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Eigentlich ist daraus alles klar:

  1. Die AlphaVantage-API ist recht einfach und schön gestaltet, daher habe ich beschlossen, alle Anfragen über die Methode zu stellen construct_query wo wiederum ein http-Aufruf erfolgt.

  2. Ich bringe alle Felder mit snake_case Für Komfort.

  3. Nun, die logger.catch-Dekoration für eine schöne und informative Traceback-Ausgabe.

PS Vergessen Sie nicht, das Alphavantage-Token lokal zu config.yml hinzuzufügen oder die Umgebungsvariable zu exportieren HORTON_SERVICE_APIKEY. Wir erhalten einen Token hier.

CRUD-Klasse

Wir werden über eine Wertpapiersammlung verfügen, um Metainformationen über Wertpapiere zu speichern.

Datenbank/Sicherheit.py

Meiner Meinung nach muss hier nichts erklärt werden, und die Basisklasse selbst ist recht einfach.

die App holen()

Fügen wir eine Funktion zum Erstellen eines Anwendungsobjekts hinzu app.py.

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Vorerst haben wir die einfachste Anwendungserstellung, etwas später werden wir sie jedoch erweitern, um Sie hier nicht warten zu lassen Verweise zur App-Klasse. Ich empfehle Ihnen auch, einen Blick auf die Settings-Klasse zu werfen, da diese für die meisten Einstellungen verantwortlich ist.

Startseite

Beauftragter für die Sammlung und Führung einer Wertpapierliste

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Zuerst erhalten wir also das Faust-Anwendungsobjekt – es ist ganz einfach. Als nächstes deklarieren wir explizit ein Thema für unseren Agenten... Hier ist es erwähnenswert, was es ist, was der interne Parameter ist und wie dieser anders angeordnet werden kann.

  1. Themen in Kafka, wenn wir die genaue Definition wissen wollen, ist es besser, sie zu lesen aus. dokumentieren, oder Sie können lesen Kompendium auf Habré auf Russisch, wo auch alles ziemlich genau wiedergegeben wird :)

  2. Parameter intern, im Faust-Dokument recht gut beschrieben, ermöglicht es uns, das Thema direkt im Code zu konfigurieren. Dies bedeutet natürlich die von den Faust-Entwicklern bereitgestellten Parameter, zum Beispiel: Aufbewahrung, Aufbewahrungsrichtlinie (standardmäßig löschen, aber Sie können festlegen kompakt), Anzahl der Partitionen pro Thema (Partiturenzum Beispiel weniger als tun globale Bedeutung Anwendungen Faust).

  3. Im Allgemeinen kann der Agent ein verwaltetes Thema mit globalen Werten erstellen, ich deklariere jedoch gerne alles explizit. Darüber hinaus können einige Parameter (z. B. die Anzahl der Partitionen oder die Aufbewahrungsrichtlinie) des Themas in der Agent-Ankündigung nicht konfiguriert werden.

    So könnte es aussehen, ohne das Thema manuell zu definieren:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nun, jetzt beschreiben wir, was unser Agent tun wird :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Zu Beginn des Agenten eröffnen wir also eine aiohttp-Sitzung für Anfragen über unseren Client. Wenn also ein Worker gestartet wird und unser Agent gestartet wird, wird sofort eine Sitzung geöffnet – eine für die gesamte Laufzeit des Workers (oder mehrere, wenn Sie den Parameter ändern). Parallelität von einem Agenten mit einer Standardeinheit).

Als nächstes folgen wir dem Stream (wir platzieren die Nachricht in _, da uns in diesem Agenten der Inhalt von Nachrichten aus unserem Thema egal ist, wenn sie am aktuellen Offset vorhanden sind, andernfalls wartet unser Zyklus auf ihre Ankunft. Nun, innerhalb unserer Schleife protokollieren wir den Empfang der Nachricht, rufen eine Liste der aktiven Wertpapiere ab (get_securities gibt standardmäßig nur aktive zurück, siehe Client-Code), speichern sie in der Datenbank und prüfen, ob es ein Wertpapier mit demselben Ticker gibt Austausch in der Datenbank, wenn ja, dann wird es (das Papier) einfach aktualisiert.

Lasst uns unsere Kreation starten!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS-Funktionen Webkomponente Ich werde Faust in den Artikeln nicht berücksichtigen, daher setzen wir die entsprechende Flagge.

In unserem Startbefehl haben wir Faust anhand der Info-Log-Ausgabeebene mitgeteilt, wo nach dem Anwendungsobjekt gesucht werden soll und was damit zu tun ist (einen Worker starten). Wir erhalten die folgende Ausgabe:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Es lebt!!!

Schauen wir uns den Partitionssatz an. Wie wir sehen können, wurde ein Thema mit dem Namen erstellt, den wir im Code angegeben haben, der Standardanzahl von Partitionen (8, entnommen aus topic_partitions - Anwendungsobjektparameter), da wir für unser Thema (über Partitionen) keinen individuellen Wert angegeben haben. Dem gestarteten Agenten im Worker werden alle 8 Partitionen zugewiesen, da er der einzige ist, worauf aber im Teil über Clustering noch genauer eingegangen wird.

Nun können wir zu einem anderen Terminalfenster gehen und eine leere Nachricht an unser Thema senden:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS mit @ Wir zeigen, dass wir eine Nachricht an ein Thema namens „collect_securities“ senden.

In diesem Fall ging die Nachricht an Partition 6 – Sie können dies überprüfen, indem Sie auf kafdrop gehen localhost:9000

Wenn wir mit unserem Mitarbeiter zum Terminalfenster gehen, sehen wir eine glückliche Nachricht, die mit loguru gesendet wurde:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Wir können auch einen Blick in Mongo werfen (mit Robo3T oder Studio3T) und sehen, dass sich die Wertpapiere in der Datenbank befinden:

Ich bin kein Milliardär und deshalb begnügen wir uns mit der ersten Ansichtsoption.

Hintergrundaufgaben zu Faust, Teil II: Agenten und TeamsHintergrundaufgaben zu Faust, Teil II: Agenten und Teams

Glück und Freude – der erste Agent ist fertig :)

Agent bereit, es lebe der neue Agent!

Ja, meine Herren, wir haben nur ein Drittel des in diesem Artikel vorbereiteten Weges zurückgelegt, aber lassen Sie sich nicht entmutigen, denn jetzt wird es einfacher.

Jetzt brauchen wir also einen Agenten, der Metainformationen sammelt und in ein Sammlungsdokument einfügt:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Da dieser Agent Informationen zu einem bestimmten Wertpapier verarbeitet, müssen wir in der Nachricht den Ticker (Symbol) dieses Wertpapiers angeben. Zu diesem Zweck gibt es im Faust Aufzeichnungen – Klassen, die das Nachrichtenschema im Agententhema deklarieren.

In diesem Fall gehen wir zu Records.py und beschreiben Sie, wie die Nachricht für dieses Thema aussehen sollte:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Wie Sie vielleicht schon vermutet haben, verwendet Faust die Python-Typ-Annotation, um das Nachrichtenschema zu beschreiben, weshalb die von der Bibliothek unterstützte Mindestversion ist 3.6.

Kehren wir zum Agenten zurück, legen die Typen fest und fügen ihn hinzu:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Wie Sie sehen können, übergeben wir einen neuen Parameter mit einem Schema an die Topic-Initialisierungsmethode – value_type. Außerdem läuft alles nach dem gleichen Schema ab, sodass ich keinen Sinn darin sehe, mich mit irgendetwas anderem zu befassen.

Nun, der letzte Schliff besteht darin, einen Aufruf an den Metainformations-Sammelagenten „collect_securitites“ hinzuzufügen:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Wir verwenden für die Nachricht das zuvor angekündigte Schema. In diesem Fall habe ich die .cast-Methode verwendet, da wir nicht auf das Ergebnis des Agenten warten müssen, aber das ist erwähnenswert Wege Senden Sie eine Nachricht an das Thema:

  1. cast – blockiert nicht, da kein Ergebnis erwartet wird. Sie können das Ergebnis nicht als Nachricht an ein anderes Thema senden.

  2. send – blockiert nicht, da kein Ergebnis erwartet wird. Sie können im Thema einen Agenten angeben, an den das Ergebnis gesendet werden soll.

  3. fragen – wartet auf ein Ergebnis. Sie können im Thema einen Agenten angeben, an den das Ergebnis gesendet werden soll.

Das ist also alles mit den Agenten für heute!

Das Traumteam

Das Letzte, was ich versprochen habe, in diesem Teil zu schreiben, sind Befehle. Wie bereits erwähnt, sind Befehle in Faust eine Umhüllung um den Klick. Tatsächlich hängt Faust einfach unseren benutzerdefinierten Befehl an seine Schnittstelle an, wenn er den Schlüssel -A angibt

Nach den angekündigten Agenten agenten.py Fügen Sie eine Funktion mit einem Dekorator hinzu app.commandAufruf der Methode werfen у Collect_Securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Wenn wir also die Liste der Befehle aufrufen, befindet sich unser neuer Befehl darin:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Wir können es wie jeder andere verwenden, also starten wir den Faust-Worker neu und beginnen mit einer vollständigen Sammlung von Wertpapieren:

> faust -A horton.agents start-collect-securities

Was wird als nächstes passieren?

Im nächsten Teil betrachten wir am Beispiel der verbleibenden Agenten den Sink-Mechanismus für die Suche nach Extremwerten in den Schlusskursen des Jahreshandels und die Cron-Einführung von Agenten.

Das ist alles für heute! Danke fürs Lesen :)

Code für diesen Teil

Hintergrundaufgaben zu Faust, Teil II: Agenten und Teams

PS: Im letzten Teil wurde ich nach Faust und Confluent Kafka gefragt (Welche Funktionen hat Confluent?). Es scheint, dass Confluent in vielerlei Hinsicht funktionaler ist, aber Tatsache ist, dass Faust keine vollständige Client-Unterstützung für Confluent bietet – dies folgt aus Beschreibungen der Client-Einschränkungen im Dokument.

Source: habr.com

Kommentar hinzufügen