Detyrat e sfondit mbi Faustin, Pjesa II: Agjentët dhe ekipet

Detyrat e sfondit mbi Faustin, Pjesa II: Agjentët dhe ekipet

PĂ«rmbajtje

  1. Pjesa I: Hyrje

  2. Pjesa II: Agjentët dhe Ekipet

ÇfarĂ« po bĂ«jmĂ« kĂ«tu?

Pra, pra, pjesa e dytë. Siç është shkruar më herët, në të do të bëjmë sa vijon:

  1. Le të shkruajmë një klient të vogël për alphavantage në aiohttp me kërkesa për pikat fundore që na duhen.

  2. Le të krijojmë një agjent që do të mbledhë të dhëna për letrat me vlerë dhe meta informacion mbi to.

Por, kjo është ajo që ne do të bëjmë për vetë projektin, dhe për sa i përket kërkimit të faustit, do të mësojmë se si të shkruajmë agjentë që përpunojnë ngjarjet e transmetimit nga kafka, si dhe si të shkruajmë komanda (klikoni mbështjellës), në rastin tonë - për mesazhe shtytëse manuale për temën që po monitoron agjenti.

Trajnimi

Klienti AlphaVantage

Së pari, le të shkruajmë një klient të vogël aiohttp për kërkesat për alphavantage.

alfavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Në fakt, gjithçka është e qartë prej saj:

  1. API AlphaVantage është projektuar mjaft thjesht dhe bukur, kështu që vendosa t'i bëj të gjitha kërkesat përmes metodës construct_query ku nga ana tjetër ka një thirrje http.

  2. I sjell të gjitha fushat në snake_case për lehtësi.

  3. Epo, dekorimi i logger.catch për rezultate të bukura dhe informative të gjurmës.

PS Mos harroni të shtoni token alphavantage në nivel lokal në config.yml, ose të eksportoni variablin e mjedisit HORTON_SERVICE_APIKEY. Ne marrim një shenjë këtu.

Klasa CRUD

Ne do të kemi një koleksion letrash me vlerë për të ruajtur meta informacionet rreth letrave me vlerë.

database/security.py

Sipas mendimit tim, nuk ka nevojë të shpjegohet asgjë këtu, dhe vetë klasa bazë është mjaft e thjeshtë.

get_app ()

Le të shtojmë një funksion për krijimin e një objekti aplikacioni në app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Tani për tani do të kemi krijimin më të thjeshtë aplikacioni, pak më vonë do ta zgjerojmë, megjithatë, për të mos ju mbajtur në pritje, këtu referencat në klasën e aplikacionit. Unë gjithashtu ju këshilloj të hidhni një sy në klasën e cilësimeve, pasi ajo është përgjegjëse për shumicën e cilësimeve.

Pjesa kryesore

Agjent për mbledhjen dhe mbajtjen e listës së letrave me vlerë

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Pra, së pari marrim objektin e aplikimit faust - është mjaft e thjeshtë. Më pas, ne deklarojmë në mënyrë eksplicite një temë për agjentin tonë... Këtu vlen të përmendet se çfarë është, cili është parametri i brendshëm dhe si mund të rregullohet ndryshe.

  1. Temat në kafka, nëse duam të dimë përkufizimin e saktë, është më mirë të lexojmë fikur. dokument, ose mund të lexoni përmbledhje në Habré në Rusisht, ku gjithçka pasqyrohet gjithashtu mjaft saktë :)

  2. Parametri i brendshëm, i përshkruar mjaft mirë në dokumentin faust, na lejon të konfigurojmë temën drejtpërdrejt në kod, natyrisht, kjo do të thotë parametrat e ofruar nga zhvilluesit e faustit, për shembull: mbajtje, politika e mbajtjes (si parazgjedhje fshini, por mund të vendosni kompakt), numri i ndarjeve për temë (ndarësepër të bërë, për shembull, më pak se rëndësi globale aplikacionet faust).

  3. Në përgjithësi, agjenti mund të krijojë një temë të menaxhuar me vlera globale, megjithatë, më pëlqen të deklaroj gjithçka në mënyrë eksplicite. Përveç kësaj, disa parametra (për shembull, numri i ndarjeve ose politika e ruajtjes) të temës në reklamën e agjentit nuk mund të konfigurohen.

    Ja se si mund të duket pa e përcaktuar manualisht temën:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Epo, tani le të përshkruajmë se çfarë do të bëjë agjenti ynë :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Pra, në fillim të agjentit, ne hapim një sesion aiohttp për kërkesat përmes klientit tonë. Kështu, kur filloni një punëtor, kur agjenti ynë hapet, menjëherë do të hapet një seancë - një, për të gjithë kohën që punëtori po funksionon (ose disa, nëse ndryshoni parametrin konkurencë nga një agjent me një njësi të paracaktuar).

Më pas, ndjekim rrjedhën (e vendosim mesazhin në _, meqenëse ne, në këtë agjent, nuk na intereson përmbajtja) e mesazheve nga tema jonë, nëse ato ekzistojnë në kompensimin aktual, përndryshe cikli ynë do të presë mbërritjen e tyre. Epo, brenda qarkut tonë, ne regjistrojmë marrjen e mesazhit, marrim një listë të letrave me vlerë aktive (get_securities kthehen vetëm aktive si parazgjedhje, shiko kodin e klientit) dhe e ruajmë atë në bazën e të dhënave, duke kontrolluar nëse ka një vlerë me të njëjtin tregues dhe shkëmbim në bazën e të dhënave, nëse ka, atëherë ai (letra) thjesht do të përditësohet.

Le të nisim krijimin tonë!

> docker-compose up -d
... ЗапусĐș ĐșĐŸĐœŃ‚Đ”ĐčĐœĐ”Ń€ĐŸĐČ ...
> faust -A horton.agents worker --without-web -l info

Karakteristikat e PS komponent web Nuk do ta konsideroj Faustin në artikuj, ndaj vendosëm flamurin e duhur.

Në komandën tonë të nisjes, ne i thamë Faustit se ku të kërkonte objektin e aplikacionit dhe çfarë të bënte me të (nisni një punëtor) me nivelin e daljes së regjistrit të informacionit. Ne marrim daljen e mëposhtme:

Spoiler

ڟa”S† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┮───────────────────────────────────────────────────┘
... Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┌─────────────
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┮────────────┘ 

Eshte gjalle!!!

Le të shohim grupin e ndarjes. Siç mund ta shohim, u krijua një temë me emrin që caktuam në kod, numrin e paracaktuar të ndarjeve (8, marrë nga tema_ndarjet - parametri i objektit të aplikacionit), pasi nuk specifikuam një vlerë individuale për temën tonë (nëpërmjet ndarjeve). Agjentit të lëshuar në punëtor i caktohen të 8 ndarjet, pasi është e vetmja, por kjo do të diskutohet më në detaje në pjesën rreth grupimit.

Epo, tani mund të shkojmë në një dritare tjetër terminali dhe të dërgojmë një mesazh bosh në temën tonë:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS duke pĂ«rdorur @ ne tregojmĂ« se po i dĂ«rgojmĂ« njĂ« mesazh njĂ« teme tĂ« quajtur “mbledh_vlerat”.

Në këtë rast, mesazhi shkoi në ndarjen 6 - mund ta kontrolloni këtë duke shkuar te kafdrop on localhost:9000

Duke shkuar në dritaren e terminalit me punonjësin tonë, do të shohim një mesazh të lumtur të dërguar duke përdorur loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Ne gjithashtu mund të shikojmë në mongo (duke përdorur Robo3T ose Studio3T) dhe të shohim që letrat me vlerë janë në bazën e të dhënave:

Unë nuk jam miliarder dhe për këtë arsye jemi të kënaqur me opsionin e parë të shikimit.

Detyrat e sfondit mbi Faustin, Pjesa II: Agjentët dhe ekipetDetyrat e sfondit mbi Faustin, Pjesa II: Agjentët dhe ekipet

Lumturia dhe gëzimi - agjenti i parë është gati :)

Agjenti gati, rroftë agjenti i ri!

Po, zotërinj, ne kemi kaluar vetëm 1/3 e rrugës së përgatitur nga ky artikull, por mos u dekurajoni, sepse tani do të jetë më e lehtë.

Pra, tani ne kemi nevojë për një agjent që mbledh meta informacion dhe e vendos atë në një dokument grumbullimi:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

MeqenĂ«se ky agjent do tĂ« pĂ«rpunojĂ« informacione pĂ«r njĂ« siguri specifike, ne duhet tĂ« tregojmĂ« shenjĂ«n (simbolin) e kĂ«saj sigurie nĂ« mesazh. PĂ«r kĂ«tĂ« qĂ«llim nĂ« Faust ekzistojnĂ« tĂ« dhĂ«na — klasa qĂ« deklarojnĂ« skemĂ«n e mesazheve nĂ« temĂ«n e agjentit.

Në këtë rast, le të shkojmë në regjistron.py dhe përshkruani se si duhet të duket mesazhi për këtë temë:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Siç mund ta keni marrë me mend, Faust përdor shënimin e tipit python për të përshkruar skemën e mesazhit, prandaj versioni minimal i mbështetur nga biblioteka është 3.6.

Le të kthehemi te agjenti, të vendosim llojet dhe ta shtojmë atë:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Siç mund ta shihni, ne kalojmë një parametër të ri me një skemë në metodën e inicializimit të temës - value_type. Më tej, gjithçka ndjek të njëjtën skemë, kështu që nuk shoh asnjë kuptim të ndalem në ndonjë gjë tjetër.

Epo, prekja e fundit është të shtoni një telefonatë në agjentin e mbledhjes së informacionit meta për collect_securites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Ne përdorim skemën e shpallur më parë për mesazhin. Në këtë rast kam përdorur metodën .cast pasi nuk kemi nevojë të presim rezultatin nga agjenti, por vlen të theksohet se mënyra dërgoni një mesazh në temë:

  1. cast - nuk bllokon sepse nuk pret rezultat. Ju nuk mund ta dërgoni rezultatin në një temë tjetër si mesazh.

  2. dërgo - nuk bllokon sepse nuk pret rezultat. Ju mund të specifikoni një agjent në temën në të cilën do të shkojë rezultati.

  3. pyet - pret për një rezultat. Ju mund të specifikoni një agjent në temën në të cilën do të shkojë rezultati.

Pra, kjo është e gjitha me agjentët për sot!

Ekipi i Ă«ndrrave

Gjëja e fundit që premtova të shkruaj në këtë pjesë janë komandat. Siç u përmend më herët, komandat në faust janë një mbështjellës rreth klikimit. Në fakt, Faust thjesht bashkon komandën tonë të personalizuar në ndërfaqen e tij kur specifikon tastin -A

Pasi agjentët e shpallur në agjentët.py shtoni një funksion me një dekorues app.komandëduke thirrur metodën hedh у mbledh_titujt:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Kështu, nëse thërrasim listën e komandave, komanda jonë e re do të jetë në të:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Ne mund ta përdorim atë si kushdo tjetër, kështu që le të rifillojmë faust worker dhe të fillojmë një koleksion të plotë të letrave me vlerë:

> faust -A horton.agents start-collect-securities

ÇfarĂ« do tĂ« ndodhĂ« mĂ« pas?

Në pjesën tjetër, duke përdorur agjentët e mbetur si shembull, ne do të shqyrtojmë mekanizmin e lavamanit për kërkimin e ekstremeve në çmimet e mbylljes së tregtimit për vitin dhe nisjen e agjentëve.

Kaq për sot! Faleminderit per leximin :)

Kodi për këtë pjesë

Detyrat e sfondit mbi Faustin, Pjesa II: Agjentët dhe ekipet

PS Në pjesën e fundit më pyetën për kafkën faust dhe konfluent (çfarë veçorish ka konfluenti?). Duket se konfluenti është më funksional në shumë mënyra, por fakti është se Faust nuk ka mbështetje të plotë të klientit për konfluent - kjo rrjedh nga përshkrimet e kufizimeve të klientit në doc.

Burimi: www.habr.com

Shto një koment