Travay background sou Faust, Pati II: Ajan ak Ekip

Travay background sou Faust, Pati II: Ajan ak Ekip

Table of Contents

  1. Pati I: Entwodiksyon

  2. Pati II: Ajan ak Ekip

Kisa nap fè la?

Se konsa, se konsa, dezyèm pati a. Jan sa ekri pi bonè, nan li nou pral fè bagay sa yo:

  1. Ann ekri yon ti kliyan pou alphavantage sou aiohttp ak demann pou pwen final nou bezwen yo.

  2. Ann kreye yon ajan ki pral kolekte done sou sekirite ak meta enfòmasyon sou yo.

Men, sa a se sa nou pral fè pou pwojè a tèt li, ak an tèm de rechèch faust, nou pral aprann ki jan yo ekri ajan ki trete evènman kouran soti nan kafka, osi byen ke ki jan yo ekri kòmandman (klike sou wrapper), nan ka nou an - pou mesaj pouse manyèl nan sijè a ke ajan an ap kontwole.

Fòmasyon

AlphaVantage Kliyan

Premyèman, se pou yo ekri yon ti kliyan aiohttp pou demann alphavantage.

alphaavantage.py

beke

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Aktyèlman, tout bagay klè nan li:

  1. AlphaVantage API a se byen senp epi trè byen fèt, kidonk mwen deside fè tout demann atravè metòd la. construct_query kote nan vire gen yon apèl http.

  2. Mwen mennen tout jaden yo snake_case pou konfò.

  3. Oke, dekorasyon an logger.catch pou pwodiksyon traceback bèl ak enfòmatif.

PS pa bliye ajoute jeton alphavantage lokalman nan config.yml, oswa ekspòte varyab anviwònman an HORTON_SERVICE_APIKEY. Nou resevwa yon siy isit la.

Klas CRUD

Nou pral gen yon koleksyon sekirite nan magazen meta enfòmasyon sou sekirite.

database/security.py

Nan opinyon mwen, pa gen okenn bezwen eksplike anyen isit la, ak klas debaz la li menm se byen senp.

get_app()

Ann ajoute yon fonksyon pou kreye yon objè aplikasyon an app.py

beke

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Pou kounye a nou pral gen kreyasyon aplikasyon ki pi senp, yon ti kras pita nou pral elaji li, sepandan, yo nan lòd yo pa kenbe ou ap tann, isit la referans a App-klas. Mwen konseye w tou pran yon gade nan klas la anviwònman, paske li responsab pou pifò nan anviwònman yo.

Pati prensipal la

Ajan pou kolekte epi kenbe yon lis sekirite

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Se konsa, premye nou jwenn objè aplikasyon faust la - li byen senp. Apre sa, nou klèman deklare yon sijè pou ajan nou an... Isit la li vo mansyone sa li ye, ki sa paramèt entèn la se ak ki jan sa a ka ranje yon fason diferan.

  1. Sijè nan kafka, si nou vle konnen definisyon egzak la, li pi bon pou li koupe. dokiman, oswa ou ka li konpendim sou Habré nan Ris, kote tout bagay tou reflete byen avèk presizyon :)

  2. Paramèt entèn, ki dekri byen nan doc faust a, pèmèt nou konfigirasyon sijè a dirèkteman nan kòd la, nan kou, sa vle di paramèt yo bay devlopè yo faust, pou egzanp: retansyon, retansyon politik (pa default efase, men ou ka mete kontra enfòmèl ant), kantite patisyon pou chak sijè (nòtfè, pou egzanp, mwens pase siyifikasyon mondyal aplikasyon pou faust).

  3. An jeneral, ajan an ka kreye yon sijè jere ak valè mondyal, sepandan, mwen renmen deklare tout bagay klèman. Anplis de sa, kèk paramèt (pa egzanp, kantite patisyon oswa politik retansyon) nan sijè a nan reklam ajan an pa ka configuré.

    Men sa li ta ka sanble san yo pa defini sijè a manyèlman:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Oke, kounye a ann dekri sa ajan nou an pral fè :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Se konsa, nan kòmansman ajan an, nou louvri yon sesyon aiohttp pou demann atravè kliyan nou an. Kidonk, lè yo kòmanse yon travayè, lè ajan nou an lanse, yon sesyon pral imedyatman louvri - youn, pou tout tan travayè a ap kouri (oswa plizyè, si ou chanje paramèt la). konkouran soti nan yon ajan ki gen yon inite default).

Apre sa, nou swiv kouran an (nou mete mesaj la nan _, depi nou, nan ajan sa a, pa pran swen sou kontni an) nan mesaj ki soti nan sijè nou an, si yo egziste nan konpanse aktyèl la, otreman sik nou an ap tann pou rive yo. Oke, andedan bouk nou an, nou konekte resi mesaj la, jwenn yon lis sekirite aktif (get_securities retounen sèlman aktif pa default, gade kòd kliyan) epi sove li nan baz done a, tcheke si gen yon sekirite ki gen menm ticker la ak echanj nan baz done a, si gen, Lè sa a, li (papye a) pral tou senpleman mete ajou.

Ann lanse kreyasyon nou an!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Karakteristik eleman entènèt Mwen pa pral konsidere faust nan atik yo, kidonk nou mete drapo ki apwopriye a.

Nan lòd lansman nou an, nou te di faust ki kote pou chèche objè aplikasyon an ak sa pou w fè ak li (lanse yon travayè) ak nivo pwodiksyon boutèy enfòmasyon an. Nou jwenn pwodiksyon sa a:

beke

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Li vivan!!!

Ann gade nan seri patisyon an. Kòm nou ka wè, yo te kreye yon sijè ak non ke nou deziyen nan kòd la, nimewo a default nan patisyon (8, pran nan topic_partitions - paramèt objè aplikasyon), paske nou pa t presize yon valè endividyèl pou sijè nou an (via partisyon). Ajan an te lanse nan travayè a asiyen tout 8 patisyon, paske li se youn nan sèlman, men sa a pral diskite an plis detay nan pati a sou clustering.

Oke, kounye a nou ka ale nan yon lòt fenèt tèminal epi voye yon mesaj vid nan sijè nou an:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS itilize @ nou montre ke nou ap voye yon mesaj nan yon sijè ki rele "collect_securities".

Nan ka sa a, mesaj la te ale nan patisyon 6 - ou ka tcheke sa a pa ale nan kafdrop sou localhost:9000

Ale nan fennèt tèminal la ak travayè nou an, nou pral wè yon mesaj kontan voye lè l sèvi avèk loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Nou ka gade tou nan mongo (lè l sèvi avèk Robo3T oswa Studio3T) epi wè ke sekirite yo nan baz done a:

Mwen pa yon bilyonè, e se poutèt sa nou kontante nou ak premye opsyon gade.

Travay background sou Faust, Pati II: Ajan ak EkipTravay background sou Faust, Pati II: Ajan ak Ekip

Bonè ak kè kontan - premye ajan an pare :)

Ajan pare, viv nouvo ajan an!

Wi, mesye, nou te sèlman kouvri 1/3 nan chemen an prepare nan atik sa a, men pa dekouraje, paske kounye a li pral pi fasil.

Se konsa, kounye a nou bezwen yon ajan ki kolekte meta enfòmasyon epi mete l nan yon dokiman koleksyon:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Piske ajan sa a pral trete enfòmasyon sou yon sekirite espesifik, nou bezwen endike ticker (senbòl) sekirite sa a nan mesaj la. Pou rezon sa a nan faust genyen Albòm — klas ki deklare konplo mesaj la nan sijè ajan an.

Nan ka sa a, ann ale nan dosye.py epi dekri ki jan mesaj pou sijè sa a ta dwe sanble:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kòm ou ta ka devine, faust sèvi ak anotasyon tip python pou dekri chema mesaj la, ki se poukisa vèsyon minimòm bibliyotèk la sipòte se 3.6.

Ann retounen nan ajan an, mete kalite yo epi ajoute li:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kòm ou ka wè, nou pase yon nouvo paramèt ak yon konplo nan metòd inisyalizasyon sijè a - value_type. Pli lwen, tout bagay swiv menm konplo a, kidonk mwen pa wè okenn pwen nan rete sou nenpòt lòt bagay.

Oke, manyen final la se ajoute yon apèl nan ajan koleksyon meta enfòmasyon pou collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Nou itilize konplo a te anonse deja pou mesaj la. Nan ka sa a, mwen te itilize metòd .cast la paske nou pa bezwen tann rezilta nan ajan an, men li vo mansyone ke fason voye yon mesaj sou sijè a:

  1. jete - pa bloke paske li pa atann yon rezilta. Ou pa ka voye rezilta a nan yon lòt sijè kòm yon mesaj.

  2. voye - pa bloke paske li pa atann yon rezilta. Ou ka presize yon ajan nan sijè ki rezilta a pral ale.

  3. mande - tann yon rezilta. Ou ka presize yon ajan nan sijè ki rezilta a pral ale.

Se konsa, sa a tout ak ajan pou jodi a!

Ekip rèv la

Dènye bagay mwen te pwomèt pou m ekri nan pati sa a se kòmandman. Kòm mansyone pi bonè, kòmandman nan faust se yon wrapper alantou klike sou. An reyalite, faust tou senpleman atache kòmand koutim nou an nan koòdone li yo lè li presize kle a -A

Apre ajan yo te anonse nan agents.py ajoute yon fonksyon ak yon dekoratè app.commandrele metòd la jete у kolekte_sekirite:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Kidonk, si nou rele lis kòmandman an, nouvo kòmandman nou an pral ladan l:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Nou ka sèvi ak li tankou nenpòt lòt moun, kidonk ann rekòmanse travayè faust la epi kòmanse yon koleksyon sekirite konplè:

> faust -A horton.agents start-collect-securities

Kisa ki pral rive apre?

Nan pwochen pati a, lè l sèvi avèk ajan ki rete yo kòm yon egzanp, nou pral konsidere mekanis nan koule pou chèche ekstrèm nan pri yo fèmen nan komès pou ane a ak lansman an cron nan ajan.

Se tout pou jodi a! Mèsi pou lekti :)

Kòd pou pati sa a

Travay background sou Faust, Pati II: Ajan ak Ekip

PS Anba dènye pati a, mwen te mande sou faust ak konfluent kafka (ki karakteristik confluent genyen?). Li sanble ke confluent se pi fonksyonèl nan plizyè fason, men reyalite a se ke faust pa gen sipò konplè kliyan pou konfluent - sa a swiv soti nan. deskripsyon restriksyon kliyan nan doc la.

Sous: www.habr.com

Add nouvo kòmantè