Mga buluhaton sa background sa Faust, Bahin II: Mga Ahente ug Mga Team

Mga buluhaton sa background sa Faust, Bahin II: Mga Ahente ug Mga Team

Mga sulud sa sulud

  1. Bahin I: Pasiuna

  2. Bahin II: Mga Ahente ug Mga Grupo

Unsay atong gibuhat dinhi?

Busa, sa ikaduha nga bahin. Sama sa nahisulat sa sayo pa, niini atong buhaton ang mosunod:

  1. Atong isulat ang usa ka gamay nga kliyente alang sa alphavantage sa aiohttp nga adunay mga hangyo alang sa mga endpoint nga atong gikinahanglan.

  2. Maghimo kita usa ka ahente nga mangolekta mga datos sa mga securities ug meta nga impormasyon bahin niini.

Apan, kini ang atong buhaton alang sa proyekto mismo, ug sa mga termino sa paspas nga panukiduki, makakat-on kita kung unsaon pagsulat ang mga ahente nga nagproseso sa mga panghitabo sa stream gikan sa kafka, ingon man kung giunsa pagsulat ang mga mando (pag-klik sa wrapper), sa among kaso - alang sa manwal nga mga mensahe sa pagduso sa hilisgutan nga gibantayan sa ahente.

Training

Kliyente sa AlphaVantage

Una, magsulat kita og gamay nga aiohttp nga kliyente para sa mga hangyo sa alphavantage.

alphavantage.py

maglalaglag

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Sa tinuud, ang tanan klaro gikan niini:

  1. Ang AlphaVantage API kay yano ug nindot nga pagkadisenyo, mao nga nakahukom ko nga himoon ang tanang hangyo pinaagi sa pamaagi construct_query diin sa baylo adunay usa ka tawag sa http.

  2. Gidala nako ang tanan nga mga uma sa snake_case alang sa kasayon

  3. Aw, ang logger.catch nga dekorasyon alang sa matahum ug informative nga traceback nga output.

PS Ayaw kalimti nga idugang ang alphavantage token sa lokal sa config.yml, o i-export ang environment variable HORTON_SERVICE_APIKEY. Nakadawat mi og token dinhi.

CRUD nga klase

Adunay kami usa ka koleksyon sa mga securities aron tipigan ang meta nga impormasyon bahin sa mga securities.

database/security.py

Sa akong opinyon, dili kinahanglan nga ipasabut ang bisan unsa dinhi, ug ang base nga klase mismo yano ra.

get_app()

Atong idugang ang usa ka function alang sa paghimo sa usa ka aplikasyon nga butang sa app.py

maglalaglag

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Sa pagkakaron aduna kami'y pinakasimple nga paghimo sa aplikasyon, sa dili madugay among palapdan kini, bisan pa, aron dili ka maghulat, dinhi mga pakisayran ngadto sa App-class. Gitambagan ko usab ikaw nga tan-awon ang klase sa mga setting, tungod kay kini ang responsable sa kadaghanan sa mga setting.

Main nga bahin

Ahente sa pagkolekta ug pagmentinar sa listahan sa mga securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Busa, una atong makuha ang faust application object - kini yano ra. Sunod, kami tin-aw nga nagpahayag sa usa ka hilisgutan alang sa among ahente ... Dinhi takus nga hisgutan kung unsa kini, kung unsa ang internal nga parameter ug kung giunsa kini mahimo nga gihan-ay nga lahi.

  1. Ang mga hilisgutan sa kafka, kung gusto naton mahibal-an ang eksakto nga kahulugan, mas maayo nga basahon off. dokumento, o makabasa ka kompendyum sa Habré sa Russian, diin ang tanan gipakita usab nga tukma :)

  2. Parameter sa sulod, nga gihulagway nga maayo sa faust doc, nagtugot kanamo sa pag-configure sa hilisgutan direkta sa code, siyempre, kini nagpasabot sa mga parameter nga gihatag sa faust developers, pananglitan: retention, retention policy (pinaagi sa default delete, apan mahimo nimong itakda compact), gidaghanon sa mga partisyon matag hilisgutan (partisyonsa pagbuhat, alang sa panig-ingnan, ubos pa kay sa global nga kahulogan mga aplikasyon faust).

  3. Sa kinatibuk-an, ang ahente makahimo og usa ka gidumala nga hilisgutan nga adunay mga global nga bili, bisan pa, gusto nako nga ipahayag ang tanan nga tin-aw. Dugang pa, ang pipila ka mga parameter (pananglitan, ang gidaghanon sa mga partisyon o palisiya sa pagpadayon) sa hilisgutan sa advertisement sa ahente dili ma-configure.

    Ania kung unsa ang hitsura niini kung wala’y manual nga pagtino sa hilisgutan:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Aw, karon atong ihulagway kung unsa ang buhaton sa atong ahente :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Mao nga, sa sinugdanan sa ahente, nagbukas kami usa ka sesyon sa aiohttp alang sa mga hangyo pinaagi sa among kliyente. Busa, sa pagsugod sa usa ka trabahante, kung ang among ahente gilunsad, usa ka sesyon ang maablihan dayon - usa, sa tibuok panahon nga ang trabahante nagdagan (o daghan, kung imong usbon ang parameter pag-uyon gikan sa ahente nga adunay default nga yunit).

Sunod, gisundan namo ang sapa (gibutang namo ang mensahe sa _, tungod kay kami, sa kini nga ahente, wala magtagad sa sulud) sa mga mensahe gikan sa among hilisgutan, kung kini anaa sa kasamtangan nga offset, kung dili ang among siklo maghulat sa ilang pag-abot. Aw, sa sulod sa among loop, among gi-log ang resibo sa mensahe, pagkuha usa ka lista sa aktibo (get_securities mobalik nga aktibo lamang pinaagi sa default, tan-awa ang code sa kliyente) mga securities ug i-save kini sa database, pagsusi kung adunay usa ka seguridad nga adunay parehas nga ticker ug pagbaylo sa database , kung naa, nan kini (ang papel) ma-update ra.

Atong sugdan ang atong pagmugna!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Mga Feature sa PS web component Dili nako hisgotan ang faus sa mga artikulo, busa among gibutang ang angay nga bandila.

Sa among launch command, gisultihan namo si faust kung asa pangitaon ang aplikasyon nga butang ug unsa ang buhaton niini (paglunsad og usa ka trabahante) sa lebel sa output log sa impormasyon. Atong makuha ang mosunod nga output:

maglalaglag

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Buhi na!!!

Atong tan-awon ang partition set. Sama sa atong makita, usa ka hilisgutan ang gibuhat nga adunay ngalan nga among gitudlo sa code, ang default nga gidaghanon sa mga partisyon (8, gikuha gikan sa topic_partitions - application object parameter), tungod kay wala kami nagtino sa usa ka indibidwal nga kantidad alang sa among hilisgutan (pinaagi sa mga partisyon). Ang gilansad nga ahente sa trabahante gi-assign sa tanan nga 8 nga mga partisyon, tungod kay kini usa ra, apan kini hisgotan sa mas detalyado sa bahin bahin sa clustering.

Aw, karon makaadto na kita sa laing terminal window ug magpadala ug walay sulod nga mensahe sa atong topiko:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS gamit @ gipakita namon nga nagpadala kami usa ka mensahe sa usa ka hilisgutan nga ginganlag "collect_securities".

Sa kini nga kaso, ang mensahe miadto sa partition 6 - mahimo nimong susihon kini pinaagi sa pag-adto sa kafdrop sa localhost:9000

Pag-adto sa terminal nga bintana uban sa among trabahante, among makita ang malipayong mensahe nga gipadala gamit ang loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Mahimo usab natong tan-awon ang mongo (gamit ang Robo3T o Studio3T) ug tan-awon nga ang mga securities naa sa database:

Dili ako usa ka bilyonaryo, ug busa kontento kami sa una nga kapilian sa pagtan-aw.

Mga buluhaton sa background sa Faust, Bahin II: Mga Ahente ug Mga TeamMga buluhaton sa background sa Faust, Bahin II: Mga Ahente ug Mga Team

Kalipay ug kalipay - andam na ang unang ahente :)

Andam na ang ahente, mabuhi ang bag-ong ahente!

Oo, mga ginoo, 1/3 pa lang sa dalan nga giandam niining artikuloha ang among nasakpan, apan ayawg kaluya, kay karon mas sayon ​​na.

Mao nga karon kinahanglan namon ang usa ka ahente nga nagkolekta sa impormasyon sa meta ug gibutang kini sa usa ka dokumento sa pagkolekta:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Tungod kay kini nga ahente magproseso sa kasayuran bahin sa usa ka piho nga seguridad, kinahanglan namon nga ipakita ang ticker (simbolo) niini nga seguridad sa mensahe. Alang niini nga katuyoan sa faus adunay mga rekord — mga klase nga nagpahayag sa laraw sa mensahe sa hilisgutan sa ahente.

Sa kini nga kaso, moadto kita sa records.py ug ihulagway kung unsa ang hitsura sa mensahe alang niini nga hilisgutan:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Sama sa imong nahunahunaan, gigamit ni faust ang python type annotation aron ihulagway ang schema sa mensahe, mao nga ang minimum nga bersyon nga gisuportahan sa librarya 3.6.

Mobalik kita sa ahente, ibutang ang mga tipo ug idugang kini:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Sama sa imong nakita, gipasa namon ang usa ka bag-ong parameter nga adunay laraw sa pamaagi sa pagsugod sa hilisgutan - value_type. Dugang pa, ang tanan nagsunod sa parehas nga laraw, mao nga wala akoy nakita nga punto sa pagpadayon sa bisan unsang butang.

Aw, ang katapusan nga paghikap mao ang pagdugang usa ka tawag sa ahente sa pagkolekta sa impormasyon sa meta aron makolekta_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Gigamit namon ang gipahibalo kaniadto nga laraw alang sa mensahe. Sa kini nga kaso, gigamit nako ang .cast nga pamaagi tungod kay dili na kinahanglan nga maghulat alang sa resulta gikan sa ahente, apan angay nga hisgutan kana. mga paagi ipadala ang usa ka mensahe sa hilisgutan:

  1. cast - dili babagan tungod kay wala kini magdahom sa usa ka resulta. Dili nimo ipadala ang resulta sa laing hilisgutan isip mensahe.

  2. ipadala - dili babagan tungod kay wala kini magdahom nga resulta. Mahimo nimong ipiho ang usa ka ahente sa hilisgutan kung diin moadto ang resulta.

  3. mangutana - naghulat alang sa usa ka resulta. Mahimo nimong ipiho ang usa ka ahente sa hilisgutan kung diin moadto ang resulta.

Busa, kana ang tanan sa mga ahente alang sa karon!

Ang Dream Team

Ang kataposang butang nga akong gisaad nga isulat niini nga bahin mao ang mga sugo. Sama sa gihisgutan sa sayo pa, ang mga sugo sa faust usa ka wrapper sa palibot sa pag-click. Sa tinuud, ang faust yano nga nag-attach sa among naandan nga mando sa interface niini kung gipiho ang -A nga yawe

Human sa gipahibalo nga mga ahente sa agents.py pagdugang usa ka function nga adunay usa ka dekorador app.commandpagtawag sa pamaagi gisalikway у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Busa, kon atong tawgon ang listahan sa mga sugo, ang atong bag-ong sugo anaa niini:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Mahimo natong gamiton kini sama sa uban, busa atong i-restart ang faust worker ug magsugod sa usa ka hingpit nga koleksyon sa mga securities:

> faust -A horton.agents start-collect-securities

Unsay sunod nga mahitabo?

Sa sunod nga bahin, gamit ang nahabilin nga mga ahente ingon usa ka pananglitan, atong hisgotan ang mekanismo sa lababo alang sa pagpangita sa mga sobra sa panapos nga mga presyo sa pamatigayon alang sa tuig ug ang paglansad sa cron sa mga ahente.

Kana lang para karong adlawa! Salamat sa pagbasa :)

Kodigo alang niini nga bahin

Mga buluhaton sa background sa Faust, Bahin II: Mga Ahente ug Mga Team

PS Ubos sa katapusang bahin gipangutana ko bahin sa faust ug confluent kafka (unsa nga mga bahin ang adunay confluent?). Ingon og ang confluent mas magamit sa daghang mga paagi, apan ang kamatuoran mao nga ang faust walay hingpit nga suporta sa kliyente alang sa confluent - kini nagsunod gikan sa mga paghubit sa mga pagdili sa kliyente sa doc.

Source: www.habr.com

Idugang sa usa ka comment