Osnovni zadaci o Faustu, II dio: Agenti i timovi

Osnovni zadaci o Faustu, II dio: Agenti i timovi

Sadržaj

  1. Dio I: Uvod

  2. Dio II: Agenti i timovi

Šta mi radimo ovde?

Dakle, tako, drugi dio. Kao što je ranije napisano, u njemu ćemo uraditi sljedeće:

  1. Hajde da napišemo mali klijent za alphavantage na aiohttp sa zahtevima za krajnje tačke koje su nam potrebne.

  2. Kreirajmo agenta koji će prikupljati podatke o hartijama od vrijednosti i meta informacije o njima.

Ali, to je ono što ćemo uraditi za sam projekat, a u smislu istraživanja fausta, naučićemo kako pisati agente koji obrađuju stream događaje iz kafke, kao i kako pisati komande (click wrapper), u našem slučaju - za ručne push poruke na temu koju agent nadgleda.

Trening

AlphaVantage Client

Prvo, napišimo mali aiohttp klijent za zahtjeve za alphavantage.

alphavantage.py

Spojler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

U stvari, iz toga je sve jasno:

  1. AlphaVantage API je prilično jednostavno i lijepo dizajniran, pa sam odlučio da sve zahtjeve postavim putem metode construct_query gdje zauzvrat postoji http poziv.

  2. Donosim sva polja snake_case radi praktičnosti.

  3. Pa, dekoracija logger.catch za prekrasan i informativan traceback izlaz.

PS Ne zaboravite dodati alphavantage token lokalno u config.yml, ili izvesti varijablu okruženja HORTON_SERVICE_APIKEY. Dobijamo token ovdje.

CRUD klasa

Imat ćemo kolekciju vrijednosnih papira za čuvanje meta informacija o vrijednosnim papirima.

baza podataka/sigurnost.py

Po mom mišljenju, ovdje ne treba ništa objašnjavati, a sama bazna klasa je prilično jednostavna.

get_app()

Dodajmo funkciju za kreiranje objekta aplikacije u app.py

Spojler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Za sada ćemo imati najjednostavniju izradu aplikacije, malo kasnije ćemo je proširiti, međutim, kako ne bismo čekali, ovdje reference u App-klasu. Savjetujem vam i da pogledate klasu postavki, jer je ona odgovorna za većinu postavki.

Glavni dio

Agent za prikupljanje i vođenje liste vrijednosnih papira

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Dakle, prvo dobijamo objekat aplikacije Faust - to je prilično jednostavno. Zatim eksplicitno deklariramo temu za našeg agenta... Ovdje je vrijedno spomenuti šta je to, koji je interni parametar i kako se to može drugačije urediti.

  1. Teme u kafki, ako želimo da znamo tačnu definiciju, bolje je pročitati isključeno. dokument, ili možete čitati compendium na Habré-u na ruskom, gde je takođe sve prilično tačno odraženo :)

  2. Parametar interni, koji je prilično dobro opisan u faust dokumentu, omogućava nam da konfiguriramo temu direktno u kodu, naravno, to znači parametre koje su dali Faust programeri, na primjer: zadržavanje, politika zadržavanja (podrazumevano brisanje, ali možete podesiti kompaktni), broj particija po temi (particijeučiniti, na primjer, manje od globalnog značaja aplikacije Faust).

  3. Generalno, agent može kreirati upravljanu temu s globalnim vrijednostima, međutim, ja volim da sve eksplicitno deklariram. Osim toga, neki parametri (na primjer, broj particija ili politika zadržavanja) teme u oglasu agenta ne mogu se konfigurirati.

    Evo kako bi to moglo izgledati bez ručnog definiranja teme:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Pa, hajde da sada opišemo šta će naš agent uraditi :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Dakle, na početku agenta otvaramo aiohttp sesiju za zahtjeve preko našeg klijenta. Dakle, pri pokretanju worker-a, kada se naš agent pokrene, odmah će se otvoriti sesija - jedna, za cijelo vrijeme pokretanja radnika (ili nekoliko, ako promijenite parametar paralelnost od agenta sa zadanom jedinicom).

Zatim pratimo stream (postavimo poruku _, pošto nas, u ovom agentu, nije briga za sadržaj) poruka iz naše teme, ako postoje na trenutnom ofsetu, inače će naš ciklus čekati njihov dolazak. Pa, unutar naše petlje, evidentiramo prijem poruke, dobijemo listu aktivnih (get_security vraća samo aktivnu po defaultu, vidi klijentov kod) vrijednosnih papira i spremimo je u bazu podataka, provjeravamo da li postoji sigurnost s istim tickerom i razmjena u bazi podataka, ako postoji, onda će se ona (papir) jednostavno ažurirati.

Pokrenimo našu kreaciju!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Features web komponenta Fausta neću razmatrati u člancima, pa smo postavili odgovarajuću zastavu.

U našoj naredbi za pokretanje rekli smo Faustu gdje da traži objekt aplikacije i šta da radi s njim (pokrene worker) s nivoom izlaza info dnevnika. Dobijamo sljedeći izlaz:

Spojler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Zivo je!!!

Pogledajmo skup particija. Kao što vidimo, kreirana je tema sa imenom koje smo naznačili u kodu, podrazumevanim brojem particija (8, preuzeto iz topic_partitions - parametar objekta aplikacije), budući da nismo specificirali pojedinačnu vrijednost za našu temu (preko particija). Pokrenutom agentu u workeru je dodijeljeno svih 8 particija, budući da je jedini, ali o tome će biti detaljnije u dijelu o klasteriranju.

Pa, sada možemo otići na drugi prozor terminala i poslati praznu poruku našoj temi:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS koristeći @ pokazujemo da šaljemo poruku na temu pod nazivom “collect_securities”.

U ovom slučaju, poruka je otišla na particiju 6 - ovo možete provjeriti tako što ćete otići na kafdrop on localhost:9000

Odlaskom na prozor terminala sa našim radnikom, vidjet ćemo sretnu poruku poslanu pomoću loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Također možemo pogledati mongo (koristeći Robo3T ili Studio3T) i vidjeti da su vrijednosni papiri u bazi podataka:

Nisam milijarder i zato smo zadovoljni prvom opcijom gledanja.

Osnovni zadaci o Faustu, II dio: Agenti i timoviOsnovni zadaci o Faustu, II dio: Agenti i timovi

Sreća i veselje - prvi agent je spreman :)

Agent spreman, živio novi agent!

Da, gospodo, prešli smo samo 1/3 puta pripremljenog ovim člankom, ali nemojte se obeshrabriti, jer će sada biti lakše.

Dakle, sada nam je potreban agent koji prikuplja meta informacije i stavlja ih u dokument prikupljanja:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Pošto će ovaj agent obraditi informacije o određenoj sigurnosti, potrebno je da u poruci navedemo oznaku (simbol) ove sigurnosti. U tu svrhu u Faustu postoje ploče — klase koje deklariraju šemu poruka u temi agenta.

U ovom slučaju, idemo na records.py i opišite kako bi poruka za ovu temu trebala izgledati:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kao što ste mogli pretpostaviti, faust koristi napomenu tipa python da opiše šemu poruke, zbog čega je minimalna verzija koju biblioteka podržava 3.6.

Vratimo se agentu, postavimo tipove i dodamo ga:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kao što vidite, metodi inicijalizacije teme prenosimo novi parametar sa šemom - value_type. Nadalje, sve ide po istoj shemi, tako da ne vidim smisla da se zadržavam na bilo čemu drugom.

Pa, posljednji dodir je dodavanje poziva agentu za prikupljanje meta informacija collect_securittes:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Za poruku koristimo prethodno najavljenu šemu. U ovom slučaju koristio sam metodu .cast jer ne moramo čekati rezultat od agenta, ali vrijedi napomenuti da načine pošaljite poruku na temu:

  1. cast - ne blokira jer ne očekuje rezultat. Rezultat ne možete poslati na drugu temu kao poruku.

  2. send - ne blokira jer ne očekuje rezultat. Možete odrediti agenta u temi na koju će ići rezultat.

  3. pitati - čeka rezultat. Možete odrediti agenta u temi na koju će ići rezultat.

Dakle, to je sve sa agentima za danas!

Tim snova

Poslednje što sam obećao da ću napisati u ovom delu su komande. Kao što je ranije spomenuto, komande u Faustu su omotač oko klika. U stvari, faust jednostavno prilaže našu prilagođenu komandu svom interfejsu kada specificira ključ -A

Nakon što su najavljeni agenti u agents.py dodajte funkciju s dekoraterom app.commandpozivanje metode cast у collect_securites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Dakle, ako pozovemo listu naredbi, naša nova komanda će biti u njoj:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Možemo ga koristiti kao i bilo ko drugi, pa hajde da ponovo pokrenemo faust worker i započnemo punu kolekciju vrijednosnih papira:

> faust -A horton.agents start-collect-securities

Šta će biti dalje?

U sljedećem dijelu, koristeći preostale agente kao primjer, razmotrićemo sink mehanizam za traženje ekstrema u cijenama zatvaranja trgovanja za godinu i cron lansiranje agenata.

To je sve za danas! Hvala na čitanju :)

Šifra za ovaj dio

Osnovni zadaci o Faustu, II dio: Agenti i timovi

PS U zadnjem dijelu su me pitali o Faustu i konfluentnoj kafki (koje karakteristike ima konfluent?). Čini se da je konfluent funkcionalniji na mnogo načina, ali činjenica je da Faust nema potpunu klijentsku podršku za konfluent – ​​to proizilazi iz opisi ograničenja klijenata u dok.

izvor: www.habr.com

Dodajte komentar