Faust-en aurrekariak, II. zatia: eragileak eta taldeak

Faust-en aurrekariak, II. zatia: eragileak eta taldeak

Edukien taula

  1. I. zatia: Sarrera

  2. II. zatia: Eragileak eta taldeak

Zertan ari gara hemen?

Beraz, beraz, bigarren zatia. Lehen idatzi bezala, bertan honako hau egingo dugu:

  1. Idatzi dezagun bezero txiki bat aiohttp-n alphaavantagerako, behar ditugun amaierako puntuen eskaerarekin.

  2. Sortu dezagun baloreei buruzko datuak eta haiei buruzko meta informazioa bilduko dituen agente bat.

Baina, hau da proiektuarentzat berarentzat egingo duguna, eta faust ikerketari dagokionez, kafkatik korronte-gertaerak prozesatzen dituzten agenteak nola idazten ikasiko dugu, baita komandoak nola idatzi (click wrapper), gure kasuan - agenteak kontrolatzen duen gaiari eskuzko push mezuetarako.

Prestakuntza

AlphaVantage bezeroa

Lehenik eta behin, idatz dezagun aiohttp bezero txiki bat alphaavantage eskaeretarako.

alphaavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Egia esan, dena argi dago hortik:

  1. AlphaVantage APIa nahiko sinplea eta ederki diseinatuta dago, beraz, eskaera guztiak metodoaren bidez egitea erabaki nuen construct_query non aldi berean http dei bat dagoen.

  2. Soro guztiak ekartzen ditut snake_case erosotasunerako.

  3. Beno, logger.catch dekorazioa trazeback irteera eder eta informagarrirako.

P.S. Ez ahaztu alphaavantage tokena lokalean gehitzea config.yml-era, edo ingurune-aldagaia esportatzea HORTON_SERVICE_APIKEY. Token bat jasotzen dugu Hemen.

CRUD klasea

Baloreen bilduma bat izango dugu baloreei buruzko meta informazioa gordetzeko.

datu-basea/segurtasuna.py

Nire ustez, hemen ez dago ezer azaldu beharrik, eta oinarrizko klasea bera nahiko sinplea da.

get_app()

Gehitu dezagun aplikazio-objektu bat sortzeko funtzio bat aplikazioa.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Oraingoz aplikazio sorrera errazena izango dugu, pixka bat geroago zabalduko dugu, hala ere, itxaroten ez jarraitzeko, hemen erreferentziak App-classera. Ezarpenen klaseari begirada bat ematea ere gomendatzen dizut, ezarpen gehienen arduraduna baita.

РћСЃРЅРѕРІРЅР ° С ° С ° Р ° СЃС‚С °

Baloreen zerrenda biltzeko eta mantentzeko agentea

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Beraz, lehenik faust aplikazio objektua lortuko dugu - nahiko erraza da. Jarraian, gure agenteari gai bat esplizituki deklaratzen diogu... Hemen aipatzekoa da zer den, zein den barne-parametroa eta nola antola daitekeen hau desberdin.

  1. Kafkan gaiak, definizio zehatza jakin nahi badugu, hobe irakurtzea itzali. dokumentua, edo irakurri dezakezu konpendioa Habré-n errusieraz, non dena ere nahiko zehatz islatzen den :)

  2. Barneko parametroa, nahiko ondo deskribatuta faust dok, gaia zuzenean konfiguratzeko aukera ematen digu kodean, noski, horrek esan nahi du faust garatzaileek emandako parametroak, adibidez: atxikipena, atxikipen politika (lehenespenez ezabatu, baina ezarri dezakezu. trinkoa), gai bakoitzeko partizio kopurua (puntuazioakegiteko, adibidez, baino gutxiago esangura globala aplikazioak faust).

  3. Oro har, agenteak kudeatutako gai bat sor dezake balio globalekin, hala ere, dena esplizituki deklaratzea gustatzen zait. Gainera, agentearen iragarkiko gaiaren parametro batzuk (adibidez, partizio kopurua edo atxikipen-politika) ezin dira konfiguratu.

    Hona hemen gaia eskuz definitu gabe nolakoa izan daitekeen:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Beno, orain deskriba dezagun zer egingo duen gure agenteak :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Beraz, agentearen hasieran, aiohttp saio bat irekitzen dugu gure bezeroaren bidez eskaerak egiteko. Horrela, langile bat abiaraztean, gure agentea abiarazten denean, saio bat berehala irekiko da - bat, langilea martxan dagoen denbora osoan (edo hainbat, parametroa aldatzen baduzu konkurrentzia unitate lehenetsia duen agente batetik).

Jarraian, korrontea jarraitzen dugu (mezua jartzen dugu _, guri, agente honetan, ez zaigulako axola gure gaiko mezuen edukia) uneko desplazamenduan existitzen badira, bestela gure zikloa noiz iritsiko zain egongo da. Beno, gure begizta barruan, mezuaren jasotzea erregistratzen dugu, aktiboen zerrenda bat lortuko dugu (get_securities bakarrik aktibo itzultzen lehenespenez, ikusi bezeroaren kodea) eta datu-basean gordetzen dugu, ticker berdina duen segurtasunik dagoen egiaztatuz eta trukea datu-basean, baldin badago, orduan (papera) besterik gabe eguneratuko da.

Abiarazi dezagun gure sorkuntza!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

P.S. Aukerak web osagaia Artikuluetan ez dut faust kontuan hartuko, beraz, dagokion bandera ezarri dugu.

Gure abiarazteko komandoan, Faust-i esan genion non bilatu behar den aplikazioaren objektua eta zer egin harekin (langile bat abiarazi) info log irteera mailarekin. Irteera hau lortzen dugu:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Bizirik dago!!!

Ikus dezagun partizio multzoa. Ikus dezakegunez, gai bat sortu zen kodean izendatu genuen izenarekin, partizio-kopuru lehenetsiarekin (8, gaia_partizioak - aplikazioaren objektuaren parametroa), ez baitugu balio indibidual bat zehaztu gure gairako (partizioen bidez). Langilean abiarazitako agenteari 8 partizio guztiak esleitzen zaizkio, bakarra baita, baina hori zehatzago eztabaidatuko da clustering-aren atalean.

Beno, orain beste terminal leiho batera joan gaitezke eta mezu huts bat bidali gure gaiari:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. erabiliz @ mezu bat bidaltzen ari garela erakusten dugu “bildu_segurtasunak” izeneko gai bati.

Kasu honetan, mezua 6. partiziora joan zen; hau egiaztatu dezakezu kafdrop on-era joanez localhost:9000

Gure langilearekin terminaleko leihora joanez, loguru erabiliz bidalitako mezu zoriontsu bat ikusiko dugu:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Mongo-n ere aztertu dezakegu (Robo3T edo Studio3T erabiliz) eta baloreak datu-basean daudela ikus dezakegu:

Ez naiz milioidun bat, eta horregatik konformatzen gara lehen ikusteko aukerarekin.

Faust-en aurrekariak, II. zatia: eragileak eta taldeakFaust-en aurrekariak, II. zatia: eragileak eta taldeak

Zoriona eta poza - lehen agentea prest dago :)

Agente prest, bizi agente berriak!

Bai, jaunak, artikulu honek prestatutako bidearen 1/3 besterik ez dugu egin, baina ez zaitez desanimatu, orain errazagoa izango delako.

Beraz, orain meta informazioa bildu eta bilduma dokumentu batean jartzen duen agente bat behar dugu:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Agente honek segurtasun zehatz bati buruzko informazioa prozesatuko duenez, segurtasun horren ticker (ikurra) adierazi behar dugu mezuan. Horretarako faust-en daude Records — Agentearen gaian mezu-eskema deklaratzen duten klaseak.

Kasu honetan, goazen erregistroak.py eta deskribatu nolakoa izan behar duen gai honen mezuak:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Asmatuko zenuten bezala, faust-ek python motako oharpena erabiltzen du mezu-eskema deskribatzeko, horregatik liburutegiak onartzen duen gutxieneko bertsioa da. 3.6.

Itzul gaitezen agentera, ezarri motak eta gehitu:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Ikus dezakezunez, eskema batekin parametro berri bat pasatzen diogu gaiaren hasierako metodoari - value_type. Gainera, denak eskema bera jarraitzen du, beraz, ez diot ezertarako balio beste ezertan gelditzeari.

Beno, azken ukitua meta informazioa biltzeko agenteari dei bat gehitzea da collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Aurretik iragarritako eskema erabiltzen dugu mezurako. Kasu honetan, .cast metodoa erabili dut, ez baitugu agentearen emaitzaren zain egon behar, baina aipatzekoa da. moduak bidali mezu bat gaiari:

  1. cast - ez du blokeatzen emaitzarik espero ez duelako. Ezin duzu emaitza beste gai batera bidali mezu gisa.

  2. bidali - ez du blokeatzen emaitzarik espero ez duelako. Emaitza nora joango den gaian eragile bat zehaztu dezakezu.

  3. galdetu - emaitza baten zain dago. Emaitza nora joango den gaian eragile bat zehaztu dezakezu.

Beraz, hori dena gaurko eragileekin!

Dream Team

Zati honetan idazteko agindu dudan azken gauza komandoak dira. Lehen aipatu bezala, faust-eko komandoak klikaren inguruko bilgarri bat dira. Izan ere, faust-ek gure komando pertsonalizatua bere interfazeari eransten dio -A gakoa zehaztean

Iragarritako agenteen ostean agenteak.py gehitu funtzio bat dekoratzaile batekin aplikazioa.agindumetodoa deituz bota у bildu_tituluak:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Horrela, komandoen zerrendari deitzen badiogu, gure komando berria bertan egongo da:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Beste edonork bezala erabil dezakegu, beraz, berrabiarazi dezagun faust langilea eta has gaitezen balore bilduma osoa:

> faust -A horton.agents start-collect-securities

Zer gertatuko da gero?

Hurrengo zatian, gainerako agenteak adibide gisa hartuta, urteko merkataritzaren itxiera-prezioetan muturrak bilatzeko hondoratze-mekanismoa eta agenteen kron-abiaraztearen inguruan hartuko dugu kontuan.

Hori da dena gaurko! Eskerrik asko irakurtzeagatik :)

Zati honen kodea

Faust-en aurrekariak, II. zatia: eragileak eta taldeak

P.S. Azken zatian faust eta kafka konfluenteaz galdetu zidaten (zer ezaugarri ditu konfluenteak?). Badirudi confluent funtzionalagoa dela modu askotan, baina kontua da Faustek ez duela bezeroen laguntza osoa confluent-erako - hau da. bezeroen murrizketen deskribapenak dok.

Iturria: www.habr.com

Gehitu iruzkin berria