Baggrundsopgaver om Faust, del II: Agenter og teams

Baggrundsopgaver om Faust, del II: Agenter og teams

indholdsfortegnelse

  1. Del I: Introduktion

  2. Del II: Agenter og teams

Hvad laver vi her?

Altså, anden del. Som skrevet tidligere vil vi i den gøre følgende:

  1. Lad os skrive en lille klient til alphavantage på aiohttp med anmodninger om de endepunkter, vi har brug for.

  2. Lad os oprette en agent, der vil indsamle data om værdipapirer og metaoplysninger om dem.

Men det er, hvad vi vil gøre for selve projektet, og med hensyn til faust research, vil vi lære, hvordan man skriver agenter, der behandler stream-hændelser fra kafka, samt hvordan man skriver kommandoer (click wrapper), i vores tilfælde - for manuelle push-meddelelser til det emne, som agenten overvåger.

Træning

AlphaVantage klient

Lad os først skrive en lille aiohttp-klient for anmodninger om alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Faktisk er alt klart fra det:

  1. AlphaVantage API er ganske enkelt og smukt designet, så jeg besluttede at lave alle anmodninger gennem metoden construct_query hvor der til gengæld er et http-kald.

  2. Jeg bringer alle markerne til snake_case for komfort.

  3. Nå, logger.catch-dekorationen til smuk og informativ traceback-output.

PS Glem ikke at tilføje alphavantage-tokenet lokalt til config.yml, eller eksportere miljøvariablen HORTON_SERVICE_APIKEY. Vi modtager et token her.

CRUD klasse

Vi vil have en værdipapirsamling til at gemme metaoplysninger om værdipapirer.

database/security.py

Efter min mening er der ingen grund til at forklare noget her, og selve basisklassen er ret simpel.

get_app()

Lad os tilføje en funktion til at oprette et applikationsobjekt i app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

For nu vil vi have den enkleste applikationsoprettelse, lidt senere udvider vi den dog for ikke at lade dig vente, her referencer til App-klasse. Jeg råder dig også til at tage et kig på indstillingsklassen, da den er ansvarlig for de fleste af indstillingerne.

Hoveddelen

Agent for indsamling og vedligeholdelse af en liste over værdipapirer

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Så først får vi faust-applikationsobjektet - det er ret simpelt. Dernæst erklærer vi eksplicit et emne for vores agent... Her er det værd at nævne, hvad det er, hvad den interne parameter er, og hvordan dette kan arrangeres anderledes.

  1. Emner i kafka, hvis vi vil kende den nøjagtige definition, er det bedre at læse af. dokument, eller du kan læse kompendium på Habré på russisk, hvor alt også afspejles ret præcist :)

  2. Intern parameter, som er ganske godt beskrevet i faust-dokumentet, giver os mulighed for at konfigurere emnet direkte i koden, det betyder selvfølgelig de parametre, som faust-udviklerne har leveret, for eksempel: retention, retention policy (som standard slette, men du kan indstille kompakt), antal partitioner pr. emne (scoresat gøre for eksempel mindre end global betydning applikationer faust).

  3. Generelt kan agenten oprette et administreret emne med globale værdier, dog kan jeg godt lide at erklære alt eksplicit. Derudover kan nogle parametre (f.eks. antallet af partitioner eller opbevaringspolitik) for emnet i agentannoncen ikke konfigureres.

    Sådan kan det se ud uden manuelt at definere emnet:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nå, lad os nu beskrive, hvad vores agent vil gøre :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Så i begyndelsen af ​​agenten åbner vi en aiohttp-session for anmodninger gennem vores klient. Når en arbejder startes, når vores agent startes, vil der straks blive åbnet en session - én, for hele tiden arbejderen kører (eller flere, hvis du ændrer parameteren samtidighed fra en agent med en standardenhed).

Dernæst følger vi strømmen (vi placerer beskeden i _, da vi i denne agent er ligeglade med indholdet) af meddelelser fra vores emne, hvis de eksisterer ved den aktuelle offset, ellers vil vores cyklus vente på deres ankomst. Nå, inde i vores løkke logger vi modtagelsen af ​​meddelelsen, får en liste over aktive (get_securities returnerer kun aktive som standard, se klientkode) værdipapirer og gemmer det i databasen, tjekker om der er et værdipapir med samme ticker og udveksling i databasen, hvis der er, så vil den (papiret) simpelthen blive opdateret.

Lad os lancere vores skabelse!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS funktioner web-komponent Jeg vil ikke overveje faust i artiklerne, så vi sætter det passende flag.

I vores startkommando fortalte vi faust, hvor man skulle lede efter applikationsobjektet, og hvad man skulle gøre med det (start en arbejder) med informationslogoutputniveauet. Vi får følgende output:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Den lever!!!

Lad os se på partitionssættet. Som vi kan se, blev der oprettet et emne med det navn, som vi angav i koden, standardantallet af partitioner (8, taget fra emne_partitioner - application object parameter), da vi ikke specificerede en individuel værdi for vores emne (via partitioner). Den lancerede agent i arbejderen er tildelt alle 8 partitioner, da den er den eneste, men dette vil blive diskuteret mere detaljeret i delen om klyngedannelse.

Nå, nu kan vi gå til et andet terminalvindue og sende en tom besked til vores emne:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS bruger @ vi viser, at vi sender en besked til et emne med navnet "collect_securities".

I dette tilfælde gik beskeden til partition 6 - du kan tjekke dette ved at gå til kafdrop on localhost:9000

Går vi til terminalvinduet med vores arbejder, vil vi se en glad besked sendt med loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Vi kan også kigge på mongo (ved hjælp af Robo3T eller Studio3T) og se, at værdipapirerne er i databasen:

Jeg er ikke milliardær, og derfor nøjes vi med den første visningsmulighed.

Baggrundsopgaver om Faust, del II: Agenter og teamsBaggrundsopgaver om Faust, del II: Agenter og teams

Lykke og glæde - den første agent er klar :)

Agent klar, længe leve den nye agent!

Ja, mine herrer, vi har kun dækket 1/3 af den vej, som denne artikel har forberedt, men bliv ikke afskrækket, for nu bliver det nemmere.

Så nu har vi brug for en agent, der indsamler metainformation og sætter den i et indsamlingsdokument:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Da denne agent vil behandle oplysninger om et specifikt værdipapir, er vi nødt til at angive ticker (symbol) for dette værdipapir i meddelelsen. Til dette formål i faust er der Records — klasser, der erklærer meddelelsesskemaet i agentemnet.

I dette tilfælde, lad os gå til records.py og beskriv hvordan budskabet til dette emne skal se ud:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Som du måske har gættet, bruger faust annoteringen af ​​python-typen til at beskrive meddelelsesskemaet, hvorfor den minimumsversion, der understøttes af biblioteket, er 3.6.

Lad os vende tilbage til agenten, indstille typerne og tilføje det:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Som du kan se, sender vi en ny parameter med et skema til emneinitialiseringsmetoden - værdi_type. Yderligere følger alt det samme skema, så jeg kan ikke se nogen mening i at dvæle ved andet.

Nå, den sidste touch er at tilføje et opkald til metainformationsindsamlingsagenten til collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Vi bruger den tidligere udmeldte ordning til beskeden. I dette tilfælde brugte jeg .cast-metoden, da vi ikke behøver at vente på resultatet fra agenten, men det er værd at nævne, at måder send en besked til emnet:

  1. cast - blokerer ikke, fordi den ikke forventer et resultat. Du kan ikke sende resultatet til et andet emne som en besked.

  2. send - blokerer ikke, fordi den ikke forventer et resultat. Du kan angive en agent i emnet, som resultatet skal gå til.

  3. spørge - venter på et resultat. Du kan angive en agent i emnet, som resultatet skal gå til.

Så det er alt med agenter for i dag!

Drømmeholdet

Det sidste, jeg lovede at skrive i denne del, er kommandoer. Som tidligere nævnt er kommandoer i faust en indpakning omkring klik. Faktisk knytter faust blot vores brugerdefinerede kommando til dens grænseflade, når han angiver -A-tasten

Efter de annoncerede agenter ind agents.py tilføje en funktion med en dekoratør app.kommandokalder metoden støbt у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Så hvis vi kalder listen over kommandoer, vil vores nye kommando være i den:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Vi kan bruge det som alle andre, så lad os genstarte faust-arbejderen og begynde en fuldgyldig samling af værdipapirer:

> faust -A horton.agents start-collect-securities

Hvad sker der nu?

I den næste del, ved at bruge de resterende agenter som eksempel, vil vi overveje synkemekanismen til at søge efter ekstremer i årets lukkepriser for handel og lanceringen af ​​agenter.

Det var alt for i dag! Tak fordi du læste med :)

Kode for denne del

Baggrundsopgaver om Faust, del II: Agenter og teams

PS Under den sidste del blev jeg spurgt om faust og konfluent kafka (hvilke funktioner har confluent?). Det ser ud til, at confluent er mere funktionelt på mange måder, men faktum er, at faust ikke har fuld klientsupport til confluent - dette følger af beskrivelser af klientrestriktioner i dok.

Kilde: www.habr.com

Tilføj en kommentar