Pozadinski zadaci na Faustu, II. dio: Agenti i timovi

Pozadinski zadaci na Faustu, II. dio: Agenti i timovi

pregled sadržaja

  1. Dio I: Uvod

  2. Dio II: Agenti i timovi

Što radimo ovdje?

Tako, tako, drugi dio. Kao što je ranije napisano, u njemu ćemo učiniti sljedeće:

  1. Napišimo mali klijent za alphavantage na aiohttp sa zahtjevima za krajnje točke koje trebamo.

  2. Kreirajmo agenta koji će prikupljati podatke o vrijednosnim papirima i meta informacije o njima.

No, to je ono što ćemo učiniti za sam projekt, au smislu istraživanja fausta, naučit ćemo kako pisati agente koji obrađuju stream događaje iz kafke, kao i kako pisati naredbe (click wrapper), u našem slučaju - za ručno slanje poruka na temu koju agent nadzire.

Trening

AlphaVantage klijent

Prvo, napišimo mali aiohttp klijent za zahtjeve za alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Zapravo, sve je jasno iz njega:

  1. AlphaVantage API prilično je jednostavno i lijepo dizajniran, pa sam odlučio sve zahtjeve postavljati putem metode construct_query gdje pak postoji http poziv.

  2. Donosim sva polja u snake_case radi praktičnosti.

  3. Pa, ukras logger.catch za prekrasan i informativan traceback izlaz.

PS Ne zaboravite dodati alphavantage token lokalno u config.yml ili izvesti varijablu okruženja HORTON_SERVICE_APIKEY. Primamo žeton ovdje.

CRUD klasa

Imat ćemo zbirku vrijednosnih papira za pohranu meta informacija o vrijednosnim papirima.

baza podataka/sigurnost.py

Po mom mišljenju, ovdje nema potrebe ništa objašnjavati, a sama osnovna klasa je prilično jednostavna.

get_app()

Dodajmo funkciju za stvaranje aplikacijskog objekta app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Za sada ćemo imati najjednostavniju izradu aplikacije, nešto kasnije ćemo je proširiti, no kako ne biste čekali, ovdje reference u App-class. Također vam savjetujem da pogledate klasu postavki, jer je ona odgovorna za većinu postavki.

Glavno tijelo

Agent za prikupljanje i održavanje popisa vrijednosnih papira

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Dakle, prvo dobivamo faust aplikacijski objekt - prilično je jednostavan. Zatim izričito deklariramo temu za našeg agenta... Ovdje je vrijedno spomenuti što je to, koji je interni parametar i kako se to može drugačije urediti.

  1. Teme u Kafki, ako želimo znati točnu definiciju, bolje je pročitati isključeno. dokument, ili možete pročitati kompendij na Habréu na ruskom, gdje se sve također odražava prilično točno :)

  2. Parametar unutarnji, prilično dobro opisan u faust dokumentu, omogućuje nam da konfiguriramo temu izravno u kodu, naravno, to znači parametre koje su osigurali faust programeri, na primjer: zadržavanje, politika zadržavanja (prema zadanim postavkama brisanje, ali možete postaviti kompaktni), broj particija po temi (pregradeučiniti, na primjer, manje od globalni značaj aplikacije faust).

  3. Općenito, agent može stvoriti upravljanu temu s globalnim vrijednostima, međutim, volim sve deklarirati eksplicitno. Osim toga, neki parametri (na primjer, broj particija ili politika zadržavanja) teme u oglasu agenta ne mogu se konfigurirati.

    Evo kako bi to moglo izgledati bez ručnog definiranja teme:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Pa, hajmo sada opisati što će naš agent učiniti :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Dakle, na početku agenta otvaramo aiohttp sesiju za zahtjeve preko našeg klijenta. Dakle, prilikom pokretanja workera, kada se pokrene naš agent, odmah će se otvoriti sesija - jedna, za cijelo vrijeme dok se worker izvodi (ili nekoliko, ako promijenite parametar konkurentnost od agenta sa zadanom jedinicom).

Zatim slijedimo tok (smještamo poruku u _, budući da mi, u ovom agentu, ne brinemo o sadržaju) poruka iz naše teme, ako postoje na trenutnom pomaku, inače će naš ciklus čekati njihov dolazak. Pa, unutar naše petlje, bilježimo primitak poruke, dobivamo popis aktivnih (get_securities vraća samo aktivne prema zadanim postavkama, pogledajte kod klijenta) vrijednosnih papira i spremamo ga u bazu podataka, provjeravajući postoji li vrijednosni papir s istom oznakom i razmjene u bazi podataka , ako postoji, onda će se (papir) jednostavno ažurirati.

Pokrenimo našu kreaciju!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS značajke web komponenta Fausta neću razmatrati u člancima, pa smo postavili odgovarajuću zastavicu.

U našoj naredbi za pokretanje, rekli smo Faustu gdje da traži objekt aplikacije i što da radi s njim (pokrene worker) s razinom izlaza info dnevnika. Dobivamo sljedeći izlaz:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Živo je!!!

Pogledajmo particijski skup. Kao što vidimo, kreirana je tema s nazivom koji smo odredili u kodu, zadani broj particija (8, preuzeto iz particije_tema - parametar objekta aplikacije), budući da nismo naveli pojedinačnu vrijednost za našu temu (putem particija). Pokrenutom agentu u workeru dodjeljuje se svih 8 particija, jer je on jedini, ali o tome će biti više riječi u dijelu o klasteriranju.

Pa, sada možemo otići do drugog prozora terminala i poslati praznu poruku našoj temi:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS korištenje @ pokazujemo da šaljemo poruku u temu pod nazivom “collect_securities”.

U ovom slučaju, poruka je otišla na particiju 6 - to možete provjeriti odlaskom na kafdrop on localhost:9000

Idemo do prozora terminala s našim radnikom, vidjet ćemo sretnu poruku poslanu pomoću loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Također možemo pogledati u mongo (koristeći Robo3T ili Studio3T) i vidjeti da su vrijednosni papiri u bazi podataka:

Ja nisam milijarder i stoga smo zadovoljni prvom opcijom gledanja.

Pozadinski zadaci na Faustu, II. dio: Agenti i timoviPozadinski zadaci na Faustu, II. dio: Agenti i timovi

Sreća i veselje - prvi agent je spreman :)

Agent spreman, živio novi agent!

Da, gospodo, prešli smo samo 1/3 staze koju je pripremio ovaj članak, ali nemojte se obeshrabriti, jer sada će biti lakše.

Dakle, sada nam treba agent koji prikuplja meta informacije i stavlja ih u dokument zbirke:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Budući da će ovaj agent obrađivati ​​podatke o određenom vrijednosnom papiru, u poruci moramo naznačiti ticker (simbol) tog vrijednosnog papira. U tu svrhu u faustu postoje Ploče — klase koje deklariraju shemu poruka u temi agenta.

U ovom slučaju, idemo na zapisi.py i opišite kako bi trebala izgledati poruka za ovu temu:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kao što ste možda pogodili, faust koristi python tipsku napomenu za opisivanje sheme poruke, zbog čega je minimalna verzija koju podržava biblioteka 3.6.

Vratimo se agentu, postavimo tipove i dodamo ga:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kao što vidite, mi prosljeđujemo novi parametar sa shemom metodi inicijalizacije teme - value_type. Dalje, sve ide po istoj shemi, tako da ne vidim smisla zadržavati se na bilo čemu drugom.

Pa, posljednji detalj je dodavanje poziva agentu za prikupljanje meta informacija za collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Koristimo prethodno najavljenu shemu za poruku. U ovom slučaju koristio sam .cast metodu jer ne moramo čekati rezultat od agenta, ali vrijedi napomenuti da načine pošalji poruku na temu:

  1. cast - ne blokira jer ne očekuje rezultat. Ne možete poslati rezultat u drugu temu kao poruku.

  2. poslati - ne blokira jer ne očekuje rezultat. Možete odrediti agenta u temi na koju će ići rezultat.

  3. pitati - čeka rezultat. Možete odrediti agenta u temi na koju će ići rezultat.

Dakle, to je sve s agentima za danas!

Ekipa iz snova

Zadnje što sam obećao napisati u ovom dijelu su naredbe. Kao što je ranije spomenuto, naredbe u faustu su omotač oko klika. Zapravo, faust jednostavno pridaje našu prilagođenu naredbu svom sučelju kada navede ključ -A

Nakon najavljenih agenata u agenti.py dodajte funkciju s dekoratorom aplikacija.naredbapozivanje metode baciti у prikupiti_securitite:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Stoga, ako pozovemo popis naredbi, naša nova naredba će biti u njemu:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Možemo ga koristiti kao bilo tko drugi, stoga ponovno pokrenimo faust worker i započnimo potpunu kolekciju vrijednosnih papira:

> faust -A horton.agents start-collect-securities

Što će biti dalje?

U sljedećem dijelu, koristeći preostale agente kao primjer, razmotrit ćemo mehanizam potonuća za traženje ekstrema u cijenama zatvaranja trgovanja za godinu i pokretanje agenata cron.

To je sve za danas! Hvala na čitanju :)

Kod za ovaj dio

Pozadinski zadaci na Faustu, II. dio: Agenti i timovi

PS Pod prošlim dijelom su me pitali o faustu i konfluentnoj kafki (koje značajke ima konfluent?). Čini se da je confluent funkcionalniji na mnogo načina, ali činjenica je da faust nema punu klijentsku podršku za confluent - to slijedi iz opisi ograničenja klijenta u dok.

Izvor: www.habr.com

Dodajte komentar