KarĂȘn paƟün ĂȘn li ser Faust, Beß II: Ajan Ă» TĂźm

KarĂȘn paƟün ĂȘn li ser Faust, Beß II: Ajan Ă» TĂźm

Table of Contents

  1. Beß I: DestpĂȘk

  2. Beß II: Ajan Ă» TĂźm

Em li vir çi dikin?

Ji ber vĂȘ yekĂȘ, beßa duyemĂźn. WekĂź ku berĂȘ hatĂź nivĂźsandin, di wĂȘ de em ĂȘ jĂȘrĂźn bikin:

  1. Ka em ji bo alphavantage xerĂźdarek piçûk li aiohttp bi daxwazĂȘn xalĂȘn dawĂź yĂȘn ku em hewce ne binivĂźsin.

  2. Ka em ajanek biafirĂźnin ku dĂȘ daneyĂȘn li ser ewlehiyĂȘ Ă» agahdariya meta li ser wan berhev bike.

LĂȘ, ya ku em ĂȘ ji bo projeyĂȘ bixwe bikin ev e, Ă» di warĂȘ lĂȘkolĂźna faustĂȘ de, em ĂȘ fĂȘr bibin ka meriv çawa ajanĂȘn ku bĂ»yeran ji kafka diherikĂźnin binivĂźsin, Ă» her weha meriv çawa fermanan binivĂźsĂźne (wrapper bikirtĂźne), di doza me de - ji bo peyamĂȘn push manual ji bo mijara ku agent çavdĂȘriya.

Amadekirin

AlphaVantage Client

PĂȘƟün, bila ji bo daxwazĂȘn alphavantage xerĂźdarek aiohttp piçûk binivĂźsin.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Bi rastĂź, her tißt ji wĂȘ zelal e:

  1. AlphaVantage API pir sade Ă» xweßik hatĂź sĂȘwirandin, ji ber vĂȘ yekĂȘ min biryar da ku hemĂź daxwazan bi rĂȘbazĂȘ bikim construct_query li ku derĂȘ bangek http heye.

  2. Ez hemĂ» zeviyan bĂźnim ber snake_case ji bo rihetiyĂȘ.

  3. WelĂȘ, xemla logger.catch ji bo derketina ßopandina xweßik Ă» agahdar.

PS Ji bĂźr nekin ku nüƟana alphavantage-ya herĂȘmĂź li config.yml zĂȘde bikin, an guhĂȘrbara jĂźngehĂȘ derxĂźnin HORTON_SERVICE_APIKEY. Em nüƟanek distĂźnin vir.

çßna CRUD

Em ĂȘ berhevokek ewlehiyĂȘ hebe ku agahdariya meta di derheqĂȘ ewlehiyĂȘ de hilĂźne.

database/security.py

Bi dĂźtina min, ne hewce ye ku li vir tißtek were ravekirin, Ă» çßna bingehĂźn bixwe pir hĂȘsan e.

get_app ()

Werin em fonksiyonek ji bo afirandina objeyek serĂźlĂȘdanĂȘ lĂȘ zĂȘde bikin app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Heya nuha em ĂȘ xwedan çĂȘkirina serĂźlĂȘdanĂȘ ya herĂź hĂȘsan bin, piçek paßĂȘ em ĂȘ wĂȘ berfireh bikin, lĂȘbelĂȘ, ji bo ku hĂ»n li bendĂȘ nemĂźnin, li vir referansĂȘn ji bo App-class. Di heman demĂȘ de ez ji we re Ɵüret dikim ku hĂ»n li çßna mĂźhengan binihĂȘrin, ji ber ku ew ji piraniya mĂźhengan berpirsiyar e.

Beßek sereke

Agent ji bo berhevkirin Ă» domandina navnüƟek ewlehiyĂȘ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ji ber vĂȘ yekĂȘ, pĂȘƟü em armanca serĂźlĂȘdana faust digirin - ew pir hĂȘsan e. DĂ»v re, em bi eßkere mijarek ji bo nĂ»nerĂȘ xwe radigihĂźnin... Li vir hĂȘjayĂź gotinĂȘ ye ku ew çi ye, pĂźvana hundurĂźn çi ye Ă» çawa ev dikare bi rengek cĂ»da were saz kirin.

  1. MijarĂȘn di kafkayĂȘ de, ger em bixwazin pĂȘnaseya rast bizanibin, çĂȘtir e ku em bixwĂźnin ji. belge, an hĂ»n dikarin bixwĂźnin berhevok li ser HabrĂ© bi rĂ»sĂź, ku her tißt jĂź bi rengek rast xuya dike :)

  2. Parametreya navxweyĂź, ku di belgeya faustĂȘ de pir xweß tĂȘ diyar kirin, dihĂȘle ku em mijarĂȘ rasterast di kodĂȘ de mĂźheng bikin, bĂȘ guman, ev tĂȘ vĂȘ wateyĂȘ ku pĂźvanĂȘn ku ji hĂȘla pĂȘßdebirĂȘn faust ve tĂȘne peyda kirin, mĂźnakĂź: ragirtin, polĂźtĂźkaya ragirtinĂȘ (bi xwerĂ» jĂȘbirin, lĂȘ hĂ»n dikarin saz bikin gißt), hejmara beßan li ser her mijarĂȘ (partĂźsiyonanji bo nimĂ»ne, kĂȘmtir ji girĂźngiya cĂźhanĂź sepanĂȘn faust).

  3. Bi gelemperĂź, ajan dikare mijarek birĂȘvebirĂź bi nirxĂȘn gerdĂ»nĂź biafirĂźne, lĂȘbelĂȘ, ez dixwazim her tißtĂź bi eßkere eßkere bikim. WekĂź din, hin pĂźvan (mĂźnak, hejmara dabeßan an polĂźtĂźkaya ragirtinĂȘ) ya mijarĂȘ di reklama ajansĂȘ de nayĂȘ mĂźheng kirin.

    Li vir ev e ku ew bĂȘyĂź destnüƟankirina mijarĂȘ bi destan xuya dike:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

BelĂȘ, naha em diyar bikin ka dĂȘ nĂ»nerĂȘ me çi bike :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Ji ber vĂȘ yekĂȘ, di destpĂȘka nĂ»nerĂȘ de, em ji bo daxwaznameyĂȘn bi navgĂźniya muwekĂźlĂȘ xwe ve rĂ»nißtinek aiohttp vedikin. Bi vĂź rengĂź, dema ku xebatkarek dest pĂȘ dike, dema ku nĂ»nerĂȘ me dest pĂȘ dike, dĂȘ tavilĂȘ daniƟünek were vekirin - yek, ji bo tevahiya dema ku xebatkar dimeƟüne (an çend, heke hĂ»n pĂźvanĂȘ biguherĂźnin hevdemĂź ji karmendek bi yekĂźneyek xwerĂ»).

DĂ»v re, em tĂźrĂȘjĂȘ dißopĂźnin (em peyamĂȘ tĂȘ de cih digirin _, ji ber ku em, di vĂȘ ajansĂȘ de, bala xwe nadin naverokĂȘ) peyamĂȘn ji mijara xwe, heke ew di dema niha de hebin, wekĂź din çerxa me dĂȘ li benda hatina wan bimĂźne. WelĂȘ, di hundurĂȘ lĂ»leya xwe de, em wergirtina peyamĂȘ tĂȘketinĂȘ, navnüƟek ewlekariya çalak (get_securities tenĂȘ ji hĂȘla xwerĂ» ve vedigere, koda xerĂźdar bibĂźne) distĂźnin Ă» wĂȘ li databasĂȘ hilĂźnin, kontrol dikin ka ewlehĂźyek bi heman tĂźpĂȘ heye Ă» danĂ»stendina di databasĂȘ de, heke hebe, wĂȘ hingĂȘ ew (kaxiz) tenĂȘ dĂȘ were nĂ»ve kirin.

Werin em afirandina xwe bidin destpĂȘkirin!

> docker-compose up -d
... ЗапусĐș ĐșĐŸĐœŃ‚Đ”ĐčĐœĐ”Ń€ĐŸĐČ ...
> faust -A horton.agents worker --without-web -l info

TaybetmendiyĂȘn PS pĂȘkhateya webĂȘ Ez ĂȘ di gotaran de faustĂȘ nehesibĂźnim, ji ber vĂȘ yekĂȘ em ala guncan destnüƟan dikin.

Di fermana destpĂȘkirina xwe de, me ji faustĂȘ re got ku li kuderĂȘ li hĂȘmana serĂźlĂȘdanĂȘ bigere Ă» bi wĂȘ re çi bike (karkerek bide destpĂȘkirin) bi asta derana tĂȘketina agahdariyĂȘ. Em encamek jĂȘrĂźn bistĂźnin:

Spoiler

ڟa”S† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┮───────────────────────────────────────────────────┘
... Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┌─────────────
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┮────────────┘ 

Ew zindĂź ye!!!

Ka em li set dabeßkirinĂȘ binĂȘrin. WekĂź ku em dibĂźnin, mijarek bi navĂȘ ku me di kodĂȘ de destnüƟan kiriye, hejmara xwerĂ» ya dabeßan (8, ji topic_partitions - Parametreya objeya serĂźlĂȘdanĂȘ), ji ber ku me ji bo mijara xwe nirxek kesane diyar nekir (bi riya dabeßan). Di xebatkarĂȘ de nĂ»nerĂȘ dest pĂȘkirĂź ji her 8 dabeßan re tĂȘne destnüƟan kirin, ji ber ku ew yekane ye, lĂȘ ev ĂȘ di beßa di derbarĂȘ kombĂ»nĂȘ de bi hĂ»rgulĂź were nĂźqaß kirin.

WelĂȘ, naha em dikarin biçin pencereyek termĂźnalek din Ă» peyamek vala ji mijara xwe re biƟünin:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS bikar tĂźne @ em nüƟan didin ku em ji mijarek bi navĂȘ "collect_securities" re peyamek diƟünin.

Di vĂȘ rewßĂȘ de, peyam çû dabeßkirina 6 - hĂ»n dikarin vĂȘ yekĂȘ bi çûna kafdrop-ĂȘ kontrol bikin localhost:9000

Bi xebatkarĂȘ xwe re diçin pencereya termĂźnalĂȘ, em ĂȘ peyamek kĂȘfxweß bibĂźnin ku bi karanĂźna loguru hatĂź ßandin:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Em dikarin li mongo jĂź binihĂȘrin (bi Robo3T an Studio3T bikar tĂźnin) Ă» bibĂźnin ku ewlekarĂź di databasĂȘ de ne:

Ez ne mĂźlyarder im, Ă» ji ber vĂȘ yekĂȘ em bi vebijarka dĂźtina yekem razĂź ne.

KarĂȘn paƟün ĂȘn li ser Faust, Beß II: Ajan Ă» TĂźmKarĂȘn paƟün ĂȘn li ser Faust, Beß II: Ajan Ă» TĂźm

KĂȘfxweƟü Ă» ßahĂź - nĂ»nerĂȘ yekem amade ye :)

Agent amade ye, bijĂź ajanĂȘ nĂ»!

ErĂȘ, birĂȘzan, me tenĂȘ 1/3 riya ku ji hĂȘla vĂȘ gotarĂȘ ve hatĂź amadekirin vegirtiye, lĂȘ dilteng nebin, ji ber ku niha ew ĂȘ hĂȘsantir be.

Ji ber vĂȘ yekĂȘ naha ji me re karmendek pĂȘdivĂź ye ku agahdariya meta berhev dike Ă» wĂȘ dixe nav belgeyek berhevokĂȘ:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Ji ber ku ev ajan dĂȘ di derheqĂȘ ewlehĂźyek taybetĂź de agahdariya pĂȘvajoyĂȘ bike, pĂȘdivĂź ye ku em di peyamĂȘ de nüƟana (sembola) vĂȘ ewlehiyĂȘ destnüƟan bikin. Ji bo vĂȘ armancĂȘ di faustĂȘ de hene Records - dersĂȘn ku di mijara agentĂȘ de nexßeya peyamĂȘ radigihĂźnin.

Di vĂȘ rewßĂȘ de, em biçin records.py Ă» diyar bike ka peyama vĂȘ mijarĂȘ divĂȘ çawa xuya bike:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

WekĂź ku we texmĂźn kir, faust annotation tĂźpa python bikar tĂźne da ku ßema peyamĂȘ rave bike, ji ber vĂȘ yekĂȘ guhertoya herĂź kĂȘm a ku ji hĂȘla pirtĂ»kxaneyĂȘ ve hatĂź pißtgirĂź kirin e. 3.6.

Ka em vegerin ser ajanĂȘ, celeb saz bikin Ă» lĂȘ zĂȘde bikin:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

WekĂź ku hĂ»n dibĂźnin, em bi pileyek pĂźvanek nĂ» ji rĂȘbaza destpĂȘkirina mijarĂȘ re derbas dikin - value_type. Digel vĂȘ yekĂȘ, her tißt heman pilanĂȘ dißopĂźne, ji ber vĂȘ yekĂȘ ez ti xalĂȘ nabĂźnim ku ez li ser tißtek din bisekinim.

WelĂȘ, pĂȘwendiya paƟün ev e ku meriv bangek li nĂ»nerĂȘ berhevkirina agahdariya meta zĂȘde bike da ku collect_securites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Em ji bo peyamĂȘ nexßeya ku berĂȘ hatĂź ragihandin bikar tĂźnin. Di vĂȘ rewßĂȘ de, min rĂȘbaza .cast bikar anĂź ji ber ku em ne hewce ne ku li benda encamĂȘ ji nĂ»nerĂȘ bisekinin, lĂȘ hĂȘjayĂź gotinĂȘ ye ku awayĂȘn ji mijarĂȘ re peyamek biƟünin:

  1. avĂȘtin - asteng nake ji ber ku ew li hĂȘviya encamek ne. HĂ»n nikarin encamĂȘ wekĂź peyamek ji mijarek din re biƟünin.

  2. ßandin - asteng nake ji ber ku encamek hĂȘvĂź nake. HĂ»n dikarin di mijara ku encam jĂȘ re diçe de karmendek diyar bikin.

  3. bipirse - li benda encamĂȘ ye. HĂ»n dikarin di mijara ku encam jĂȘ re diçe de karmendek diyar bikin.

Ji ber vĂȘ yekĂȘ, ew hemĂź bi ajanĂȘn Ăźro re ye!

TĂźma xewnĂȘ

Tißta dawĂź ku min soz da ku di vĂȘ beßĂȘ de binivĂźsim ferman e. WekĂź ku berĂȘ hate behs kirin, fermanĂȘn di faustĂȘ de li dora klĂźk pĂȘça ne. Bi rastĂź, faust dema ku mifteya -A destnüƟan dike, tenĂȘ fermana meya xwerĂ» bi navbeyna xwe ve girĂȘdide

PißtĂź ku ajanĂȘn hatin ragihandin ketin agents.py fonksiyonek bi dekoratorĂȘ zĂȘde bike app.commandgazĂźkirina rĂȘbazĂȘ avdan у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Bi vĂź rengĂź, heke em navnüƟa fermanan bang bikin, emrĂȘ meya nĂ» dĂȘ tĂȘ de be:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Em dikarin wĂȘ mĂźna her kesĂȘ bikar bĂźnin, ji ber vĂȘ yekĂȘ em xebatkarĂȘ faust ji nĂ» ve bidin destpĂȘkirin Ă» berhevokek bĂȘkĂȘmasĂź ya ewlehiyĂȘ dest pĂȘ bikin:

> faust -A horton.agents start-collect-securities

DĂȘ paßĂȘ çi bibe?

Di beßa paƟün de, wekĂź mĂźnakek ajanĂȘn mayĂź bikar bĂźnin, em ĂȘ mekanĂźzmaya sink ji bo lĂȘgerĂźna tundĂ»tĂ»jiyĂȘ di bihayĂȘn girtina bazirganiya salĂȘ de Ă» destpĂȘkirina krona ajanan de binirxĂźnin.

Ji bo Ăźro her tißt e! Spas ji bo xwendinĂȘ :)

Koda ji bo vĂȘ beßĂȘ

KarĂȘn paƟün ĂȘn li ser Faust, Beß II: Ajan Ă» TĂźm

PS Di beßa dawĂźn de ji min hat pirsĂźn kafka faust Ă» tevlihev (çi taybetiyĂȘn hevgirtĂź hene?). Wusa dixuye ku confluent bi gelek awayan fonksiyoneltir e, lĂȘ rastiyek ev e ku faust ji bo konfluentĂȘ xwedan pißtgiriyek tam a xerĂźdar nĂźne - ev ji danasĂźnĂȘn sĂźnorkirinĂȘn xerĂźdar di doc.

Source: www.habr.com

Ji bo malperĂȘn bi parastina DDoS, serverĂȘn VPS VDS mĂȘvandariya pĂȘbawer bikirin đŸ”„ Hostinga malperĂȘ ya pĂȘbawer bi parastina DDoS, serverĂȘn VPS VDS bikirin | ProHoster