Bakgrunnsoppgaver om Faust, del II: Agenter og lag

Bakgrunnsoppgaver om Faust, del II: Agenter og lag

innholdsfortegnelsen

  1. Del I: Introduksjon

  2. Del II: Agenter og lag

Hva gjør vi her?

Så, så, den andre delen. Som skrevet tidligere, i den vil vi gjøre følgende:

  1. La oss skrive en liten klient for alphavantage på aiohttp med forespørsler om endepunktene vi trenger.

  2. La oss lage en agent som vil samle inn data om verdipapirer og metainformasjon om dem.

Men, dette er hva vi skal gjøre for selve prosjektet, og når det gjelder faust-forskning, vil vi lære å skrive agenter som behandler strømmehendelser fra kafka, samt hvordan man skriver kommandoer (click wrapper), i vårt tilfelle - for manuelle push-meldinger til emnet som agenten overvåker.

Trening

AlphaVantage-klient

La oss først skrive en liten aiohttp-klient for forespørsler til alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Faktisk er alt klart fra det:

  1. AlphaVantage API er ganske enkelt og vakkert designet, så jeg bestemte meg for å gjøre alle forespørsler gjennom metoden construct_query hvor det igjen er et http-kall.

  2. Jeg tar med alle feltene til snake_case for enkelhets skyld.

  3. Vel, logger.catch-dekorasjonen for vakker og informativ tilbakesporing.

PS Ikke glem å legge til alphavantage-tokenet lokalt til config.yml, eller eksporter miljøvariabelen HORTON_SERVICE_APIKEY. Vi mottar en token her.

CRUD klasse

Vi vil ha en verdipapirsamling for å lagre metainformasjon om verdipapirer.

database/security.py

Etter min mening er det ikke nødvendig å forklare noe her, og selve basisklassen er ganske enkel.

få appen()

La oss legge til en funksjon for å lage et applikasjonsobjekt i app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Foreløpig vil vi ha den enkleste applikasjonsopprettingen, litt senere vil vi utvide den, men for ikke å la deg vente, her referanser til App-klassen. Jeg anbefaler deg også å ta en titt på innstillingsklassen, siden den er ansvarlig for de fleste innstillingene.

Hoveddel

Agent for innsamling og vedlikehold av en liste over verdipapirer

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Så først får vi faust-applikasjonsobjektet - det er ganske enkelt. Deretter erklærer vi eksplisitt et emne for vår agent... Her er det verdt å nevne hva det er, hva den interne parameteren er og hvordan dette kan ordnes annerledes.

  1. Emner i kafka, hvis vi vil vite den nøyaktige definisjonen, er det bedre å lese av. dokument, eller du kan lese kompendium på Habré på russisk, hvor alt også reflekteres ganske nøyaktig :)

  2. Intern parameter, beskrevet ganske godt i faust-dokumentet, lar oss konfigurere emnet direkte i koden, selvfølgelig betyr dette parametrene gitt av faust-utviklerne, for eksempel: oppbevaring, oppbevaringspolicy (som standard slette, men du kan angi kompakt), antall partisjoner per emne (skårerå gjøre for eksempel mindre enn global betydning applikasjoner faust).

  3. Generelt kan agenten lage et administrert emne med globale verdier, men jeg liker å erklære alt eksplisitt. I tillegg kan enkelte parametere (for eksempel antall partisjoner eller oppbevaringspolicy) for emnet i agentannonsen ikke konfigureres.

    Slik kan det se ut uten å definere emnet manuelt:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Vel, la oss nå beskrive hva agenten vår vil gjøre :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Så i begynnelsen av agenten åpner vi en aiohttp-sesjon for forespørsler gjennom vår klient. Når du starter en arbeider, når agenten vår er lansert, vil en økt umiddelbart åpnes - én, for hele tiden arbeideren kjører (eller flere, hvis du endrer parameteren samtidighet fra en agent med en standardenhet).

Deretter følger vi strømmen (vi legger inn meldingen _, siden vi, i denne agenten, ikke bryr oss om innholdet) av meldinger fra emnet vårt, hvis de eksisterer med gjeldende forskyvning, ellers vil syklusen vår vente på deres ankomst. Vel, inne i løkken vår logger vi mottaket av meldingen, får en liste over aktive (get_securities returnerer bare aktive som standard, se klientkode) verdipapirer og lagrer det i databasen, sjekker om det er et verdipapir med samme ticker og utveksling i databasen, hvis det er det, vil den (papiret) ganske enkelt bli oppdatert.

La oss lansere vår kreasjon!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS-funksjoner web-komponent Jeg vil ikke vurdere faust i artiklene, så vi setter riktig flagg.

I lanseringskommandoen vår fortalte vi faust hvor de skulle se etter applikasjonsobjektet og hva de skulle gjøre med det (starte en arbeider) med informasjonsloggutdatanivået. Vi får følgende utgang:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Den lever!!!

La oss se på partisjonssettet. Som vi kan se, ble det opprettet et emne med navnet som vi angav i koden, standard antall partisjoner (8, hentet fra emne_partisjoner - applikasjonsobjektparameter), siden vi ikke spesifiserte en individuell verdi for emnet vårt (via partisjoner). Den lanserte agenten i arbeideren er tildelt alle 8 partisjoner, siden den er den eneste, men dette vil bli diskutert mer detaljert i delen om klynging.

Vel, nå kan vi gå til et annet terminalvindu og sende en tom melding til emnet vårt:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS bruker @ vi viser at vi sender en melding til et emne som heter "collect_securities".

I dette tilfellet gikk meldingen til partisjon 6 - du kan sjekke dette ved å gå til kafdrop on localhost:9000

Når vi går til terminalvinduet med arbeideren vår, vil vi se en glad melding sendt med loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Vi kan også se på mongo (ved å bruke Robo3T eller Studio3T) og se at verdipapirene er i databasen:

Jeg er ingen milliardær, og derfor er vi fornøyd med det første visningsalternativet.

Bakgrunnsoppgaver om Faust, del II: Agenter og lagBakgrunnsoppgaver om Faust, del II: Agenter og lag

Lykke og glede - den første agenten er klar :)

Agent klar, lenge leve den nye agenten!

Ja, mine herrer, vi har bare dekket 1/3 av veien utarbeidet av denne artikkelen, men ikke mist motet, for nå blir det lettere.

Så nå trenger vi en agent som samler inn metainformasjon og legger den inn i et innsamlingsdokument:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Siden denne agenten vil behandle informasjon om et bestemt verdipapir, må vi angi ticker (symbol) for dette verdipapiret i meldingen. For dette formålet i faust er det Records — klasser som erklærer meldingsskjemaet i agentemnet.

I dette tilfellet, la oss gå til records.py og beskriv hvordan meldingen for dette emnet skal se ut:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Som du kanskje har gjettet, bruker faust merknaden python-type for å beskrive meldingsskjemaet, og det er grunnen til at minimumsversjonen som støttes av biblioteket er 3.6.

La oss gå tilbake til agenten, angi typene og legge den til:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Som du kan se, sender vi en ny parameter med et skjema til emneinitialiseringsmetoden - verdi_type. Videre følger alt det samme opplegget, så jeg ser ikke noe poeng i å dvele ved noe annet.

Vel, den siste detaljen er å legge til et kall til metainformasjonsinnsamlingsagenten til collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Vi bruker tidligere annonsert ordning for meldingen. I dette tilfellet brukte jeg .cast-metoden siden vi ikke trenger å vente på resultatet fra agenten, men det er verdt å nevne at måter å send en melding til emnet:

  1. cast - blokkerer ikke fordi den ikke forventer et resultat. Du kan ikke sende resultatet til et annet emne som en melding.

  2. send - blokkerer ikke fordi den ikke forventer et resultat. Du kan spesifisere en agent i emnet som resultatet skal gå til.

  3. spør - venter på et resultat. Du kan spesifisere en agent i emnet som resultatet skal gå til.

Så, det er alt med agenter for i dag!

Drømmelaget

Det siste jeg lovet å skrive i denne delen er kommandoer. Som nevnt tidligere, er kommandoer i faust en omslag rundt klikk. Faktisk knytter faust ganske enkelt vår egendefinerte kommando til grensesnittet når den spesifiserer -A-tasten

Etter de annonserte agentene inn agents.py legg til en funksjon med en dekoratør app.kommandokaller metoden kastet у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Derfor, hvis vi kaller listen over kommandoer, vil vår nye kommando være i den:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Vi kan bruke det som alle andre, så la oss starte faust-arbeideren på nytt og starte en fullverdig samling av verdipapirer:

> faust -A horton.agents start-collect-securities

Hva vil skje videre?

I neste del, med de gjenværende agentene som eksempel, vil vi se på synkemekanismen for å søke etter ytterpunkter i sluttkursene for handel for året og kronlanseringen av agenter.

Det var alt for i dag! Takk for at du leste :)

Kode for denne delen

Bakgrunnsoppgaver om Faust, del II: Agenter og lag

PS Under den siste delen ble jeg spurt om faust og konfluent kafka (hvilke funksjoner har confluent?). Det ser ut til at confluent er mer funksjonelt på mange måter, men faktum er at faust ikke har full klientstøtte for confluent – ​​dette følger av beskrivelser av klientrestriksjoner i dok.

Kilde: www.habr.com

Legg til en kommentar