Asa fototra ao amin'ny Faust, Fizarana II: Agents and Teams

Asa fototra ao amin'ny Faust, Fizarana II: Agents and Teams

Lohahevitra misy

  1. Fizarana I: Fampidirana

  2. Fizarana II: Mpikambana sy Ekipa

Inona no ataontsika eto?

Noho izany, ny ampahany faharoa. Araka ny efa voasoratra teo aloha dia hanao izao manaraka izao izahay:

  1. Andao hanoratra mpanjifa kely ho an'ny alphavantage amin'ny aiohttp miaraka amin'ny fangatahana ireo teboka farany ilaintsika.

  2. Andao hamorona mpandraharaha iray izay hanangona angona momba ny fiarovana sy ny fampahalalana meta momba azy ireo.

Saingy, izao no hataontsika ho an'ny tetikasa mihitsy, ary amin'ny resaka fikarohana haingana, dia hianatra ny fomba hanoratana mpiasa izay mandrindra ny hetsika mivantana avy amin'ny kafka, ary koa ny fomba fanoratana baiko (click wrapper), amin'ity tranga ity - ho an'ny hafatra fanosehana amin'ny tanana amin'ny lohahevitra izay arahin'ny masoivoho.

Fiomanana

AlphaVantage Client

Voalohany, andao hanoratra mpanjifa aiohttp kely ho an'ny fangatahana alphavantage.

alphavantage.py

mpandringana

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Raha ny marina, mazava ny zava-drehetra avy amin'izany:

  1. Ny AlphaVantage API dia natao tsotra sy tsara tarehy, noho izany dia nanapa-kevitra ny hanao ny fangatahana rehetra amin'ny alàlan'ny fomba aho construct_query aiza indray no misy antso http.

  2. Ento ny saha rehetra ho snake_case ho an'ny fanamorana.

  3. Eny, ny haingo logger.catch ho an'ny vokatra traceback tsara tarehy sy ahalalana.

PS Aza adino ny manampy ny marika alphavantage eo an-toerana amin'ny config.yml, na manondrana ny fari-piainan'ny tontolo iainana HORTON_SERVICE_APIKEY. Mahazo marika izahay eto.

kilasy CRUD

Hanana fanangonam-bola hitehirizana fampahalalana meta momba ny antoka isika.

database/security.py

Raha ny hevitro dia tsy ilaina ny manazava na inona na inona eto, ary ny kilasy fototra dia tena tsotra.

get_app()

Andeha isika hanampy asa hamoronana zavatra fampiharana ao app.py

mpandringana

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Amin'izao fotoana izao dia hanana ny famoronana fampiharana tsotra indrindra isika, aoriana kely dia hanitatra izany izahay, na izany aza, mba tsy hiandry anao, eto References mankany amin'ny App-class. Manoro hevitra ihany koa aho hijery ny kilasin'ny fandrindrana, satria izy no tompon'andraikitra amin'ny ankamaroan'ny toe-javatra.

Ny ampahany lehibe indrindra

Agent amin'ny fanangonana sy fikojakojana ny lisitry ny antoka

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Noho izany, mahazo ny faust application object aloha isika - tena tsotra. Manaraka izany, manambara mazava lohahevitra iray ho an'ny masoivoho izahay... Eto dia ilaina ny milaza hoe inona izany, inona ny mari-pamantarana anatiny ary ahoana no ahafahana mandamina izany amin'ny fomba hafa.

  1. Ny lohahevitra ao amin'ny kafka, raha tiantsika ny hahafantatra ny famaritana marina, dia tsara kokoa ny mamaky miala. tahirin-kevitra, na azonao vakiana famintinana ao amin'ny Habré amin'ny teny Rosiana, izay hita taratra tsara ihany koa ny zava-drehetra :)

  2. Parameter anatiny, voafaritra tsara ao amin'ny faust doc, dia mamela antsika hanitsy ny lohahevitra mivantana ao amin'ny code, mazava ho azy, midika izany fa ny mari-pamantarana nomen'ny mpamorona faust, ohatra: fitazonana, politika fitazonana (famafana default, fa azonao atao ny mametraka mitambatra), isan'ny fisarahana isaky ny lohahevitra (maroatao, ohatra, latsaky ny manan-danja maneran-tany applications faust).

  3. Amin'ny ankapobeny, ny mpandraharaha dia afaka mamorona lohahevitra voatanisa miaraka amin'ny soatoavina manerantany, na izany aza, tiako ny manambara mazava ny zava-drehetra. Fanampin'izay, tsy azo amboarina ny masontsivana sasany (ohatra, ny isan'ny fizarazarana na ny politikan'ny fihazonana) amin'ny lohahevitra ao amin'ny dokam-barotra.

    Toy izao ny mety ho endrik'ilay izy raha tsy mamaritra amin'ny tanana ny lohahevitra:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Eny ary, andeha hofaritanay izay hataon'ny agent-nay :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Noho izany, amin'ny fiandohan'ny mpandraharaha, dia manokatra fivoriana aiohttp izahay ho an'ny fangatahana amin'ny alàlan'ny mpanjifanay. Noho izany, rehefa manomboka mpiasa, rehefa atomboka ny mpiasanay, dia hisokatra avy hatrany ny fivoriana iray - iray, mandritra ny fotoana rehetra iasan'ny mpiasa (na maromaro, raha manova ny parameter ianao fifanarahana avy amin'ny masoivoho manana unit default).

Manaraka, manaraka ny stream izahay (apetrakay ny hafatra ao _, satria izahay, ato amin'ity mpandraharaha ity, dia tsy miraharaha ny votoatin'ny) hafatra avy amin'ny lohahevitray, raha misy izy ireo amin'izao fotoana izao, raha tsy izany dia hiandry ny fahatongavany ny tsingerinay. Eny ary, ao anatin'ny tadivavaranay, dia misoratra anarana ny fandraisana ny hafatra, mahazo lisitry ny mavitrika (get_securities dia miverina mavitrika amin'ny alàlan'ny default, jereo ny kaody mpanjifa) ary tehirizo ao amin'ny tahiry, manamarina raha misy fiarovana mitovy amin'ny ticker ary mifanakalo amin'ny angon-drakitra , raha misy, dia havaozina fotsiny ilay izy (ny taratasy).

Andao hanomboka ny famoronana!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Features singa web Tsy hodinihintsika ao amin'ny lahatsoratra ny faus, ka nametraka ny saina mety izahay.

Tao amin'ny baikon'ny fandefasanay dia nilaza tamin'i faust izahay hoe aiza no hitadiavana ilay zavatra fampiharana sy ny tokony hatao amin'izany (manomboha mpiasa) miaraka amin'ny haavon'ny famoahana info log. Mahazo ity vokatra manaraka ity izahay:

mpandringana

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Velona io!!!

Andeha hojerentsika ny set partition. Araka ny hitantsika, dia nisy lohahevitra noforonina miaraka amin'ny anarana izay notendrenay ao amin'ny kaody, ny isan'ny partitions default (8, nalaina tamin'ny lohahevitra_fizarana - Parameter object application), satria tsy nanondro sanda tsirairay ho an'ny lohahevitray izahay (amin'ny alàlan'ny fizarazarana). Ny solontena navoaka ao amin'ny mpiasa dia nomena ny fizarana 8 rehetra, satria izy irery ihany, fa horesahina amin'ny antsipiriany bebe kokoa amin'ny ampahany momba ny clustering.

Eny ary, afaka mandeha any amin'ny varavarankely terminal hafa isika ary mandefa hafatra tsy misy dikany amin'ny lohahevitray:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS mampiasa @ asehontsika fa mandefa hafatra amin'ny lohahevitra iray antsoina hoe "collect_securities".

Amin'ity tranga ity, ny hafatra dia nandeha tamin'ny fisarahana 6 - azonao atao ny manamarina izany amin'ny alàlan'ny kafdrop on localhost:9000

Mandeha any amin'ny varavarankelin'ny terminal miaraka amin'ny mpiasantsika isika dia hahita hafatra mahafaly alefa amin'ny fampiasana loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Afaka mijery ny mongo ihany koa isika (mampiasa Robo3T na Studio3T) ary mahita fa ao anaty tahiry ny securities:

Tsy miliaridera aho, ary noho izany dia afa-po amin'ny safidy fijerena voalohany izahay.

Asa fototra ao amin'ny Faust, Fizarana II: Agents and TeamsAsa fototra ao amin'ny Faust, Fizarana II: Agents and Teams

Fahasambarana sy fifaliana - vonona ny mpandraharaha voalohany :)

Agent vonona, ho ela velona ny agent vaovao!

Eny tompoko, ny 1/3 amin’ny lalana nomanina ato amin’ity lahatsoratra ity ihany no vitantsika, fa aza kivy fa izao dia ho mora kokoa.

Koa ankehitriny dia mila mpandraharaha iray manangona fampahalalana meta isika ary mametraka izany ao anaty antontan-taratasy fanangonana:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Koa satria ity mpandraharaha ity dia hikarakara fampahalalana momba ny fiarovana manokana, dia mila manondro ny mari-pamantarana (marika) amin'ity fiarovana ity amin'ny hafatra. Ho an'ity tanjona ity ao faus dia misy Records — kilasy izay manambara ny rafitra hafatra amin'ny lohahevitra mpandraharaha.

Amin'ity tranga ity, andao ho any records.py ary lazao hoe inona no tokony ho endriky ny hafatra ho an'ity lohahevitra ity:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Araka ny efa noeritreretinao, faust dia mampiasa ny karazana python fanoritsoritana ny schema hafatra, ka izany no mahatonga ny kinova farany ambany tohanan'ny tranomboky. 3.6.

Andao hiverina any amin'ny mpandraharaha, apetraho ireo karazana ary ampio:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Araka ny hitanao dia mandefa paramètre vaovao misy lamina mankany amin'ny fomba fanombohana lohahevitra - value_type. Fanampin'izany, ny zava-drehetra dia manaraka ny tetika mitovy, ka tsy hitako izay tokony hieritreretana zavatra hafa.

Eny, ny fikitihana farany dia ny manampy antso amin'ny mpanangom-baovao meta mba collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Mampiasa ny tetika nambara teo aloha izahay ho an'ny hafatra. Amin'ity tranga ity dia nampiasa ny fomba .cast aho satria tsy mila miandry ny vokatra avy amin'ny mpandraharaha isika, saingy mendrika ny manamarika fa fomba mandefasa hafatra amin'ny lohahevitra:

  1. cast - tsy manakana satria tsy manantena vokatra. Tsy afaka mandefa ny valiny amin'ny lohahevitra hafa ho hafatra ianao.

  2. mandefa - tsy manakana satria tsy manantena valiny. Azonao atao ny mamaritra mpandraharaha iray amin'ny lohahevitra izay halehan'ny valiny.

  3. manontany - miandry valiny. Azonao atao ny mamaritra mpandraharaha iray amin'ny lohahevitra izay halehan'ny valiny.

Noho izany, izany rehetra izany miaraka amin'ny masoivoho anio!

Ny ekipa nofy

Ny zavatra farany nampanantenaiko hosoratako amin'ity ampahany ity dia baiko. Araka ny voalaza teo aloha, ny baiko ao amin'ny fau dia fonon-tsindry. Raha ny marina, i faust dia ampifandraisina fotsiny amin'ny baiko mahazatra antsika amin'ny seha-pifandraisana rehefa mamaritra ny lakile -A

Taorian'ny nanambaran'ireo masoivoho tao agents.py ampio asa miaraka amin'ny haingo app.commandmiantso ny fomba nandatsaka у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Noho izany, raha miantso ny lisitry ny baiko isika dia ho ao anatiny ny baiko vaovao:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Azontsika ampiasaina toy ny olon-drehetra izany, koa andao atomboka indray ny mpiasa faust ary hanomboka fanangonana tahiry feno:

> faust -A horton.agents start-collect-securities

Inona no hitranga manaraka?

Amin'ny ampahany manaraka, amin'ny fampiasana ireo mpiasa sisa tavela ho ohatra, dia hodinihintsika ny mekanika milentika amin'ny fikatsahana tafahoatra amin'ny vidin'ny fanakatonana ny varotra ho an'ny taona sy ny fandefasana cron ny mpiasa.

Izay ihany ny anio! Misaotra namaky :)

Code ho an'ity ampahany ity

Asa fototra ao amin'ny Faust, Fizarana II: Agents and Teams

PS Eo ambanin'ny tapany farany dia nanontaniana momba ny faust sy confluent kafka aho (inona no mampiavaka ny confluent?). Toa ny confluent dia miasa kokoa amin'ny lafiny maro, fa ny zava-misy dia ny faust dia tsy manana fanohanan'ny mpanjifa feno amin'ny confluent - izany dia avy amin'ny famaritana ny famerana ny mpanjifa ao amin'ny doc.

Source: www.habr.com

Add a comment