Background Tasks on Faust, Part II: Agents and Commands

Table of contents

  1. Part I: Introduction

  2. Part II: Agents and Commands

What are we doing here?

So, part two. As described earlier, in this part we will do the following:

  1. Write a small aiohttp client for alphavantage with requests to the endpoints we need.

  2. Create an agent that collects data on securities and meta information about them.

That is what we will do for the project itself. As for exploring faust, we will learn how to write agents that process a stream of events from kafka, and how to write commands (a wrapper around click), in our case for manually pushing messages to the topic the agent is watching.

Preparation

AlphaVantage Client

First, let's write a small aiohttp client for requests to alphavantage.

alphavantage.py

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Everything should be clear from the code itself:

  1. The AlphaVantage API is simple and nicely designed, so I decided to route all requests through the _construct_query method, which is also where the http call is made.

  2. I convert all field names to snake_case for convenience.

  3. And the logger.catch decorator gives us nice, informative traceback output.

P.S. Don't forget to add your alphavantage token to your local config.yml, or export it as the HORTON_SERVICE_APIKEY environment variable. You can get a token from AlphaVantage.
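To make the first point concrete, here is a minimal sketch of how a request URL ends up looking once the per-call arguments are merged with the shared defaults. It uses only the standard library instead of aiohttp. The API_ENDPOINT value and the build_query_url helper are assumptions made for this illustration; the article imports API_ENDPOINT from horton.config without showing its value.

```python
from urllib.parse import urlencode, urljoin

# Assumed value: the article does not show what horton.config.API_ENDPOINT contains.
API_ENDPOINT = "https://www.alphavantage.co/"


def build_query_url(function: str, api_key: str, **kwargs: str) -> str:
    """Hypothetical helper that mirrors the parameter merge in _construct_query."""
    # Same merge order as the client: the function name, then per-call
    # arguments, then the shared defaults (datatype and apikey).
    params = {"function": function, **kwargs, "datatype": "json", "apikey": api_key}
    return urljoin(API_ENDPOINT, "query/") + "?" + urlencode(params)


print(build_query_url("OVERVIEW", "demo", symbol="IBM"))
```

Because the shared defaults are merged last, a per-call argument named datatype or apikey would be overridden by them, which is also how the client in the article behaves.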

CRUD class

We will have a securities collection for storing meta information about securities.

database/security.py

In my opinion there is nothing to explain here, and the class itself is quite simple.

get_app()

Let's add a function that creates the application object in app.py

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

For now this is the simplest possible way to create the application; we will extend it later. So that you don't have to wait, though, see the faust reference for the App class. I also recommend looking at the settings class, since it is responsible for most of the configuration.

Main part

An agent for collecting and storing the list of securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass

So, first we get the faust application object, which is simple enough. Next, we explicitly declare a topic for our agent. Here it is worth explaining what a topic is, what the internal parameter does, and how this could be arranged differently.

  1. Topics in kafka: if you want the exact definition, it is best to read the official documentation, or you can read the summary on Habr in Russian, where everything is also described quite accurately :)

  2. The internal parameter, which is described quite well in the faust docs, lets us configure the topic directly in code. Naturally, this covers only the parameters exposed by the faust developers, for example: retention, retention policy (delete by default, but you can set compact), and the number of partitions per topic (partitions, for example to use fewer than the global value of the faust application).

  3. In general, an agent can create its own managed topic with the global values, but I prefer to declare everything explicitly. Besides, some parameters of the topic (for example, the number of partitions or the retention policy) cannot be configured in the agent declaration.

Here is how it could look without defining the topic manually:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass

Well, now let's describe what our agent will actually do :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

So, at the start of the agent we open an aiohttp session for the requests made through our client. This means that when a worker starts and our agent is launched, a session is opened right away: one session for the whole time the worker is running (or several, if you change the agent's concurrency parameter from its default value of one).
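The "one session per agent instance" behaviour can be illustrated without kafka or aiohttp. The sketch below is only an analogy built on asyncio: fake_session stands in for aiohttp.ClientSession, a queue stands in for the topic, and the concurrency argument plays the role of the agent's concurrency setting. All names in it are hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import List, Tuple


@asynccontextmanager
async def fake_session(opened: List[int]):
    """Stand-in for aiohttp.ClientSession that only records that it was opened."""
    opened.append(1)
    yield object()


async def agent_instance(queue: asyncio.Queue, handled: List[int], opened: List[int]) -> None:
    # The session is opened once, before the loop, and reused for every message.
    async with fake_session(opened):
        while True:
            message = await queue.get()
            if message is None:  # sentinel used only to stop this sketch
                break
            handled.append(message)


async def run(concurrency: int, messages: int) -> Tuple[int, int]:
    queue: asyncio.Queue = asyncio.Queue()
    handled: List[int] = []
    opened: List[int] = []
    workers = [
        asyncio.create_task(agent_instance(queue, handled, opened))
        for _ in range(concurrency)
    ]
    for number in range(messages):
        queue.put_nowait(number)
    for _ in range(concurrency):
        queue.put_nowait(None)  # one stop signal per instance
    await asyncio.gather(*workers)
    return len(handled), len(opened)


print(asyncio.run(run(concurrency=1, messages=5)))
print(asyncio.run(run(concurrency=3, messages=5)))
```

In both runs every message is handled, but the number of opened sessions follows the number of instances rather than the number of messages.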

Next, we iterate over the stream of messages from our topic (we put each message into _, since in this agent we don't care about its content). If there are messages at the current offset they are processed, otherwise the loop waits for them to arrive. Inside the loop we log that a message was received, fetch the list of active securities (get_securities returns only active ones by default, see the client code) and save it to the database. For each security we check whether one with the same ticker and exchange already exists in the database; if it does, the security is simply updated.
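The "insert if missing, otherwise update" behaviour comes from the upsert=True argument. The article does not show the SecurityCRUD class, so the sketch below is a hypothetical in-memory stand-in that only mimics the update_one(filter, data, upsert) call shape used by the agent; the real class talks to MongoDB.

```python
import asyncio
from typing import Any, Dict, List, Optional


class InMemorySecurityCRUD:
    """Hypothetical stand-in for SecurityCRUD that keeps documents in a list."""

    _documents: List[Dict[str, Any]] = []

    @classmethod
    def _find(cls, query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        for document in cls._documents:
            if all(document.get(key) == value for key, value in query.items()):
                return document
        return None

    @classmethod
    async def update_one(
        cls, query: Dict[str, Any], data: Dict[str, Any], upsert: bool = False
    ) -> bool:
        document = cls._find(query)
        if document is not None:
            document.update(data)  # a match exists: update it in place
            return True
        if upsert:
            cls._documents.append({**query, **data})  # no match: insert
            return True
        return False  # no match and no upsert: nothing happens


async def demo() -> List[Dict[str, Any]]:
    key = {"symbol": "IBM", "exchange": "NYSE"}
    await InMemorySecurityCRUD.update_one(key, {**key, "name": "IBM"}, upsert=True)
    await InMemorySecurityCRUD.update_one(
        key, {**key, "name": "International Business Machines"}, upsert=True
    )
    return InMemorySecurityCRUD._documents


documents = asyncio.run(demo())
print(documents)
```

Two calls with the same ticker and exchange leave a single document behind, which is the property the agent relies on when the collection is run repeatedly.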

Let's launch our creation!

> docker-compose up -d
... Starting containers ...
> faust -A horton.agents worker --without-web -l info

P.S. I will not cover the features of faust's web component in these articles, so we set the corresponding flag.

In our launch command we told faust where to find the application object and what to do with it (start a worker), with the info log level. We get the following output:

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... logs, logs, logs ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

It's alive!!!

Let's look at the partition set. As we can see, a topic was created with the name we specified in the code and with the default number of partitions (8, taken from topic_partitions, a parameter of the application object), since we did not set an individual value for our topic (via partitions). The agent launched in the worker is assigned all 8 partitions, since it is the only one, but this will be covered in more detail in the part about clustering.

Well, now we can switch to another terminal window and send an empty message to our topic:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. The @ prefix tells faust that collect_securities is the name of an agent, so the message is sent to that agent's topic (here also named "collect_securities").

In this case the message went to partition 6. You can check this by opening kafdrop at localhost:9000

Switching to the terminal window with our worker, we will see a happy message logged with loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

We can also look into mongo (using Robo3T or Studio3T) and see that the securities are in the database.

I'm not a billionaire, so we will settle for the first viewer.

Happiness and joy: the first agent is ready :)

The agent is ready, long live the new agent!

Yes, gentlemen, we have covered only 1/3 of the path laid out for this article, but don't be discouraged, because from here on it gets easier.

So now we need an agent that collects meta information and stores it in the collection document:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Since this agent will process information about one specific security, we need to specify that security's ticker (symbol) in the message. For this purpose faust has Records: classes that declare the schema of the messages in the agent's topic.

So let's go to records.py and describe what a message for this topic should look like:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

As you might have guessed, faust uses python type annotations to describe the message schema, which is why the minimum python version supported by the library is 3.6.
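To show how far type annotations alone can go, here is a rough analogue built on a plain dataclass from the standard library. It is not faust.Record and does not reproduce faust's serialization format; the class name and the JSON layout are assumptions made for this sketch, which only shows that the annotated fields are enough to derive a schema and to round-trip a message.

```python
import json
from dataclasses import asdict, dataclass, fields
from typing import List


@dataclass
class CollectSecurityOverviewSketch:
    """Plain-dataclass analogue of the record above (not faust itself)."""

    symbol: str
    exchange: str

    def dumps(self) -> bytes:
        # Serialize the annotated fields to JSON bytes.
        return json.dumps(asdict(self)).encode()

    @classmethod
    def loads(cls, payload: bytes) -> "CollectSecurityOverviewSketch":
        data = json.loads(payload)
        # Only the declared fields are read; anything else in the payload is ignored.
        return cls(**{field.name: data[field.name] for field in fields(cls)})

    @classmethod
    def schema(cls) -> List[str]:
        return [field.name for field in fields(cls)]


message = CollectSecurityOverviewSketch(symbol="IBM", exchange="NYSE")
restored = CollectSecurityOverviewSketch.loads(message.dumps())
print(CollectSecurityOverviewSketch.schema(), restored)
```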

Let's return to the agent, set the types and finish it:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

As you can see, we pass a new parameter with the schema, value_type, to the method that initializes the topic. Beyond that, everything follows the same pattern, so I see no reason to dwell on anything else.

The finishing touch is to add a call to the meta information agent inside collect_securities:

....
for security in securities:
    await SecurityCRUD.update_one(
        {"symbol": security["symbol"], "exchange": security["exchange"]},
        security,
        upsert=True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(
            symbol=security["symbol"], exchange=security["exchange"]
        )
    )
....

We use the previously declared schema for the message. Here I used the .cast method, since we don't need to wait for a result from the agent, but it is worth listing the ways of sending a message to a topic:

  1. cast - non-blocking, because it does not wait for a result. The result cannot be forwarded to another topic as a message.

  2. send - non-blocking, because it does not wait for a result. You can specify an agent whose topic the result will be sent to.

  3. ask - waits for the result. You can specify an agent whose topic the result will be sent to.
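The difference between fire-and-forget and request-reply can be sketched with plain asyncio. This is an analogy, not faust's implementation: the toy agent reads from an in-process queue instead of a kafka topic, and cast and ask here are ordinary functions written for this illustration. The third option, send, sits between the two (it does not wait, but a reply destination can be named) and is left out to keep the sketch short.

```python
import asyncio
from typing import Optional, Tuple


async def agent(inbox: asyncio.Queue) -> None:
    """Toy agent: doubles each value and replies only if a future was attached."""
    while True:
        value, reply = await inbox.get()
        result = value * 2
        if reply is not None:
            reply.set_result(result)
        inbox.task_done()


async def cast(inbox: asyncio.Queue, value: int) -> None:
    # Fire and forget: no reply channel is attached, nothing is returned.
    await inbox.put((value, None))


async def ask(inbox: asyncio.Queue, value: int) -> int:
    # Attach a future and wait until the agent fills it in.
    reply = asyncio.get_running_loop().create_future()
    await inbox.put((value, reply))
    return await reply


async def main() -> Tuple[Optional[int], int]:
    inbox: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(agent(inbox))
    cast_result = await cast(inbox, 1)   # returns as soon as the message is queued
    ask_result = await ask(inbox, 21)    # returns only after the agent has answered
    await inbox.join()
    worker.cancel()
    return cast_result, ask_result


print(asyncio.run(main()))
```

For collect_securities the fire-and-forget variant is the natural choice: the loop over thousands of securities should not stall waiting for each overview to be fetched.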

So, that's all about agents for today!

The dream Team

The last thing I promised to cover in this part is commands. As mentioned earlier, commands in faust are a wrapper around click. In effect, faust simply attaches our custom command to its own interface when the -A option is given.

After the agent declarations in agents.py, add a function with the app.command decorator that calls the cast method of collect_securities:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Now, if we list the available commands, our new command will be among them:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.
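Two things are visible in the listing above: the function name start_collect_securities shows up as start-collect-securities, and its docstring is used as the help text. The sketch below reproduces just that observable behaviour with a hand-written registry. It is not how faust or click are implemented; the command decorator, COMMANDS and format_help are hypothetical names used only for this illustration.

```python
from typing import Awaitable, Callable, Dict, List, Tuple

# Hypothetical registry: command name -> (help text, function).
COMMANDS: Dict[str, Tuple[str, Callable[[], Awaitable[None]]]] = {}


def command() -> Callable:
    """Decorator factory, called with parentheses like app.command() in the article."""

    def decorator(function: Callable[[], Awaitable[None]]) -> Callable:
        name = function.__name__.replace("_", "-")
        doc = (function.__doc__ or "").strip()
        help_text = doc.splitlines()[0] if doc else ""
        COMMANDS[name] = (help_text, function)
        return function

    return decorator


@command()
async def start_collect_securities() -> None:
    """Collect securities and overview."""


def format_help() -> List[str]:
    width = max(len(name) for name in COMMANDS)
    return [
        f"  {name.ljust(width)}  {help_text}"
        for name, (help_text, _) in sorted(COMMANDS.items())
    ]


print("\n".join(format_help()))
```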

We can use it like any other command, so let's restart the faust worker and kick off a full collection of securities:

> faust -A horton.agents start-collect-securities

What happens next?

In the next part, using the remaining agents as an example, we will look at the sink mechanism for finding extremes in the closing prices over the year, and at launching agents on a cron schedule.

That's all for today! Thanks for reading :)

Code for this part

P.S. Under the previous part I was asked about faust and confluent kafka (which features does confluent offer?). Confluent does seem to be more capable in many respects, but the fact is that faust does not have full client support for confluent. This follows from the description of client restrictions in the docs.

Source: www.habr.com
