Eftergrûntaken oer Faust, Diel II: Aginten en teams

Eftergrûntaken oer Faust, Diel II: Aginten en teams

Ynhâldsopjefte

  1. Diel I: Ynlieding

  2. Diel II: Aginten en teams

Wat dogge wy hjir?

Dus, sa, it twadde diel. Lykas earder skreaun, sille wy it folgjende dwaan:

  1. Litte wy in lyts kliïnt skriuwe foar alphavantage op aiohttp mei oanfragen foar de einpunten dy't wy nedich binne.

  2. Litte wy in agint oanmeitsje dy't gegevens sil sammelje oer weardepapieren en meta-ynformaasje oer har.

Mar, dit is wat wy sille dwaan foar it projekt sels, en yn termen fan faust-ûndersyk sille wy leare hoe't jo aginten skriuwe dy't stream-eveneminten ferwurkje fan kafka, lykas hoe't jo kommando's skriuwe (klik wrapper), yn ús gefal - foar hânmjittich push-berjochten nei it ûnderwerp dat de agint kontrolearret.

Tarieding fan

AlphaVantage Client

Litte wy earst in lyts aiohttp-kliïnt skriuwe foar fersiken om alphavantage.

alphavantage.py

spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Eins is alles der dúdlik út:

  1. De AlphaVantage API is frij ienfâldich en prachtich ûntworpen, dus ik besleat alle oanfragen fia de metoade te meitsjen construct_query wêr't op syn beurt in http-oprop is.

  2. Ik bring alle fjilden nei snake_case foar gemak.

  3. No, de logger.catch-dekoraasje foar prachtige en ynformative traceback-útfier.

PS Ferjit net om de alphavantage token lokaal ta te foegjen oan config.yml, of de omjouwingsfariabele te eksportearjen HORTON_SERVICE_APIKEY. Wy krije in token hjir.

CRUD klasse

Wy sille in samling fan weardepapieren hawwe om meta-ynformaasje oer weardepapieren op te slaan.

database/security.py

Yn myn miening is it net nedich om hjir wat út te lizzen, en de basisklasse sels is frij simpel.

get_app()

Litte wy in funksje tafoegje foar it meitsjen fan in applikaasjeobjekt yn app.py

spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Foar no sille wy de ienfâldichste oanmeitsjen fan applikaasjes hawwe, in bytsje letter sille wy it lykwols útwreidzje om jo net te wachtsjen, hjir referinsjes oan App-klasse. Ik advisearje jo ek om de ynstellingsklasse te besjen, om't it ferantwurdlik is foar de measte ynstellings.

Haadpart

Agent foar it sammeljen en ûnderhâlden fan in list fan weardepapieren

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Dat, earst krije wy it faust-applikaasjeobjekt - it is frij ienfâldich. Dêrnei ferklearje wy eksplisyt in ûnderwerp foar ús agent ... Hjir is it wurdich te neamen wat it is, wat de ynterne parameter is en hoe't dit oars kin wurde regele.

  1. Underwerpen yn kafka, as wy de krekte definysje witte wolle, is it better om te lêzen út. dokumint, of jo kinne lêze kompendium op Habré yn it Russysk, wêr't alles ek frij presys werjûn wurdt :)

  2. Parameter ynterne. kompakt), oantal partysjes per ûnderwerp (partysjeste dwaan, bygelyks, minder as globale betsjutting applikaasjes faust).

  3. Yn 't algemien kin de agint in beheard ûnderwerp oanmeitsje mei globale wearden, lykwols wol ik alles eksplisyt ferklearje. Derneist kinne guon parameters (bygelyks it oantal partysjes of behâldbelied) fan it ûnderwerp yn 'e agent-advertinsje net konfigureare wurde.

    Hjir is hoe't it der útsjen kin sûnder it ûnderwerp manuell te definiearjen:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

No, litte wy no beskriuwe wat ús agent sil dwaan :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Dat, oan it begjin fan 'e agint, iepenje wy in aiohttp-sesje foar oanfragen fia ús kliïnt. Sa, by it starten fan in arbeider, as ús agint wurdt lansearre, sil fuortendaliks in sesje iepene wurde - ien, foar de hiele tiid dat de arbeider rint (of ferskate, as jo de parameter feroarje gelikensens fan in agint mei in standertienheid).

Dêrnei folgje wy de stream (wy pleatse it berjocht yn _, om't wy, yn dizze agint, net skele oer de ynhâld) fan berjochten fan ús ûnderwerp, as se besteane op 'e hjoeddeistige offset, oars sil ús syklus wachtsje op har komst. No, yn ús lus registrearje wy de ûntfangst fan it berjocht, krije in list mei aktive (get_securities jout allinich aktyf standert, sjoch kliïntkoade) weardepapieren en bewarje it yn 'e databank, kontrolearje oft d'r in feiligens is mei deselde ticker en útwikseling yn 'e databank , as der is, dan sil it (it papier) gewoan bywurke wurde.

Litte wy ús skepping lansearje!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Features web komponint Ik sil gjin faust beskôgje yn 'e artikels, dus wy sette de passende flagge.

Yn ús startkommando hawwe wy faust ferteld wêr't it applikaasjeobjekt te sykjen en wat dermei te dwaan (in arbeider starte) mei it útfiernivo fan ynfolog. Wy krije de folgjende útfier:

spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

It libbet!!!

Litte wy nei de partysjeset sjen. Sa't wy sjen kinne, is in ûnderwerp makke mei de namme dy't wy yn 'e koade oantsjutten, it standert oantal partysjes (8, nommen út topic_partysjes - applikaasje-objektparameter), om't wy gjin yndividuele wearde hawwe opjûn foar ús ûnderwerp (fia partysjes). De lansearre agint yn 'e arbeider wurdt tawiisd oan alle 8 partysjes, om't it de ienige is, mar dit sil yn mear detail besprutsen wurde yn it diel oer klustering.

No, no kinne wy ​​nei in oar terminalfinster gean en in leech berjocht nei ús ûnderwerp stjoere:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS brûke @ wy litte sjen dat wy in berjocht stjoere nei in ûnderwerp mei de namme "collect_securities".

Yn dit gefal gie it berjocht nei partysje 6 - jo kinne dit kontrolearje troch te gean nei kafdrop op localhost:9000

Gean nei it terminalfinster mei ús arbeider, sille wy in lokkich berjocht sjen ferstjoerd mei loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Wy kinne ek nei mongo sjen (mei Robo3T of Studio3T) en sjen dat de weardepapieren yn 'e database binne:

Ik bin gjin miljardêr, en dêrom binne wy ​​tefreden mei de earste besjen opsje.

Eftergrûntaken oer Faust, Diel II: Aginten en teamsEftergrûntaken oer Faust, Diel II: Aginten en teams

Lok en freugde - de earste agent is klear :)

Agent klear, lang libje de nije agent!

Ja, hearen, wy hawwe mar 1/3 fan it paad dat troch dit artikel taret is ôfsletten, mar wês net ûntmoedige, want no sil it makliker wurde.

Dat no hawwe wy in agint nedich dy't meta-ynformaasje sammelet en it yn in samlingdokumint set:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Sûnt dizze agint sil ferwurkje ynformaasje oer in spesifike feiligens, wy moatte oanjaan de ticker (symboal) fan dizze feiligens yn it berjocht. Foar dit doel yn faust der binne records - klassen dy't it berjochtskema ferklearje yn it agentûnderwerp.

Yn dit gefal, litte wy gean nei records.py en beskriuw hoe't it berjocht foar dit ûnderwerp der útsjen moat:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Lykas jo miskien hawwe rieden, brûkt faust de annotaasje fan it python-type om it berjochtskema te beskriuwen, dat is de reden wêrom't de minimale ferzje dy't troch de bibleteek wurdt stipe is 3.6.

Litte wy weromgean nei de agint, set de typen yn en foegje it ta:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Sa't jo sjen kinne, jouwe wy in nije parameter mei in skema troch nei de metoade foar inisjalisaasje fan ûnderwerp - value_type. Fierder folget alles itselde skema, dus ik sjoch gjin nut om op wat oars te stean.

No, de lêste touch is om in oprop ta te foegjen oan de agint foar it sammeljen fan meta-ynformaasje om collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Wy brûke it earder oankundige skema foar it berjocht. Yn dit gefal haw ik de .cast-metoade brûkt, om't wy net hoege te wachtsjen op it resultaat fan 'e agint, mar it is it neamen wurdich dat manieren stjoer in berjocht nei it ûnderwerp:

  1. cast - blokkearret net om't it gjin resultaat ferwachtet. Jo kinne it resultaat net as berjocht nei in oar ûnderwerp stjoere.

  2. ferstjoere - blokkearret net om't it gjin resultaat ferwachtet. Jo kinne in agint opjaan yn it ûnderwerp wêr't it resultaat nei sil gean.

  3. freegje - wachtet op in resultaat. Jo kinne in agint opjaan yn it ûnderwerp wêr't it resultaat nei sil gean.

Dat, dat is alles mei aginten foar hjoed!

It dreamteam

It lêste wat ik tasein yn dit diel te skriuwen is kommando's. Lykas earder neamd, binne kommando's yn faust in wrapper om klik. Feitlik hechtet faust ús oanpaste kommando gewoan oan har ynterface by it opjaan fan de -A-kaai

Nei de oankundige aginten yn agents.py add in funksje mei in decorator app.kommandoropt de metoade cast у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Dus, as wy de list mei kommando's neame, sil ús nije kommando deryn wêze:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Wy kinne it brûke lykas elkenien, dus litte wy de faust-arbeider opnij starte en in folsleine kolleksje fan weardepapieren begjinne:

> faust -A horton.agents start-collect-securities

Wat sil d'rnei barre?

Yn it folgjende diel, mei de oerbleaune aginten as foarbyld, sille wy it sinkmeganisme beskôgje foar it sykjen nei ekstremen yn 'e slutingsprizen fan hannel foar it jier en de cron-lansearring fan aginten.

Dat is alles foar hjoed! Betanke foar it lêzen :)

Koade foar dit diel

Eftergrûntaken oer Faust, Diel II: Aginten en teams

PS Under it lêste diel waard ik frege oer faust en konfluent kafka (hokker funksjes hat confluent?). It liket derop dat confluent mear funksjoneel is op in protte manieren, mar it feit is dat faust gjin folsleine kliïntstipe hat foar confluent - dit folget út beskriuwingen fan klantbeperkingen yn it dokumint.

Boarne: www.habr.com

Add a comment