Ħidmiet ta' sfond dwar Faust, Parti II: Aġenti u Timijiet

Ħidmiet ta' sfond dwar Faust, Parti II: Aġenti u Timijiet

Tabella tal-kontenut

  1. Parti I: Introduzzjoni

  2. Taqsima II: Aġenti u Timijiet

X'qed nagħmlu hawn?

Allura, hekk, it-tieni parti. Kif miktub qabel, fiha se nagħmlu dan li ġej:

  1. Ejja niktbu klijent żgħir għal alphaavantage fuq aiohttp b'talbiet għall-endpoints li għandna bżonn.

  2. Ejja noħolqu aġent li jiġbor data dwar titoli u meta informazzjoni fuqhom.

Iżda, dan huwa dak li se nagħmlu għall-proġett innifsu, u f'termini ta 'riċerka faust, se nitgħallmu kif niktbu aġenti li jipproċessaw avvenimenti ta' fluss minn kafka, kif ukoll kif niktbu kmandi (ikklikkja wrapper), fil-każ tagħna - għal messaġġi push manwali għas-suġġett li l-aġent qed jimmonitorja.

Taħriġ

Klijent AlphaVantage

L-ewwel, ejja tikteb klijent aiohttp żgħir għal talbiet għal alphaavantage.

alphaavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Fil-fatt, kollox huwa ċar minnha:

  1. L-API AlphaVantage hija ddisinjata b'mod pjuttost sempliċi u sabiħ, għalhekk iddeċidejt li nagħmel it-talbiet kollha permezz tal-metodu construct_query fejn imbagħad ikun hemm sejħa http.

  2. Inġib l-għelieqi kollha lejhom snake_case għall-kumdità.

  3. Ukoll, id-dekorazzjoni logger.catch għal output ta 'traceback sabiħ u informattiv.

PS Tinsiex iżżid it-token alphaavantage lokalment ma' config.yml, jew tesporta l-varjabbli ambjentali HORTON_SERVICE_APIKEY. Nirċievu token hawn.

Klassi CRUD

Se jkollna kollezzjoni ta’ titoli biex naħżnu meta informazzjoni dwar it-titoli.

database/security.py

Fl-opinjoni tiegħi, m'hemmx għalfejn nispjega xejn hawn, u l-klassi bażi nnifisha hija pjuttost sempliċi.

get_app()

Ejja nżidu funzjoni għall-ħolqien ta 'oġġett ta' applikazzjoni fi app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Għalissa se jkollna l-eħfef ħolqien ta' applikazzjoni, ftit wara se nkabbruha, madankollu, sabiex ma nżommukx tistenna, hawn referenzi għall-klassi App. Nagħtik ukoll parir biex tagħti ħarsa lejn il-klassi tas-settings, peress li hija responsabbli għall-biċċa l-kbira tas-settings.

Korp prinċipali

Aġent għall-ġbir u ż-żamma ta' lista ta' titoli

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Allura, l-ewwel irridu nġibu l-oġġett ta 'applikazzjoni faust - huwa pjuttost sempliċi. Sussegwentement, aħna niddikjaraw b'mod espliċitu suġġett għall-aġent tagħna... Hawnhekk ta 'min isemmi x'inhu, x'inhu l-parametru intern u kif dan jista' jiġi rranġat b'mod differenti.

  1. Suġġetti fil-kafka, jekk irridu nkunu nafu d-definizzjoni eżatta, aħjar naqraw mitfi. dokument, jew tista' taqra kompendju fuq Habré bir-Russu, fejn kollox huwa rifless ukoll b'mod pjuttost preċiż :)

  2. Parametru intern, deskritt pjuttost tajjeb fid-dok faust, jippermettilna nikkonfiguraw is-suġġett direttament fil-kodiċi, ovvjament, dan ifisser il-parametri pprovduti mill-iżviluppaturi tal-faust, pereżempju: żamma, politika ta 'żamma (b'mod awtomatiku ħassar, imma tista' tissettja kompatti), numru ta' diviżorji għal kull suġġett (punteġġitagħmel, per eżempju, inqas minn sinifikat globali applikazzjonijiet faust).

  3. B'mod ġenerali, l-aġent jista 'joħloq suġġett ġestit b'valuri globali, madankollu, inħobb niddikjara kollox b'mod espliċitu. Barra minn hekk, xi parametri (per eżempju, in-numru ta 'diviżorji jew politika ta' żamma) tas-suġġett fir-reklam tal-aġent ma jistgħux jiġu kkonfigurati.

    Hawn kif tista' tidher mingħajr ma tiddefinixxi manwalment is-suġġett:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ukoll, issa ejja niddeskrivu x'se jagħmel l-aġent tagħna :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Allura, fil-bidu tal-aġent, aħna niftħu sessjoni aiohttp għal talbiet permezz tal-klijent tagħna. Għalhekk, meta jibda ħaddiem, meta jiġi mniedi l-aġent tagħna, immedjatament tinfetaħ sessjoni - waħda, għall-ħin kollu li jkun qed jaħdem il-ħaddiem (jew diversi, jekk tibdel il-parametru konkorrenza minn aġent b'unità default).

Sussegwentement, insegwu n-nixxiegħa (aħna npoġġu l-messaġġ ġewwa _, peress li aħna, f'dan l-aġent, ma jimpurtahomx mill-kontenut) ta 'messaġġi mis-suġġett tagħna, jekk jeżistu fl-offset attwali, inkella ċ-ċiklu tagħna jistenna l-wasla tagħhom. Ukoll, ġewwa l-linja tagħna, aħna nilloggjaw l-irċevuta tal-messaġġ, nikseb lista ta 'titoli attivi (get_securities ritorni attivi biss b'mod awtomatiku, ara l-kodiċi tal-klijent) u ssejvjaha fid-database, niċċekkjaw jekk hemmx sigurtà bl-istess ticker u skambju fid-database , jekk ikun hemm, allura (il-karta) sempliċement tiġi aġġornata.

Ejja nniedu l-ħolqien tagħna!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Karatteristiċi PS komponent tal-web Mhux se nikkunsidra l-faust fl-artikoli, għalhekk aħna nistabbilixxu l-bandiera xierqa.

Fil-kmand tat-tnedija tagħna, għidna lil faust fejn għandek tfittex l-oġġett tal-applikazzjoni u x'għandek tagħmel miegħu (tniedi ħaddiem) bil-livell tal-output tal-log tal-informazzjoni. Aħna nġibu l-output li ġej:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Huwa ħaj!!!

Ejja nħarsu lejn is-sett tal-partizzjoni. Kif nistgħu naraw, inħoloq suġġett bl-isem li nnominajna fil-kodiċi, in-numru default ta’ diviżorji (8, meħuda minn topic_partitions - parametru tal-oġġett tal-applikazzjoni), peress li aħna ma speċifikajniex valur individwali għas-suġġett tagħna (permezz ta 'ħitan). L-aġent imniedi fil-ħaddiem huwa assenjat it-8 diviżorji kollha, peress li huwa l-uniku wieħed, iżda dan se jiġi diskuss f'aktar dettall fil-parti dwar il-clustering.

Ukoll, issa nistgħu mmorru f'tieqa terminali oħra u nibagħtu messaġġ vojt lis-suġġett tagħna:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS bl-użu @ nuru li qed nibagħtu messaġġ lil suġġett bl-isem “collect_securities”.

F'dan il-każ, il-messaġġ mar għall-partizzjoni 6 - tista 'tiċċekkja dan billi tmur kafdrop fuq localhost:9000

Meta mmorru fit-tieqa tat-terminal mal-ħaddiem tagħna, se naraw messaġġ kuntent mibgħut bl-użu tal-loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Nistgħu wkoll inħarsu lejn mongo (bl-użu ta' Robo3T jew Studio3T) u naraw li t-titoli jinsabu fid-database:

Jien mhux biljunarju, u għalhekk nikkuntentaw bl-ewwel għażla tal-wiri.

Ħidmiet ta' sfond dwar Faust, Parti II: Aġenti u TimijietĦidmiet ta' sfond dwar Faust, Parti II: Aġenti u Timijiet

Ferħ u ferħ - l-ewwel aġent lest :)

Aġent lest, ħajja twila l-aġent il-ġdid!

Iva, rġulija, koprejna biss 1/3 tat-triq imħejjija minn dan l-artikolu, imma taqtax qalbek, għax issa se jkun aktar faċli.

Allura issa għandna bżonn aġent li jiġbor meta informazzjoni u jpoġġiha f'dokument ta 'ġbir:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Peress li dan l-aġent se jipproċessa informazzjoni dwar sigurtà speċifika, jeħtieġ li nindikaw it-ticker (simbolu) ta 'din is-sigurtà fil-messaġġ. Għal dan il-għan fil-faust hemm Rekords — klassijiet li jiddikjaraw l-iskema tal-messaġġi fis-suġġett tal-aġent.

F'dan il-każ, ejja mmorru records.py u ddeskrivi kif għandu jidher il-messaġġ għal dan is-suġġett:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kif stajt bdejt, faust juża l-annotazzjoni tat-tip python biex jiddeskrivi l-iskema tal-messaġġ, u huwa għalhekk li l-verżjoni minima appoġġjata mil-librerija hija 3.6.

Ejja nerġgħu lura għall-aġent, issettja t-tipi u żidha:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kif tistgħu taraw, aħna ngħaddu parametru ġdid bi skema għall-metodu ta 'inizjalizzazzjoni tas-suġġett - value_type. Barra minn hekk, kollox isegwi l-istess skema, għalhekk ma nara l-ebda punt li nitkellem fuq xi ħaġa oħra.

Ukoll, l-aħħar touch huwa li żżid sejħa lill-aġent tal-ġbir tal-meta informazzjoni biex collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Aħna nużaw l-iskema mħabbra qabel għall-messaġġ. F'dan il-każ, użajt il-metodu .cast peress li m'għandniex bżonn nistennew ir-riżultat mill-aġent, iżda ta 'min isemmi li modi ibgħat messaġġ lis-suġġett:

  1. mitfugħa - ma jimblokkax għax ma jistenniex riżultat. Ma tistax tibgħat ir-riżultat lil suġġett ieħor bħala messaġġ.

  2. tibgħat - ma timblokkax għax ma tistenniex riżultat. Tista' tispeċifika aġent fis-suġġett li għalih se jmur ir-riżultat.

  3. staqsi - jistenna riżultat. Tista' tispeċifika aġent fis-suġġett li għalih se jmur ir-riżultat.

Allura, dak kollu ma 'aġenti għal-lum!

It-Tim tal-ħolm

L-aħħar ħaġa li wiegħed li nikteb f'din il-parti hija l-kmandi. Kif issemma qabel, kmandi f'faust huma wrapper madwar klikk. Fil-fatt, faust sempliċement iwaħħal il-kmand tad-dwana tagħna mal-interface tiegħu meta jispeċifika ċ-ċavetta -A

Wara l-aġenti mħabbra fil aġenti.py żid funzjoni b'dekoratur app.kmandsejħa tal-metodu mitfugħa у jiġbru_titoli:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Għalhekk, jekk insejħu l-lista ta 'kmandi, il-kmand il-ġdid tagħna se jkun fiha:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Nistgħu nużawha bħal ħaddieħor, allura ejja nibdew mill-ġdid il-ħaddiem faust u nibdew ġabra sħiħa ta 'titoli:

> faust -A horton.agents start-collect-securities

X'se jiġri wara?

Fil-parti li jmiss, billi tuża l-aġenti li fadal bħala eżempju, se nikkunsidraw il-mekkaniżmu tas-sink għat-tiftix għal estremi fil-prezzijiet tal-għeluq tal-kummerċ għas-sena u t-tnedija cron tal-aġenti.

Dak kollu għal-lum! Grazzi talli qrajt :)

Kodiċi għal din il-parti

Ħidmiet ta' sfond dwar Faust, Parti II: Aġenti u Timijiet

PS Taħt l-aħħar parti ġejt mistoqsi dwar faust u kafka konfluwenti (x'karatteristiċi għandha konfluwenti?). Jidher li konfluwenti huwa aktar funzjonali f'ħafna modi, iżda l-fatt hu li faust m'għandux appoġġ sħiħ tal-klijent għal konfluwenti - dan isegwi minn deskrizzjonijiet tar-restrizzjonijiet tal-klijenti fid-dok.

Sors: www.habr.com

Żid kumment