Achtergrondtaken over Faust, deel II: agenten en teams

Achtergrondtaken over Faust, deel II: agenten en teams

inhoudsopgave

  1. Deel I: Inleiding

  2. Deel II: Agenten en teams

Wat doen we hier?

Dus, het tweede deel. Zoals eerder geschreven, zullen we daarin het volgende doen:

  1. Laten we een kleine client voor alphavantage op aiohttp schrijven met verzoeken voor de eindpunten die we nodig hebben.

  2. Laten we een agent creëren die gegevens over effecten en meta-informatie daarover verzamelt.

Maar dit is wat we zullen doen voor het project zelf, en in termen van faustonderzoek zullen we leren hoe we agenten moeten schrijven die stroomgebeurtenissen van Kafka verwerken, en hoe we opdrachten moeten schrijven (klik-wrapper), in ons geval - voor handmatige pushberichten naar het onderwerp dat de agent in de gaten houdt.

Opleiding

AlphaVantage-klant

Laten we eerst een kleine aiohttp-client schrijven voor verzoeken aan alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Eigenlijk is alles duidelijk:

  1. De AlphaVantage API is vrij eenvoudig en mooi ontworpen, dus besloot ik alle verzoeken via de methode in te dienen construct_query waar op zijn beurt een http-oproep is.

  2. Ik breng alle velden naar snake_case voor comfort.

  3. Welnu, de logger.catch-decoratie voor mooie en informatieve traceback-uitvoer.

PS Vergeet niet om het alphavantage-token lokaal toe te voegen aan config.yml, of de omgevingsvariabele te exporteren HORTON_SERVICE_APIKEY. Wij ontvangen een token hier.

CRUD-klasse

We zullen een effectencollectie hebben om meta-informatie over effecten op te slaan.

database/security.py

Naar mijn mening is het niet nodig om hier iets uit te leggen, en de basisklasse zelf is vrij eenvoudig.

verkrijg de app()

Laten we een functie toevoegen voor het maken van een toepassingsobject in app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Voorlopig zullen we de eenvoudigste applicatiecreatie hebben, iets later zullen we deze uitbreiden, maar om u niet te laten wachten, hier referenties naar App-klasse. Ik raad je ook aan om eens naar de instellingenklasse te kijken, aangezien deze verantwoordelijk is voor de meeste instellingen.

Het grootste deel

Agent voor het verzamelen en bijhouden van een lijst met effecten

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Dus eerst krijgen we het Faust-toepassingsobject - het is vrij eenvoudig. Vervolgens declareren we expliciet een onderwerp voor onze agent... Hier is het de moeite waard om te vermelden wat het is, wat de interne parameter is en hoe dit anders kan worden geregeld.

  1. Onderwerpen in kafka, als we de exacte definitie willen weten, is het beter om te lezen uit. document, of je kunt lezen abstract op Habré in het Russisch, waar alles ook vrij nauwkeurig wordt weergegeven :)

  2. Parameter intern, vrij goed beschreven in het faust-document, stelt ons in staat om het onderwerp rechtstreeks in de code te configureren, dit betekent natuurlijk de parameters die door de faust-ontwikkelaars zijn verstrekt, bijvoorbeeld: retentie, retentiebeleid (standaard verwijderd, maar je kunt compact), aantal partities per onderwerp (scoresom bijvoorbeeld minder te doen dan mondiale betekenis toepassingen faus).

  3. Over het algemeen kan de agent een beheerd onderwerp met globale waarden maken, maar ik wil alles graag expliciet declareren. Bovendien kunnen sommige parameters (bijvoorbeeld het aantal partities of het bewaarbeleid) van het onderwerp in de agentadvertentie niet worden geconfigureerd.

    Zo zou het eruit kunnen zien zonder het onderwerp handmatig te definiëren:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Laten we nu beschrijven wat onze agent zal doen :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Dus aan het begin van de agent openen we een aiohttp-sessie voor verzoeken via onze klant. Wanneer u dus een werker start en onze agent wordt gestart, wordt er onmiddellijk een sessie geopend: één, voor de hele tijd dat de werker actief is (of meerdere, als u de parameter wijzigt samenloop van een agent met een standaardeenheid).

Vervolgens volgen we de stream (we plaatsen het bericht in _, aangezien wij, in deze agent, niet om de inhoud geven) van berichten van ons onderwerp, als ze op de huidige offset bestaan, anders zal onze cyclus wachten op hun aankomst. Welnu, binnen onze lus registreren we de ontvangst van het bericht, krijgen een lijst met actieve effecten (get_securities retourneert standaard alleen actief, zie klantcode) en slaan deze op in de database, waarbij we controleren of er een effect is met dezelfde ticker en uitwisseling in de database, als die er is, dan wordt deze (het papier) eenvoudigweg bijgewerkt.

Laten we onze creatie lanceren!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS-functies webcomponent Ik zal Faust niet in de artikelen beschouwen, dus hebben we de juiste vlag geplaatst.

In ons startcommando vertelden we Faust waar we naar het applicatieobject moesten zoeken en wat we ermee moesten doen (een werker starten) met het infolog-uitvoerniveau. We krijgen de volgende uitvoer:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Het leeft!!!

Laten we eens kijken naar de partitieset. Zoals we kunnen zien, is er een onderwerp gemaakt met de naam die we in de code hebben aangegeven, het standaardaantal partities (8, overgenomen van onderwerp_partities - applicatieobjectparameter), omdat we geen individuele waarde voor ons onderwerp hebben opgegeven (via partities). De gelanceerde agent in de worker krijgt alle 8 partities toegewezen, omdat dit de enige is, maar dit zal in meer detail worden besproken in het deel over clustering.

Welnu, nu kunnen we naar een ander terminalvenster gaan en een leeg bericht naar ons onderwerp sturen:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS gebruikt @ we laten zien dat we een bericht sturen naar een onderwerp met de naam “collect_securities”.

In dit geval ging het bericht naar partitie 6. U kunt dit controleren door naar kafdrop on te gaan localhost:9000

Als we met onze medewerker naar het terminalvenster gaan, zien we een vrolijk bericht verzonden met loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

We kunnen ook naar mongo kijken (met behulp van Robo3T of Studio3T) en zien dat de effecten in de database staan:

Ik ben geen miljardair en daarom zijn we tevreden met de eerste kijkmogelijkheid.

Achtergrondtaken over Faust, deel II: agenten en teamsAchtergrondtaken over Faust, deel II: agenten en teams

Geluk en vreugde - de eerste agent is klaar :)

Agent klaar, lang leve de nieuwe agent!

Ja heren, we hebben slechts 1/3 van het pad afgelegd dat in dit artikel is voorbereid, maar wees niet ontmoedigd, want nu zal het gemakkelijker zijn.

Dus nu hebben we een agent nodig die meta-informatie verzamelt en deze in een verzameldocument plaatst:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Omdat deze agent informatie over een specifiek effect verwerkt, moeten we de ticker (symbool) van dit effect in het bericht vermelden. Voor dit doel zijn er in Faust Archief — klassen die het berichtenschema in het agentonderwerp declareren.

In dit geval gaan we naar records.py en beschrijf hoe de boodschap voor dit onderwerp eruit zou moeten zien:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Zoals je misschien al geraden hebt, gebruikt Faust de annotatie van het Python-type om het berichtenschema te beschrijven. Daarom is de minimale versie die door de bibliotheek wordt ondersteund 3.6.

Laten we terugkeren naar de agent, de typen instellen en deze toevoegen:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Zoals u kunt zien, geven we een nieuwe parameter met een schema door aan de onderwerpinitialisatiemethode: waarde_type. Verder volgt alles hetzelfde schema, dus ik zie geen enkel nut om bij iets anders stil te staan.

Welnu, de laatste hand is het toevoegen van een oproep aan de meta-informatieverzamelingsagent om collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Voor het bericht gebruiken we het eerder aangekondigde schema. In dit geval heb ik de .cast-methode gebruikt, omdat we niet hoeven te wachten op het resultaat van de agent, maar het is de moeite waard om dat te vermelden manieren stuur een bericht naar het onderwerp:

  1. cast - blokkeert niet omdat er geen resultaat wordt verwacht. Je kunt het resultaat niet als bericht naar een ander onderwerp sturen.

  2. verzenden - blokkeert niet omdat er geen resultaat wordt verwacht. U kunt een agent opgeven in het onderwerp waar het resultaat naartoe gaat.

  3. vragen - wacht op een resultaat. U kunt een agent opgeven in het onderwerp waar het resultaat naartoe gaat.

Dit was dus alles met agenten voor vandaag!

Het droomteam

Het laatste dat ik beloofde in dit deel te schrijven zijn opdrachten. Zoals eerder vermeld, zijn opdrachten in Faust een omhulsel rond klikken. In feite koppelt Faust eenvoudigweg ons aangepaste commando aan zijn interface bij het specificeren van de -A-sleutel

Nadat de aangekondigde agenten binnen waren agenten.py voeg een functie toe met een decorateur app.opdrachtde methode aanroepen gegoten у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Dus als we de lijst met opdrachten oproepen, zal onze nieuwe opdracht erin staan:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

We kunnen het net als ieder ander gebruiken, dus laten we de faust-werker opnieuw opstarten en beginnen met een volwaardige verzameling van effecten:

> faust -A horton.agents start-collect-securities

Wat gebeurt er daarna?

In het volgende deel zullen we, met de overige agenten als voorbeeld, het sink-mechanisme beschouwen voor het zoeken naar extremen in de slotkoersen van de handel voor het jaar en de cron-lancering van agenten.

Dat is alles voor vandaag! Bedankt voor het lezen :)

Codeer voor dit onderdeel

Achtergrondtaken over Faust, deel II: agenten en teams

PS In het laatste deel werd mij gevraagd naar faust en confluente kafka (welke kenmerken heeft confluent?). Het lijkt erop dat confluent in veel opzichten functioneler is, maar feit is dat Faust geen volledige klantenondersteuning heeft voor confluent - dit volgt uit beschrijvingen van clientbeperkingen in het document.

Bron: www.habr.com

Voeg een reactie