Karên paşîn ên li ser Faust, Beş II: Ajan û Tîm

Karên paşîn ên li ser Faust, Beş II: Ajan û Tîm

Table of Contents

  1. Beş I: Destpêk

  2. Beş II: Ajan û Tîm

Em li vir çi dikin?

Ji ber vê yekê, beşa duyemîn. Wekî ku berê hatî nivîsandin, di wê de em ê jêrîn bikin:

  1. Ka em ji bo alphavantage xerîdarek piçûk li aiohttp bi daxwazên xalên dawî yên ku em hewce ne binivîsin.

  2. Ka em ajanek biafirînin ku dê daneyên li ser ewlehiyê û agahdariya meta li ser wan berhev bike.

Lê, ya ku em ê ji bo projeyê bixwe bikin ev e, û di warê lêkolîna faustê de, em ê fêr bibin ka meriv çawa ajanên ku bûyeran ji kafka diherikînin binivîsin, û her weha meriv çawa fermanan binivîsîne (wrapper bikirtîne), di doza me de - ji bo peyamên push manual ji bo mijara ku agent çavdêriya.

Amadekirin

AlphaVantage Client

Pêşîn, bila ji bo daxwazên alphavantage xerîdarek aiohttp piçûk binivîsin.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Bi rastî, her tişt ji wê zelal e:

  1. AlphaVantage API pir sade û xweşik hatî sêwirandin, ji ber vê yekê min biryar da ku hemî daxwazan bi rêbazê bikim construct_query li ku derê bangek http heye.

  2. Ez hemû zeviyan bînim ber snake_case ji bo rihetiyê.

  3. Welê, xemla logger.catch ji bo derketina şopandina xweşik û agahdar.

PS Ji bîr nekin ku nîşana alphavantage-ya herêmî li config.yml zêde bikin, an guhêrbara jîngehê derxînin HORTON_SERVICE_APIKEY. Em nîşanek distînin vir.

çîna CRUD

Em ê berhevokek ewlehiyê hebe ku agahdariya meta di derheqê ewlehiyê de hilîne.

database/security.py

Bi dîtina min, ne hewce ye ku li vir tiştek were ravekirin, û çîna bingehîn bixwe pir hêsan e.

get_app ()

Werin em fonksiyonek ji bo afirandina objeyek serîlêdanê lê zêde bikin app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Heya nuha em ê xwedan çêkirina serîlêdanê ya herî hêsan bin, piçek paşê em ê wê berfireh bikin, lêbelê, ji bo ku hûn li bendê nemînin, li vir referansên ji bo App-class. Di heman demê de ez ji we re şîret dikim ku hûn li çîna mîhengan binihêrin, ji ber ku ew ji piraniya mîhengan berpirsiyar e.

Beşek sereke

Agent ji bo berhevkirin û domandina navnîşek ewlehiyê

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ji ber vê yekê, pêşî em armanca serîlêdana faust digirin - ew pir hêsan e. Dûv re, em bi eşkere mijarek ji bo nûnerê xwe radigihînin... Li vir hêjayî gotinê ye ku ew çi ye, pîvana hundurîn çi ye û çawa ev dikare bi rengek cûda were saz kirin.

  1. Mijarên di kafkayê de, ger em bixwazin pênaseya rast bizanibin, çêtir e ku em bixwînin ji. belge, an hûn dikarin bixwînin berhevok li ser Habré bi rûsî, ku her tişt jî bi rengek rast xuya dike :)

  2. Parametreya navxweyî, ku di belgeya faustê de pir xweş tê diyar kirin, dihêle ku em mijarê rasterast di kodê de mîheng bikin, bê guman, ev tê vê wateyê ku pîvanên ku ji hêla pêşdebirên faust ve têne peyda kirin, mînakî: ragirtin, polîtîkaya ragirtinê (bi xwerû jêbirin, lê hûn dikarin saz bikin gişt), hejmara beşan li ser her mijarê (partîsiyonanji bo nimûne, kêmtir ji girîngiya cîhanî sepanên faust).

  3. Bi gelemperî, ajan dikare mijarek birêvebirî bi nirxên gerdûnî biafirîne, lêbelê, ez dixwazim her tiştî bi eşkere eşkere bikim. Wekî din, hin pîvan (mînak, hejmara dabeşan an polîtîkaya ragirtinê) ya mijarê di reklama ajansê de nayê mîheng kirin.

    Li vir ev e ku ew bêyî destnîşankirina mijarê bi destan xuya dike:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Belê, naha em diyar bikin ka dê nûnerê me çi bike :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Ji ber vê yekê, di destpêka nûnerê de, em ji bo daxwaznameyên bi navgîniya muwekîlê xwe ve rûniştinek aiohttp vedikin. Bi vî rengî, dema ku xebatkarek dest pê dike, dema ku nûnerê me dest pê dike, dê tavilê danişînek were vekirin - yek, ji bo tevahiya dema ku xebatkar dimeşîne (an çend, heke hûn pîvanê biguherînin hevdemî ji karmendek bi yekîneyek xwerû).

Dûv re, em tîrêjê dişopînin (em peyamê tê de cih digirin _, ji ber ku em, di vê ajansê de, bala xwe nadin naverokê) peyamên ji mijara xwe, heke ew di dema niha de hebin, wekî din çerxa me dê li benda hatina wan bimîne. Welê, di hundurê lûleya xwe de, em wergirtina peyamê têketinê, navnîşek ewlekariya çalak (get_securities tenê ji hêla xwerû ve vedigere, koda xerîdar bibîne) distînin û wê li databasê hilînin, kontrol dikin ka ewlehîyek bi heman tîpê heye û danûstendina di databasê de, heke hebe, wê hingê ew (kaxiz) tenê dê were nûve kirin.

Werin em afirandina xwe bidin destpêkirin!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Taybetmendiyên PS pêkhateya webê Ez ê di gotaran de faustê nehesibînim, ji ber vê yekê em ala guncan destnîşan dikin.

Di fermana destpêkirina xwe de, me ji faustê re got ku li kuderê li hêmana serîlêdanê bigere û bi wê re çi bike (karkerek bide destpêkirin) bi asta derana têketina agahdariyê. Em encamek jêrîn bistînin:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ew zindî ye!!!

Ka em li set dabeşkirinê binêrin. Wekî ku em dibînin, mijarek bi navê ku me di kodê de destnîşan kiriye, hejmara xwerû ya dabeşan (8, ji topic_partitions - Parametreya objeya serîlêdanê), ji ber ku me ji bo mijara xwe nirxek kesane diyar nekir (bi riya dabeşan). Di xebatkarê de nûnerê dest pêkirî ji her 8 dabeşan re têne destnîşan kirin, ji ber ku ew yekane ye, lê ev ê di beşa di derbarê kombûnê de bi hûrgulî were nîqaş kirin.

Welê, naha em dikarin biçin pencereyek termînalek din û peyamek vala ji mijara xwe re bişînin:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS bikar tîne @ em nîşan didin ku em ji mijarek bi navê "collect_securities" re peyamek dişînin.

Di vê rewşê de, peyam çû dabeşkirina 6 - hûn dikarin vê yekê bi çûna kafdrop-ê kontrol bikin localhost:9000

Bi xebatkarê xwe re diçin pencereya termînalê, em ê peyamek kêfxweş bibînin ku bi karanîna loguru hatî şandin:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Em dikarin li mongo jî binihêrin (bi Robo3T an Studio3T bikar tînin) û bibînin ku ewlekarî di databasê de ne:

Ez ne mîlyarder im, û ji ber vê yekê em bi vebijarka dîtina yekem razî ne.

Karên paşîn ên li ser Faust, Beş II: Ajan û TîmKarên paşîn ên li ser Faust, Beş II: Ajan û Tîm

Kêfxweşî û şahî - nûnerê yekem amade ye :)

Agent amade ye, bijî ajanê nû!

Erê, birêzan, me tenê 1/3 riya ku ji hêla vê gotarê ve hatî amadekirin vegirtiye, lê dilteng nebin, ji ber ku niha ew ê hêsantir be.

Ji ber vê yekê naha ji me re karmendek pêdivî ye ku agahdariya meta berhev dike û wê dixe nav belgeyek berhevokê:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Ji ber ku ev ajan dê di derheqê ewlehîyek taybetî de agahdariya pêvajoyê bike, pêdivî ye ku em di peyamê de nîşana (sembola) vê ewlehiyê destnîşan bikin. Ji bo vê armancê di faustê de hene Records - dersên ku di mijara agentê de nexşeya peyamê radigihînin.

Di vê rewşê de, em biçin records.py û diyar bike ka peyama vê mijarê divê çawa xuya bike:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Wekî ku we texmîn kir, faust annotation tîpa python bikar tîne da ku şema peyamê rave bike, ji ber vê yekê guhertoya herî kêm a ku ji hêla pirtûkxaneyê ve hatî piştgirî kirin e. 3.6.

Ka em vegerin ser ajanê, celeb saz bikin û lê zêde bikin:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Wekî ku hûn dibînin, em bi pileyek pîvanek nû ji rêbaza destpêkirina mijarê re derbas dikin - value_type. Digel vê yekê, her tişt heman pilanê dişopîne, ji ber vê yekê ez ti xalê nabînim ku ez li ser tiştek din bisekinim.

Welê, pêwendiya paşîn ev e ku meriv bangek li nûnerê berhevkirina agahdariya meta zêde bike da ku collect_securites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Em ji bo peyamê nexşeya ku berê hatî ragihandin bikar tînin. Di vê rewşê de, min rêbaza .cast bikar anî ji ber ku em ne hewce ne ku li benda encamê ji nûnerê bisekinin, lê hêjayî gotinê ye ku awayên ji mijarê re peyamek bişînin:

  1. avêtin - asteng nake ji ber ku ew li hêviya encamek ne. Hûn nikarin encamê wekî peyamek ji mijarek din re bişînin.

  2. şandin - asteng nake ji ber ku encamek hêvî nake. Hûn dikarin di mijara ku encam jê re diçe de karmendek diyar bikin.

  3. bipirse - li benda encamê ye. Hûn dikarin di mijara ku encam jê re diçe de karmendek diyar bikin.

Ji ber vê yekê, ew hemî bi ajanên îro re ye!

Tîma xewnê

Tişta dawî ku min soz da ku di vê beşê de binivîsim ferman e. Wekî ku berê hate behs kirin, fermanên di faustê de li dora klîk pêça ne. Bi rastî, faust dema ku mifteya -A destnîşan dike, tenê fermana meya xwerû bi navbeyna xwe ve girêdide

Piştî ku ajanên hatin ragihandin ketin agents.py fonksiyonek bi dekoratorê zêde bike app.commandgazîkirina rêbazê avdan у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Bi vî rengî, heke em navnîşa fermanan bang bikin, emrê meya nû dê tê de be:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Em dikarin wê mîna her kesê bikar bînin, ji ber vê yekê em xebatkarê faust ji nû ve bidin destpêkirin û berhevokek bêkêmasî ya ewlehiyê dest pê bikin:

> faust -A horton.agents start-collect-securities

Dê paşê çi bibe?

Di beşa paşîn de, wekî mînakek ajanên mayî bikar bînin, em ê mekanîzmaya sink ji bo lêgerîna tundûtûjiyê di bihayên girtina bazirganiya salê de û destpêkirina krona ajanan de binirxînin.

Ji bo îro her tişt e! Spas ji bo xwendinê :)

Koda ji bo vê beşê

Karên paşîn ên li ser Faust, Beş II: Ajan û Tîm

PS Di beşa dawîn de ji min hat pirsîn kafka faust û tevlihev (çi taybetiyên hevgirtî hene?). Wusa dixuye ku confluent bi gelek awayan fonksiyoneltir e, lê rastiyek ev e ku faust ji bo konfluentê xwedan piştgiriyek tam a xerîdar nîne - ev ji danasînên sînorkirinên xerîdar di doc.

Source: www.habr.com

Add a comment