Fonaj taskoj pri Faust, Part II: Agentoj kaj Teamoj

Fonaj taskoj pri Faust, Part II: Agentoj kaj Teamoj

Enhavtabelo

  1. Parto I: Enkonduko

  2. Parto II: Agentoj kaj Teamoj

Kion ni faras ĉi tie?

Do, do, la dua parto. Kiel skribite antaŭe, en ĝi ni faros la jenon:

  1. Ni skribu malgrandan klienton por alphaavantage sur aiohttp kun petoj por la finpunktoj, kiujn ni bezonas.

  2. Ni kreu agenton, kiu kolektos datumojn pri valorpaperoj kaj metainformojn pri ili.

Sed, ĉi tion ni faros por la projekto mem, kaj laŭ faust-esplorado, ni lernos kiel skribi agentojn, kiuj prilaboras fluajn eventojn de kafka, kaj ankaŭ kiel skribi komandojn (klaku envolvaĵon), en nia kazo - por manaj puŝomesaĝoj al la temo kiun la agento kontrolas.

Trejnado

AlphaVantage Kliento

Unue, ni skribu malgrandan aiohttp klienton por petoj al alphaavantage.

alphaavantage.py

spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Fakte, ĉio estas klara el ĝi:

  1. La AlphaVantage API estas sufiĉe simple kaj bele desegnita, do mi decidis fari ĉiujn petojn per la metodo construct_query kie siavice estas http-voko.

  2. Mi alportas ĉiujn kampojn al snake_case por komforto.

  3. Nu, la logger.catch dekoracio por bela kaj informa spuro eligo.

PS Ne forgesu aldoni la alphaavantage-ĵetonon loke al config.yml, aŭ eksporti la mediovariablon HORTON_SERVICE_APIKEY. Ni ricevas ĵetonon tie.

KRUDA klaso

Ni havos valorpaperojn por stoki metainformojn pri valorpaperoj.

database/security.py

Miaopinie, ĉi tie ne necesas klarigi ion, kaj la baza klaso mem estas sufiĉe simpla.

get_app()

Ni aldonu funkcion por krei aplikaĵobjekton en app.py

spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Nuntempe ni havos la plej simplan kreadon de aplikaĵo, iom poste ni vastigos ĝin, tamen, por ne atendigi vin, ĉi tie referencoj al App-klaso. Mi ankaŭ konsilas al vi rigardi la klason de agordoj, ĉar ĝi respondecas pri la plej multaj el la agordoj.

Ĉefa parto

Agento por kolekti kaj konservi liston de valorpaperoj

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Do, unue ni ricevas la faust-aplikaĵon - ĝi estas sufiĉe simpla. Poste, ni eksplicite deklaras temon por nia agento... Ĉi tie indas mencii kio ĝi estas, kio estas la interna parametro kaj kiel ĉi tio povas esti aranĝita alimaniere.

  1. Temoj en kafka, se ni volas scii la ĝustan difinon, estas pli bone legi for. dokumento, aŭ vi povas legi kompendio sur Habré en la rusa, kie ĉio estas ankaŭ sufiĉe precize reflektita :)

  2. Interna parametro, sufiĉe bone priskribita en la faust-dokumento, permesas al ni agordi la temon rekte en la kodo, kompreneble, tio signifas la parametrojn provizitajn de la faust-programistoj, ekzemple: reteno, retenpolitiko (defaŭlte forigi, sed vi povas agordi kompakta), nombro da sekcioj per temo (disdonojfari, ekzemple, malpli ol tutmonda signifo aplikoj faust).

  3. Ĝenerale, la agento povas krei administritan temon kun tutmondaj valoroj, tamen mi ŝatas deklari ĉion eksplicite. Krome, kelkaj parametroj (ekzemple, la nombro da sekcioj aŭ retenpolitiko) de la temo en la agentanonco ne povas esti agordita.

    Jen kiel ĝi povus aspekti sen permane difini la temon:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nu, nun ni priskribu, kion faros nia agento :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Do, komence de la agento, ni malfermas aiohttp-sesion por petoj per nia kliento. Tiel, kiam oni komencas laboriston, kiam nia agento estas lanĉita, sesio tuj estos malfermita - unu, dum la tuta tempo, kiam la laboristo funkcias (aŭ pluraj, se vi ŝanĝas la parametron). samtempeco de agento kun defaŭlta unuo).

Poste, ni sekvas la fluon (ni metas la mesaĝon enen _, ĉar ni, en ĉi tiu agento, ne zorgas pri la enhavo) de mesaĝoj de nia temo, se ili ekzistas ĉe la nuna ofseto, alie nia ciklo atendos ilian alvenon. Nu, ene de nia buklo, ni registras la ricevon de la mesaĝo, ricevas liston de aktivaj (get_securities revenas nur aktivaj defaŭlte, vidu klientkodon) kaj konservas ĝin en la datumbazo, kontrolante ĉu ekzistas sekureco kun la sama ticker kaj interŝanĝo en la datumbazo , se ekzistas, tiam ĝi (la papero) simple estos ĝisdatigita.

Ni lanĉu nian kreaĵon!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Trajtoj TTT-komponento Mi ne konsideros faust en la artikoloj, do ni starigas la taŭgan flagon.

En nia lanĉa komando, ni diris al faust, kie serĉi la aplikaĵobjekton kaj kion fari kun ĝi (komencu laboriston) kun la nivelo de eligo de informoj. Ni ricevas la sekvan eligon:

spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ĝi estas viva!!!

Ni rigardu la dispartigon. Kiel ni povas vidi, temo estis kreita kun la nomo, kiun ni indikis en la kodo, la defaŭlta nombro da sekcioj (8, prenita de temo_dispartigoj - aplika objekta parametro), ĉar ni ne specifis individuan valoron por nia temo (per sekcioj). La lanĉita agento en la laboristo ricevas ĉiujn 8 sekciojn, ĉar ĝi estas la sola, sed ĉi tio estos diskutita pli detale en la parto pri clustering.

Nu, nun ni povas iri al alia fina fenestro kaj sendi malplenan mesaĝon al nia temo:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS uzante @ ni montras, ke ni sendas mesaĝon al temo nomita "kolekto_sekuraĵoj".

En ĉi tiu kazo, la mesaĝo iris al sekcio 6 - vi povas kontroli tion irante al kafdrop on localhost:9000

Irante al la fina fenestro kun nia laboristo, ni vidos feliĉan mesaĝon senditan uzante loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Ni ankaŭ povas rigardi en mongo (uzante Robo3T aŭ Studio3T) kaj vidi, ke la valorpaperoj estas en la datumbazo:

Mi ne estas miliardulo, kaj tial ni kontentiĝas pri la unua spektada opcio.

Fonaj taskoj pri Faust, Part II: Agentoj kaj TeamojFonaj taskoj pri Faust, Part II: Agentoj kaj Teamoj

Feliĉo kaj ĝojo - la unua agento estas preta :)

Agento preta, vivu la nova agento!

Jes, sinjoroj, ni kovris nur 1/3 de la vojo preparita de ĉi tiu artikolo, sed ne malkuraĝiĝu, ĉar nun estos pli facile.

Do nun ni bezonas agenton, kiu kolektas metainformojn kaj metas ĝin en kolektodokumenton:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Ĉar ĉi tiu agento prilaboros informojn pri specifa sekureco, ni devas indiki la ticker (simbolo) de ĉi tiu sekureco en la mesaĝo. Tiucele en faust ekzistas rekordoj — klasoj kiuj deklaras la mesaĝskemon en la agenta temo.

En ĉi tiu kazo, ni iru al records.py kaj priskribu kiel devus aspekti la mesaĝo por ĉi tiu temo:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kiel vi eble divenis, faust uzas la python-tipan komentadon por priskribi la mesaĝskemon, tial la minimuma versio subtenata de la biblioteko estas 3.6.

Ni revenu al la agento, agordu la tipojn kaj aldonu ĝin:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kiel vi povas vidi, ni transdonas novan parametron kun skemo al la temo-komenciga metodo - value_type. Plue, ĉio sekvas la saman skemon, do mi vidas nenian signifon enloĝi pri io alia.

Nu, la fina tuŝo estas aldoni alvokon al la metainformkolekta agento por collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Ni uzas la antaŭe anoncitan skemon por la mesaĝo. En ĉi tiu kazo, mi uzis la metodon .cast ĉar ni ne bezonas atendi la rezulton de la agento, sed menciindas tion. manieroj sendu mesaĝon al la temo:

  1. cast - ne blokas ĉar ĝi ne atendas rezulton. Vi ne povas sendi la rezulton al alia temo kiel mesaĝo.

  2. sendi - ne blokas ĉar ĝi ne atendas rezulton. Vi povas specifi agenton en la temo al kiu la rezulto iros.

  3. demandi — atendas rezulton. Vi povas specifi agenton en la temo al kiu la rezulto iros.

Do, tio estas ĉio kun agentoj por hodiaŭ!

La sonĝteamo

La lasta afero, kiun mi promesis skribi en ĉi tiu parto, estas komandoj. Kiel menciite pli frue, komandoj en faust estas envolvaĵo ĉirkaŭ klako. Fakte, faust simple ligas nian kutiman komandon al sia interfaco kiam oni specifas la -A-klavon

Post la anoncitaj agentoj en agentoj.py aldonu funkcion kun dekoraciisto app.komandonomante la metodon ero у kolekti_paperojn:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Tiel, se ni vokas la liston de komandoj, nia nova komando estos en ĝi:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Ni povas uzi ĝin kiel iu ajn alia, do ni rekomencu la faust-laboriston kaj komencu plenan kolekton de valorpaperoj:

> faust -A horton.agents start-collect-securities

Kio okazos poste?

En la sekva parto, uzante la ceterajn agentojn kiel ekzemplon, ni konsideros la sinkan mekanismon por serĉi ekstremojn en la fermaj prezoj de komerco por la jaro kaj la cron-lanĉo de agentoj.

Tio estas ĉio por hodiaŭ! Dankon pro legado :)

Kodo por ĉi tiu parto

Fonaj taskoj pri Faust, Part II: Agentoj kaj Teamoj

PS Sub la lasta parto oni demandis min pri faust kaj kunflua kafka (kiajn trajtojn havas kunfluanto?). Ŝajnas, ke confluent estas pli funkcia en multaj manieroj, sed la fakto estas, ke faust ne havas plenan klientsubtenon por confluent - tio sekvas el priskriboj de klientlimigoj en la dokumento.

fonto: www.habr.com

Aldoni komenton