Fona uzdevumi par Faustu, II daļa: Aģenti un komandas

Fona uzdevumi par Faustu, II daļa: Aģenti un komandas

Satura

  1. I daļa: Ievads

  2. II daļa: Aģenti un komandas

Ko mēs Å”eit darām?

Tātad, tā, otrā daļa. Kā rakstÄ«ts iepriekÅ”, tajā mēs darÄ«sim sekojoÅ”o:

  1. Uzrakstīsim nelielu klientu alfavantage vietnei aiohttp ar pieprasījumiem par mums nepiecieŔamajiem galapunktiem.

  2. Izveidosim aģentu, kas apkopos datus par vērtspapīriem un meta informāciju par tiem.

Bet tas ir tas, ko mēs darÄ«sim paÅ”a projekta labā, un, runājot par fausta izpēti, mēs iemācÄ«simies rakstÄ«t aÄ£entus, kas apstrādā straumÄ“Å”anas notikumus no kafka, kā arÄ« kā rakstÄ«t komandas (klikŔķa iesaiņojums), mÅ«su gadÄ«jumā - manuāliem push ziņojumiem uz tēmu, kuru aÄ£ents uzrauga.

TreniņŔ

AlphaVantage klients

Vispirms uzrakstīsim nelielu aiohttp klientu alfavantage pieprasījumiem.

alfavantage.py

Spoileris

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Patiesībā no tā viss ir skaidrs:

  1. AlphaVantage API ir diezgan vienkārÅ”i un skaisti izstrādāta, tāpēc es nolēmu veikt visus pieprasÄ«jumus, izmantojot Å”o metodi construct_query kur savukārt ir http izsaukums.

  2. Es vedu visus laukus uz snake_case komfortam.

  3. Labi, logger.catch dekorācija skaistai un informatīvai izsekoŔanai.

PS Neaizmirstiet vietēji pievienot alfavantage marÄ·ieri vietnei config.yml vai eksportēt vides mainÄ«go HORTON_SERVICE_APIKEY. Mēs saņemam žetonu Å”eit.

CRUD klase

Mums būs vērtspapīru kolekcija, lai saglabātu metainformāciju par vērtspapīriem.

datubāze/security.py

Manuprāt, Ŕeit nekas nav jāskaidro, un pati bāzes klase ir diezgan vienkārŔa.

get_app()

Pievienosim funkciju lietojumprogrammas objekta izveidei app.py

Spoileris

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Pagaidām mums bÅ«s visvienkārŔākā aplikāciju izveide, nedaudz vēlāk to paplaÅ”ināsim, tomēr, lai neliktu gaidÄ«t, Å”eit atsauces uz App-class. Iesaku arÄ« apskatÄ«t iestatÄ«jumu klasi, jo tā ir atbildÄ«ga par lielāko daļu iestatÄ«jumu.

Galvenā daļa

AÄ£ents vērtspapÄ«ru saraksta vākÅ”anai un uzturÄ“Å”anai

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Tātad, vispirms mēs iegÅ«stam faust lietojumprogrammas objektu - tas ir diezgan vienkārÅ”i. Tālāk mēs skaidri deklarējam tēmu savam aÄ£entam... Å eit ir vērts pieminēt, kas tas ir, kāds ir iekŔējais parametrs un kā to var sakārtot citādi.

  1. Kafka tēmas, ja vēlamies uzzināt precÄ«zu definÄ«ciju, labāk izlasÄ«t izslēgts. dokumentu, vai arÄ« varat lasÄ«t apkopojums uz HabrĆ© krievu valodā, kur arÄ« viss ir diezgan precÄ«zi atspoguļots :)

  2. IekŔējais parametrs, diezgan labi aprakstÄ«ts faust dokumentā, ļauj konfigurēt tēmu tieÅ”i kodā, protams, tas nozÄ«mē faust izstrādātāju sniegtos parametrus, piemēram: saglabāŔana, saglabāŔanas politika (pēc noklusējuma dzēst, bet var iestatÄ«t kompakts), nodalÄ«jumu skaits vienā tēmā (rādÄ«tājidarÄ«t, piemēram, mazāk nekā globāla nozÄ«me pieteikumi faust).

  3. Kopumā aÄ£ents var izveidot pārvaldÄ«tu tēmu ar globālām vērtÄ«bām, tomēr man patÄ«k visu skaidri deklarēt. Turklāt dažus aÄ£enta sludinājuma tēmas parametrus (piemēram, nodalÄ«jumu skaitu vai saglabāŔanas politiku) nevar konfigurēt.

    Lūk, kā tas varētu izskatīties, manuāli nedefinējot tēmu:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nu, tagad aprakstīsim, ko darīs mūsu aģents :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Tātad aģenta darbības sākumā mēs atveram aiohttp sesiju pieprasījumiem, izmantojot mūsu klientu. Tādējādi, startējot strādnieku, kad tiek palaists mūsu aģents, nekavējoties tiks atvērta sesija - viena, uz visu laiku, kad darbinieks darbojas (vai vairākas, ja mainīsit parametru vienlaicība no aģenta ar noklusējuma vienību).

Tālāk mēs sekojam straumei (ievietojam ziņojumu _, jo mums, Å”ajā aÄ£entā, nerÅ«p mÅ«su tēmas ziņojumu saturs, ja tie pastāv paÅ”reizējā nobÄ«dē, pretējā gadÄ«jumā mÅ«su cikls gaidÄ«s to ieraÅ”anos. MÅ«su cilpas ietvaros mēs reÄ£istrējam ziņojuma saņemÅ”anu, iegÅ«stam aktÄ«vo (get_securities pēc noklusējuma atgriež tikai aktÄ«vus, skatiet klienta kodu) vērtspapÄ«ru sarakstu un saglabājam to datu bāzē, pārbaudot, vai ir vērtspapÄ«rs ar tādu paÅ”u atzÄ«mi un apmaiņa datubāzē , ja ir, tad tas (papÄ«rs) vienkārÅ”i tiks atjaunināts.

Sāksim savu radīŔanu!

> docker-compose up -d
... Š—Š°ŠæусŠŗ ŠŗŠ¾Š½Ń‚ŠµŠ¹Š½ŠµŃ€Š¾Š² ...
> faust -A horton.agents worker --without-web -l info

PS funkcijas tÄ«mekļa komponents Rakstos Faustu neuzskatÄ«Å”u, tāpēc uzlikām atbilstoÅ”u karogu.

Savā palaiÅ”anas komandā mēs teicām faustam, kur meklēt lietojumprogrammas objektu un ko ar to darÄ«t (palaist darbinieku) ar informācijas žurnāla izvades lÄ«meni. Mēs iegÅ«stam Ŕādu izvadi:

Spoileris

ā”ŒĘ’aĀµSā€  v1.10.4ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
ā”‚ id          ā”‚ horton                                            ā”‚
ā”‚ transport   ā”‚ [URL('kafka://localhost:9092')]                   ā”‚
ā”‚ store       ā”‚ memory:                                           ā”‚
ā”‚ log         ā”‚ -stderr- (info)                                   ā”‚
ā”‚ pid         ā”‚ 1271262                                           ā”‚
ā”‚ hostname    ā”‚ host-name                                         ā”‚
ā”‚ platform    ā”‚ CPython 3.8.2 (Linux x86_64)                      ā”‚
ā”‚ drivers     ā”‚                                                   ā”‚
ā”‚   transport ā”‚ aiokafka=1.1.6                                    ā”‚
ā”‚   web       ā”‚ aiohttp=3.6.2                                     ā”‚
ā”‚ datadir     ā”‚ /path/to/project/horton-data                      ā”‚
ā”‚ appdir      ā”‚ /path/to/project/horton-data/v1                   ā”‚
ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
... Š»Š¾Š³Šø, Š»Š¾Š³Šø, Š»Š¾Š³Šø ...

ā”ŒTopic Partition Setā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
ā”‚ topic                      ā”‚ partitions ā”‚
ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤
ā”‚ collect_securities         ā”‚ {0-7}      ā”‚
ā”‚ horton-__assignor-__leader ā”‚ {0}        ā”‚
ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜ 

Tas ir dzīvs!!!

ApskatÄ«sim nodalÄ«jumu komplektu. Kā redzam, tika izveidota tēma ar nosaukumu, ko norādÄ«jām kodā, noklusēto nodalÄ«jumu skaitu (8, ņemts no topic_partitions - lietojumprogrammas objekta parametrs), jo mēs nenorādÄ«jām savai tēmai atseviŔķu vērtÄ«bu (izmantojot nodalÄ«jumus). Darbinieka palaistajam aÄ£entam tiek pieŔķirti visi 8 nodalÄ«jumi, jo tas ir vienÄ«gais, taču par to sÄ«kāk tiks runāts sadaļā par klasterizāciju.

Nu, tagad mēs varam pāriet uz citu termināļa logu un nosÅ«tÄ«t tukÅ”u ziņojumu mÅ«su tēmai:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS izmanto @ mēs parādām, ka mēs sÅ«tām ziņojumu uz tēmu ar nosaukumu ā€œcollect_securitiesā€.

Å ajā gadÄ«jumā ziņojums tika nosÅ«tÄ«ts uz 6. nodalÄ«jumu ā€” to varat pārbaudÄ«t, dodoties uz kafdrop on localhost:9000

Dodoties uz termināļa logu ar mūsu darbinieku, mēs redzēsim priecīgu ziņojumu, kas nosūtīts, izmantojot loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Mēs varam arī izpētīt mongo (izmantojot Robo3T vai Studio3T) un redzēt, ka vērtspapīri atrodas datu bāzē:

Es neesmu miljardieris, un tāpēc mēs esam apmierināti ar pirmo skatÄ«Å”anās iespēju.

Fona uzdevumi par Faustu, II daļa: Aģenti un komandasFona uzdevumi par Faustu, II daļa: Aģenti un komandas

Laime un prieks - pirmais aģents gatavs :)

Aģents gatavs, lai dzīvo jaunais aģents!

Jā, kungi, esam veikuÅ”i tikai 1/3 no Ŕī raksta sagatavotā ceļa, bet nekautrējieties, jo tagad bÅ«s vieglāk.

Tātad tagad mums ir nepiecieŔams aģents, kas apkopo meta informāciju un ievieto to savākŔanas dokumentā:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Tā kā Å”is aÄ£ents apstrādās informāciju par konkrētu vērtspapÄ«ru, mums ziņojumā ir jānorāda Ŕī vērtspapÄ«ra svārsts (simbols). Å im nolÅ«kam faustā ir Ieraksti ā€” klases, kas deklarē ziņojuma shēmu aÄ£enta tēmā.

Å ajā gadÄ«jumā pāriesim pie records.py un aprakstiet, kā vajadzētu izskatÄ«ties vēstÄ«jumam par Å”o tēmu:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kā jau varēja nojaust, fausts izmanto python tipa anotāciju, lai aprakstītu ziņojuma shēmu, tāpēc bibliotēkas atbalstītā minimālā versija ir 3.6.

Atgriezīsimies pie aģenta, iestatīsim veidus un pievienosim to:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kā redzat, tēmas inicializācijas metodei mēs nododam jaunu parametru ar shēmu - value_type. Turklāt viss notiek pēc tās paÅ”as shēmas, tāpēc es neredzu jēgu kavēties pie kaut kā cita.

Pēdējais pieskāriens ir pievienot metainformācijas vākÅ”anas aÄ£entam izsaukumu collection_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Ziņojumam izmantojam iepriekÅ” izziņoto shēmu. Å ajā gadÄ«jumā es izmantoju .cast metodi, jo mums nav jāgaida aÄ£enta rezultāts, taču ir vērts pieminēt, ka veidus nosÅ«tÄ«t ziņu tēmai:

  1. cast - nebloķē, jo negaida rezultātu. Jūs nevarat nosūtīt rezultātu uz citu tēmu kā ziņojumu.

  2. sūtīt - nebloķē, jo negaida rezultātu. Tēmā varat norādīt aģentu, uz kuru tiks nosūtīts rezultāts.

  3. jautāt - gaida rezultātu. Tēmā varat norādīt aģentu, uz kuru tiks nosūtīts rezultāts.

Tātad ar aģentiem Ŕodien viss!

Sapņu komanda

Pēdējais, ko solÄ«jos uzrakstÄ«t Å”ajā daļā, ir komandas. Kā minēts iepriekÅ”, komandas programmā Faust ir iesaiņojums ap klikŔķi. Faktiski Faust vienkārÅ”i pievieno mÅ«su pielāgoto komandu savai saskarnei, norādot taustiņu -A

Pēc izsludināto aÄ£entu ienākÅ”anas agents.py pievienot funkciju ar dekoratoru app.commandsaucot metodi mest у vākt_vērtspapÄ«rus:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Tādējādi, ja mēs izsauksim komandu sarakstu, mūsu jaunā komanda būs tajā:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Mēs to varam izmantot tāpat kā jebkurÅ” cits, tāpēc restartēsim faust worker un sāksim pilnvērtÄ«gu vērtspapÄ«ru kolekciju:

> faust -A horton.agents start-collect-securities

Kas notiks tālāk?

Nākamajā daļā, izmantojot atlikuÅ”os aÄ£entus kā piemēru, aplÅ«kosim grimÅ”anas mehānismu galējÄ«bu meklÄ“Å”anai gada tirdzniecÄ«bas slēgÅ”anas cenās un aÄ£entu kronu palaiÅ”anu.

Tas Ŕodienai viss! Paldies, ka izlasīji :)

Šīs daļas kods

Fona uzdevumi par Faustu, II daļa: Aģenti un komandas

PS Zem pēdējās daļas man jautāja par faust and confluent kafka (kādas Ä«paŔības ir konfluentam?). Å Ä·iet, ka konfluents daudzējādā ziņā ir funkcionālāks, taču fakts ir tāds, ka Faustam nav pilnÄ«gs klientu atbalsts konfluentam - tas izriet no klientu ierobežojumu apraksti dok.

Avots: www.habr.com

Pievieno komentāru