Taustaülesanded teemal Faust, II osa: Agendid ja meeskonnad

Taustaülesanded teemal Faust, II osa: Agendid ja meeskonnad

Sisukord

  1. I osa: Sissejuhatus

  2. II osa: Agendid ja meeskonnad

Mida me siin teeme?

Nii, nii, teine ​​osa. Nagu varem kirjutatud, teeme selles järgmist:

  1. Kirjutame aiohttp-le alfavantage väikekliendi koos meile vajalike lõpp-punktide päringutega.

  2. Loome agendi, kes kogub andmeid väärtpaberite kohta ja metainfot nende kohta.

Kuid seda me teeme projekti enda jaoks ja fausti uurimise osas õpime, kuidas kirjutada agente, mis töötlevad sündmusi kafkast, samuti kuidas kirjutada käske (klõpsake ümbrist), meie puhul - käsitsi tõukesõnumite jaoks teemale, mida agent jälgib.

Koolitus

AlphaVantage klient

Kõigepealt kirjutame alfavantage taotluste jaoks väikese aiohttp-kliendi.

alfavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Tegelikult on sellest kõik selge:

  1. AlphaVantage API on üsna lihtsalt ja kaunilt kujundatud, nii et otsustasin esitada kõik taotlused selle meetodi kaudu construct_query kus omakorda on http-kõne.

  2. Toon kõik põllud kohale snake_case mugavuseks.

  3. Noh, kaunistus logger.catch kauni ja informatiivse jälgimisväljundi jaoks.

PS Ärge unustage lisada saidile config.yml kohapeal alfavantage-märki või eksportida keskkonnamuutujat HORTON_SERVICE_APIKEY. Saame märgi siin.

CRUD klass

Meil on väärtpaberikogu, et salvestada väärtpaberite metainfot.

andmebaas/security.py

Minu arust pole siin vaja midagi seletada ja baasklass ise on üsna lihtne.

get_app()

Lisame funktsiooni rakenduse objekti loomiseks app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Praegu on meil kõige lihtsam rakenduste loomine, veidi hiljem laiendame seda, kuid et mitte jätta teid ootama, siis siin viited App-klassi. Samuti soovitan teil vaadata seadete klassi, kuna see vastutab enamiku seadete eest.

Peamine osa

Agent väärtpaberite nimekirja kogumiseks ja pidamiseks

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Niisiis, kõigepealt saame kiire rakenduse objekti - see on üsna lihtne. Järgmisena deklareerime oma agendi jaoks selgesõnaliselt teema... Siin tasub mainida, mis see on, mis on sisemine parameeter ja kuidas seda teisiti korraldada.

  1. Kafka-keelsed teemad, kui tahame täpset määratlust teada, on parem lugeda väljas. dokumentvõi saate lugeda abstraktne venekeelsel Habrel, kus kõik ka päris täpselt kajastatud :)

  2. Sisemine parameeter, mis on faust doc-is üsna hästi kirjeldatud, võimaldab meil teemat otse koodis seadistada, see tähendab muidugi fausti arendajate antud parameetreid, näiteks: retention, retention policy (vaikimisi kustutada, aga saab määrata kompaktne), partitsioonide arv teema kohta (hindedteha näiteks vähem kui globaalset tähtsust rakendused faust).

  3. Üldiselt saab agent luua globaalsete väärtustega hallatava teema, kuid mulle meeldib kõike selgelt deklareerida. Lisaks ei saa agendikuulutuses oleva teema mõningaid parameetreid (nt partitsioonide arv või säilituspoliitika) seadistada.

    Ilma teemat käsitsi määratlemata võib see välja näha järgmine:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Noh, nüüd kirjeldame, mida meie agent teeb :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Seega avame agendi alguses aiohttp-seansi oma kliendi kaudu päringute jaoks. Seega avatakse töötaja käivitamisel meie agendi käivitamisel kohe seanss – üks, kogu töötaja töötamise ajaks (või mitu, kui muudate parameetrit samaaegsus vaikeüksusega agendilt).

Järgmisena järgime voogu (paneme sõnumi sisse _, kuna meid selles agendis ei huvita oma teema sõnumite sisu, kui need on praegusel nihkel olemas, vastasel juhul ootab meie tsükkel nende saabumist. Noh, oma tsükli sees logime sõnumi vastuvõtmise, saame aktiivsete (get_securities tagastab vaikimisi ainult aktiivsed, vaata kliendi koodi) väärtpaberite nimekirja ja salvestame selle andmebaasi, kontrollides, kas seal on sama märgiga väärtpaberit ja vahetada andmebaasis , kui on, siis seda (paberit) lihtsalt uuendatakse.

Paneme oma loomingu käima!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS funktsioonid veebikomponent Fausti ma artiklites arvesse ei võta, seega panime sobiva lipu.

Käivituskäskluses ütlesime faust infologi väljundtasemega, kust rakendusobjekti otsida ja mida sellega teha (käivitada töötaja). Saame järgmise väljundi:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

See on elus!!!

Vaatame partitsioonikomplekti. Nagu näeme, loodi teema nimega, mille koodis määrasime, partitsioonide vaikearv (8, võetud teema_partitsioonid - rakenduse objekti parameeter), kuna me ei määranud oma teema jaoks individuaalset väärtust (sektsioonide kaudu). Töötajas käivitatud agendile on määratud kõik 8 partitsiooni, kuna see on ainuke, kuid sellest räägitakse üksikasjalikumalt klasterdamist käsitlevas osas.

Noh, nüüd saame minna teise terminali aknasse ja saata meie teemale tühja sõnumi:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS kasutamine @ näitame, et saadame sõnumi teemale "koguda_väärtpaberid".

Sel juhul läks teade partitsioonile 6 – seda saate kontrollida, minnes kafdrop on localhost:9000

Oma töötajaga terminali aknasse minnes näeme loguru abil saadetud rõõmsat teadet:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Samuti saame vaadata mongot (kasutades Robo3T või Studio3T) ja näha, et väärtpaberid on andmebaasis:

Ma ei ole miljardär ja seetõttu oleme esimese vaatamisvõimalusega rahul.

Taustaülesanded teemal Faust, II osa: Agendid ja meeskonnadTaustaülesanded teemal Faust, II osa: Agendid ja meeskonnad

Õnn ja rõõm - esimene agent on valmis :)

Agent valmis, elagu uus agent!

Jah, härrased, oleme läbinud vaid 1/3 selle artikliga ette valmistatud teest, kuid ärge heitke meelt, sest nüüd on see lihtsam.

Nüüd vajame agenti, kes kogub metateavet ja paneb selle kogumisdokumenti:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Kuna see agent töötleb teavet konkreetse väärtpaberi kohta, peame sõnumis märkima selle väärtpaberi märgi (sümboli). Selleks on faustis olemas Andmed — klassid, mis deklareerivad agendi teemas sõnumiskeemi.

Sel juhul lähme edasi records.py ja kirjeldage, milline peaks selle teema sõnum välja nägema:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Nagu arvata võis, kasutab Faust sõnumiskeemi kirjeldamiseks pythoni tüüpi annotatsiooni, mistõttu on teegi poolt toetatud minimaalne versioon 3.6.

Pöördume tagasi agendi juurde, määrake tüübid ja lisage see:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Nagu näete, edastame teema initsialiseerimise meetodile uue parameetri koos skeemiga - väärtus_tüüp. Lisaks käib kõik sama skeemi järgi, nii et ma ei näe mõtet millegi muu kallal peatuda.

Noh, viimane lihv on lisada metateabe kogumise agendile call_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Sõnumi jaoks kasutame varem välja kuulutatud skeemi. Sel juhul kasutasin .cast meetodit, kuna me ei pea ootama agendi tulemust, kuid tasub mainida, et viisil saada sõnum teemale:

  1. valatud - ei blokeeri, sest ei oota tulemust. Tulemust ei saa sõnumina teisele teemale saata.

  2. saada – ei blokeeri, sest ei oota tulemust. Teemas saate määrata agendi, kellele tulemus suunatakse.

  3. küsi - ootab tulemust. Teemas saate määrata agendi, kellele tulemus suunatakse.

Tänaseks on agentidega kõik!

Unistuste meeskond

Viimane asi, mida ma sellesse ossa lubasin kirjutada, on käsud. Nagu varem mainitud, on fausti käsud klõpsamise ümber. Tegelikult lisab Faust klahvi -A määramisel lihtsalt meie kohandatud käsu oma liidesele

Pärast agentide väljakuulutamist agents.py lisa dekoraatoriga funktsioon app.commandkutsudes meetodit koo у koguda_väärtpabereid:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Seega, kui kutsume välja käskude loendi, on meie uus käsk selles:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Saame seda kasutada nagu iga teinegi, nii et taaskäivitagem Faust Worker ja alustame täieõigusliku väärtpaberite kogumisega:

> faust -A horton.agents start-collect-securities

Mis saab edasi?

Järgmises osas vaatleme ülejäänud agentide näitel vajumismehhanismi aasta tehingute sulgemishindades äärmuste otsimiseks ja agentide kroonu käivitamiseks.

See on tänaseks kõik! Aitäh lugemast :)

Selle osa kood

Taustaülesanded teemal Faust, II osa: Agendid ja meeskonnad

PS Viimase osa all küsiti minult faust and confluent kafka (millised omadused on konfluendil?). Tundub, et confluent on mitmes mõttes funktsionaalsem, kuid fakt on see, et Faustil puudub täielik klienditugi confluenti jaoks – see tuleneb kliendipiirangute kirjeldused dokumendis.

Allikas: www.habr.com

Lisa kommentaar