Imisebenzi yangasemva kwiFaust, iCandelo II: Iiarhente kunye namaQela

Imisebenzi yangasemva kwiFaust, iCandelo II: Iiarhente kunye namaQela

Uluhlu lomxholo

  1. Icandelo I: Intshayelelo

  2. Icandelo II: Iiarhente kunye namaQela

Senza ntoni apha?

Ngoko ke, inxalenye yesibini. Njengoko kubhaliwe ngaphambili, kuyo siya kwenza oku kulandelayo:

  1. Masibhale umxhasi omncinci wealphavantage kwi-aiohttp kunye nezicelo zeesiphelo esizifunayo.

  2. Masenze i-arhente eya kuqokelela idatha kwii-securities kunye nolwazi lwe-meta kubo.

Kodwa, yiloo nto esiya kuyenzela iprojekthi ngokwayo, kwaye ngokumalunga nophando olukhawulezayo, siya kufunda indlela yokubhala ii-agent eziqhuba iziganeko ezivela kwi-kafka, kunye nendlela yokubhala imiyalelo (cofa i-wrapper), kwimeko yethu - yemiyalezo yokutyhala ngesandla ukuya kwisihloko esijongwa ngummeli.

Ukulungiselela

AlphaVantage Client

Kuqala, masibhale umxhasi omncinci we-aiohttp kwizicelo kwi-alphavantage.

ialphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Enyanisweni, yonke into icacile kuyo:

  1. IAlphaVantage API ilula kwaye iyilwe kakuhle, ndiye ndagqiba ekubeni ndenze zonke izicelo ngale ndlela construct_query apho kukho umnxeba we-http.

  2. Ndizisa onke amasimi kuwo snake_case ukwenzela intuthuzelo.

  3. Ewe, umhombiso we-logger.catch wemveliso entle kunye nenolwazi lokulandela umkhondo.

PS Ungalibali ukongeza ithokheni yealphavantage ekuhlaleni kwiconfig.yml, okanye uthumele ngaphandle umahluko wemekobume. HORTON_SERVICE_APIKEY. Sifumana umqondiso apha.

Iklasi yeCRUD

Siza kuba nengqokelela yokhuseleko ukugcina ulwazi lwemeta malunga nezibambiso.

idatabase/ukhuseleko.py

Ngokombono wam, akukho mfuneko yokuchaza nantoni na apha, kwaye iklasi yesiseko ngokwayo ilula kakhulu.

fumana_usetyenziso ()

Masidibanise umsebenzi wokwenza into yesicelo kuyo usetyenziso.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Okwangoku siza kuba neyona ndlela ilula yokuyilwa kwesicelo, emva kwexesha elincinci siyayandisa, nangona kunjalo, ukuze ungahlali ulindile, apha. iimbekiselo kwi-App-class. Ndikwacebisa ukuba ujonge iklasi yokuseta, kuba inoxanduva loninzi lwezicwangciso.

Inxalenye ephambili

I-arhente yokuqokelela kunye nokugcina uluhlu lwezibambiso

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ke, okokuqala sifumana eyona nto isebenzayo-ilula kakhulu. Emva koko, sibhengeza ngokucacileyo isihloko kwi-arhente yethu ... Apha kukufanelekile ukukhankanya ukuba yintoni, yintoni ipharamitha yangaphakathi kunye nendlela oku kunokulungiswa ngayo ngokuhlukileyo.

  1. Izihloko kwi-kafka, ukuba sifuna ukwazi inkcazo echanekileyo, kungcono ukufunda icimile. uxwebhu, okanye unokufunda isishwankathelo kuHabré ngesiRashiya, apho yonke into ikwabonakaliswa ngokuchanekileyo :)

  2. Ipharamitha yangaphakathi, echazwe kakuhle kwi doc faust, isivumela ukuba siqwalasele isihloko ngokuthe ngqo kwikhowudi, kakade, oku kuthetha ukuba iparameters ezibonelelwe ngabaphuhlisi abanamandla, umzekelo: ugcino, umgaqo-nkqubo wogcino (ngokucima okungagqibekanga, kodwa ungabeka indawo yokudlala), inani lezahlulo ngesihloko ngasinye (izikoloukwenza, umzekelo, ngaphantsi kwe ukubaluleka kwehlabathi izicelo ngokukhawuleza).

  3. Ngokubanzi, i-arhente inokudala isihloko esilawulwayo kunye namaxabiso ehlabathi, nangona kunjalo, ndiyathanda ukuvakalisa yonke into ngokucacileyo. Ukongezelela, ezinye iiparamitha (umzekelo, inani lezahlulo okanye umgaqo-nkqubo wokugcinwa) wesihloko kwintengiso ye-arhente ayikwazi ukumiselwa.

    Nantsi indlela enokujongeka ngayo ngaphandle kokuchaza isihloko ngesandla:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ewe, ngoku makhe sichaze ukuba iarhente yethu iya kwenza ntoni :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Ngoko, ekuqaleni kwe-arhente, sivula iseshoni ye-aiohttp yezicelo ngomthengi wethu. Ke, xa uqala umsebenzi, xa iarhente yethu iqaliswa, iseshoni iya kuvulwa ngokukhawuleza - enye, ngalo lonke ixesha eqhuba umsebenzi (okanye uninzi, ukuba utshintsha iparameter. concurrency ukusuka kwi-arhente eneyunithi engagqibekanga).

Emva koko, silandela umsinga (sibeka umyalezo _, ekubeni thina, kule arhente, asikhathali ngomxholo) wemiyalezo evela kwisihloko sethu, ukuba zikhona kwi-offset yangoku, ngaphandle koko umjikelo wethu uya kulinda ukufika kwabo. Ewe, ngaphakathi kwi-loop yethu, singena kwirisithi yomyalezo, sifumana uluhlu olusebenzayo (fumana_ukubuyisela ukubuyisela kuphela okusebenzayo ngokungagqibekanga, jonga ikhowudi yomxhasi) kwaye uyigcine kwisiseko sedatha, ukujonga ukuba kukho ukhuseleko kunye neticker efanayo kwaye utshintshiselwano kwisiseko sedatha , ukuba kukho, ngoko ke (iphepha) liya kuhlaziywa ngokulula.

Masiqalise indalo yethu!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Iimpawu zePS icandelo lewebhu Andiyi kuqwalasela i-faust kumanqaku, ngoko sibeka iflegi efanelekileyo.

Kumyalelo wethu wokuphehlelelwa, sixelele i-faust ukuba ijonge phi into yesicelo kwaye wenze ntoni ngayo (qalisa umsebenzi) kunye nenqanaba lemveliso yelog yolwazi. Sifumana iziphumo ezilandelayo:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Iyaphila!!!

Makhe sijonge iseti yokwahlula. Njengoko sibona, isihloko senziwe ngegama esilikhethileyo kwikhowudi, inani elingagqibekanga lezahlulo (8, ezithathwe izahlulo_zesihloko - into yesicelo iparameter), ekubeni asizange sicacise ixabiso lomntu kwisihloko sethu (ngokwahlula). I-arhente esungulwe kumsebenzi inikwe zonke izahlulo ezisi-8, ekubeni yiyo yodwa, kodwa oku kuya kuxoxwa ngokubanzi kwinxalenye malunga nokudibanisa.

Ewe, ngoku singaya kwenye i-terminal window kwaye sithumele umyalezo ongenanto kwisihloko sethu:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS usebenzisa @ sibonisa ukuba sithumela umyalezo kwisihloko esibizwa ngokuba "collect_securities".

Kule meko, umyalezo uye kwi-partition 6 - ungajonga oku ngokuya kwi-kafdrop on localhost:9000

Ukuya kwifestile yesiphelo kunye nomsebenzi wethu, siya kubona umyalezo ovuyayo othunyelwe kusetyenziswa i-loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Sinokujonga kwakhona kwi-mongo (usebenzisa i-Robo3T okanye i-Studio3T) kwaye sibone ukuba izibambiso zikwisiseko sedatha:

Andiyena ubhiliyoni, kwaye ngoko sanelisekile yindlela yokuqala yokujonga.

Imisebenzi yangasemva kwiFaust, iCandelo II: Iiarhente kunye namaQelaImisebenzi yangasemva kwiFaust, iCandelo II: Iiarhente kunye namaQela

Ulonwabo kunye novuyo - i-arhente yokuqala ilungile :)

I-arhente ilungile, phila ixesha elide i-arhente entsha!

Ewe, manene, siye sagubungela kuphela i-1/3 yendlela elungiselelwe leli nqaku, kodwa musa ukudimazeka, kuba ngoku kuya kuba lula.

Ke ngoku sifuna iarhente eqokelela ulwazi lwemeta kwaye ilubeke kuxwebhu lokuqokelela:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Ekubeni le arhente iza kuqhubekekisa ulwazi malunga nokhuseleko oluthile, kufuneka sibonise iticker (uphawu) yolu khuseleko kumyalezo. Ngenxa yale njongo kwi-faust kukho iirekhodi — iiklasi ezibhengeza isikimu somyalezo kwisihloko somenzeli.

Kule meko, masiye ku iirekhodi.py kwaye uchaze ukuba umyalezo wesi sihloko kufuneka ujongeke njani:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Njengoko unokuba uqikelele, i-faust isebenzisa uhlobo lwe-python yochazo ukuchaza i-schema yomyalezo, yiyo loo nto eyona nguqulelo incinci ixhaswa lithala leencwadi. 3.6.

Masibuyele kwi-arhente, seta iindidi kwaye uyongeze:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Njengoko ubona, sidlula iparameter entsha kunye neskimu kwindlela yokuqalisa isihloko - value_type. Ngapha koko, yonke into ilandela iskimu esifanayo, ngoko andiboni nqaku lokuhlala kuyo nayiphi na enye into.

Ewe, uchuku lokugqibela kukongeza umnxeba kwiarhente yokuqokelela ulwazi lwemeta ukuqokelela_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Sisebenzisa inkqubo echazwe ngaphambili yomyalezo. Kule meko, ndisebenzise indlela yokuphosa kuba akukho mfuneko yokuba silinde isiphumo esivela kwi-arhente, kodwa kufanelekile ukuyikhankanya loo nto. iindlela thumela umyalezo kwisihloko:

  1. cast - ayivali kuba ayilindelanga siphumo. Awukwazi ukuthumela isiphumo kwesinye isihloko njengomyalezo.

  2. thumela - ayithinteli kuba ayilindelanga siphumo. Ungakhankanya i-arhente kwisihloko apho umphumo uya kuya khona.

  3. cela - ulindele umphumo. Ungakhankanya i-arhente kwisihloko apho umphumo uya kuya khona.

Ke, konke oku kunye neearhente zanamhlanje!

Iqela lamaphupha

Into yokugqibela endithembise ukuyibhala kule nxalenye yimiyalelo. Njengoko bekutshiwo ngaphambili, imiyalelo kwi-faust kukucofa ngokucofa. Ngapha koko, i-faust incamathela ngokulula umyalelo wethu wesiko kujongano lwayo xa uchaza iqhosha -A

Emva kweearhente ezibhengeziweyo kwi iiarhente.py yongeza umsebenzi kunye nomhombiso app.commandukubiza indlela ukuphosa у qokelela_iisecuritites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Ke, ukuba sibiza uluhlu lwemiyalelo, umyalelo wethu omtsha uya kuba kuwo:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Singayisebenzisa njengaye nabani na ongomnye, ke masiqale kwakhona umsebenzi ongasebenziyo kwaye siqale ingqokelela epheleleyo yokhuseleko:

> faust -A horton.agents start-collect-securities

Kuza kwenzeka ntoni emva koko?

Kwinxalenye elandelayo, sisebenzisa ii-arhente eziseleyo njengomzekelo, siya kuqwalasela indlela ye-sink yokukhangela ukugqithiswa kwamaxabiso okuvala okuthengiswa konyaka kunye nokuqaliswa kwe-cron yee-arhente.

Kuphelele apho namhlanje! Enkosi ngokufunda :)

Ikhowudi yale nxalenye

Imisebenzi yangasemva kwiFaust, iCandelo II: Iiarhente kunye namaQela

PS Ngaphantsi kwenxalenye yokugqibela ndibuzwe malunga ne-faust kunye ne-confluent kafka (Zeziphi iimpawu enazo i-confluent?). Kubonakala ngathi i-confluent isebenza ngeendlela ezininzi, kodwa inyaniso kukuba i-faust ayinayo inkxaso epheleleyo yomxhasi we-confluent - oku kulandela iinkcazo zezithintelo zabaxumi kwi doc.

umthombo: www.habr.com

Yongeza izimvo