په فاسټ کې د شالید دندې، دویمه برخه: اجنټان او ټیمونه

په فاسټ کې د شالید دندې، دویمه برخه: اجنټان او ټیمونه

فهرست

  1. لومړۍ برخه: پیژندنه

  2. دویمه برخه: استازي او ټیمونه

موږ دلته څه کوو؟

نو، نو، دویمه برخه. لکه څنګه چې مخکې لیکل شوي، پدې کې به موږ لاندې کار وکړو:

  1. راځئ چې په aiohttp کې د الفاوانټیج لپاره یو کوچنی پیرودونکی ولیکئ د پای ټکي لپاره غوښتنې سره چې موږ ورته اړتیا لرو.

  2. راځئ چې یو اجنټ جوړ کړو چې د امنیت او میټا معلوماتو په اړه معلومات راټول کړي.

مګر، دا هغه څه دي چې موږ به پخپله د پروژې لپاره ترسره کړو، او د چټکې څیړنې په شرایطو کې، موږ به د اجنټانو لیکلو څرنګوالی زده کړو چې د کافکا څخه پیښې پروسس کوي، او همدارنګه د امرونو لیکلو څرنګوالی (کلک ریپر)، زموږ په قضیه کې - د لاسي پیغامونو لپاره موضوع ته چې اجنټ یې څارنه کوي.

د چمتو کولو لپاره

د AlphaVantage مراجع

لومړی، راځئ چې د الفاوانټیج غوښتنې لپاره یو کوچنی aiohttp مراجع ولیکئ.

alphavantage.py

ټوټه ټوټه

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

په حقیقت کې، هرڅه له دې څخه روښانه دي:

  1. د AlphaVantage API خورا ساده او ښکلی ډیزاین شوی، نو ما پریکړه وکړه چې ټولې غوښتنې د میتود له لارې ترسره کړم construct_query چیرې چې په بدل کې د http کال شتون لري.

  2. زه ټولې ساحې ته راوړم snake_case د آرامۍ لپاره.

  3. ښه، د ښکلي او معلوماتي ټریس بیک محصول لپاره logger.catch سجاوٹ.

PS مه هیروئ چې په ځایی ډول config.yml ته د الفاوانټاج نښه اضافه کړئ ، یا د چاپیریال متغیر صادر کړئ HORTON_SERVICE_APIKEY. موږ یوه نښه ترلاسه کوو دلته.

د CRUD ټولګي

موږ به د امنیتونو په اړه د میټا معلوماتو ذخیره کولو لپاره د امنیت ټولګه ولرو.

database/security.py

زما په اند، دلته هیڅ شی تشریح کولو ته اړتیا نشته، او د اساس ټولګی پخپله خورا ساده دی.

get_app()

راځئ چې د اپلیکیشن اعتراض رامینځته کولو لپاره یو فنکشن اضافه کړو app.py

ټوټه ټوټه

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

د اوس لپاره به موږ ترټولو ساده غوښتنلیک جوړ کړو، یو څه وروسته به یې پراخه کړو، په هرصورت، د دې لپاره چې تاسو انتظار ونه ساتو، دلته حوالې د اپلیکیشن ټولګي ته. زه تاسو ته هم مشوره درکوم چې د ترتیباتو ټولګي ته یو نظر وګورئ، ځکه چې دا د ډیری ترتیباتو مسولیت لري.

اصلي برخه

د تضمیناتو لیست راټولولو او ساتلو لپاره اجنټ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

نو، لومړی موږ د فاسټ غوښتنلیک اعتراض ترلاسه کوو - دا خورا ساده دی. بیا، موږ په ښکاره ډول زموږ د اجنټ لپاره یوه موضوع اعلانوو ... دلته دا د یادولو وړ ده چې دا څه دي، داخلي پیرامیټر څه شی دی او دا څنګه په مختلف ډول تنظیم کیدی شي.

  1. په کافکا کې موضوعات، که موږ غواړو دقیق تعریف پوه شو، دا غوره ده چې ولولئ بند سند، یا تاسو لوستلی شئ ټولګه په روسی کې په هابری کې، چیرې چې هرڅه هم په سمه توګه منعکس کیږي :)

  2. داخلي پارامتر، په فاسټ ډاک کې خورا ښه بیان شوی ، موږ ته اجازه راکوي چې موضوع مستقیم په کوډ کې تنظیم کړو ، البته ، دا پدې معنی ده چې د فاسټ پراختیا کونکو لخوا چمتو شوي پیرامیټونه ، د مثال په توګه: ساتل ، د ساتلو پالیسي (د ډیفالټ حذف کول ، مګر تاسو کولی شئ تنظیم کړئ تړوند هرې موضوع د ویشونو شمیر (نمرېد مثال په توګه، لږ څه کول نړیوال اهمیت غوښتنلیکونه فاسټ).

  3. په عموم کې، اجنټ کولی شي د نړیوالو ارزښتونو سره اداره شوې موضوع رامینځته کړي، په هرصورت، زه غواړم هر څه په ښکاره توګه اعلان کړم. برسېره پردې، د اجنټ په اعلان کې د موضوع ځینې پیرامیټونه (د بیلګې په توګه، د برخو شمیر یا د ساتلو پالیسي) نشي تنظیم کیدی.

    دلته هغه څه دي چې کیدای شي په لاسي ډول د موضوع تعریف کولو پرته ښکاري:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

ښه، اوس راځئ چې تشریح کړو چې زموږ استازی به څه وکړي :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

نو ، د اجنټ په پیل کې ، موږ زموږ د پیرودونکي له لارې غوښتنو لپاره aiohttp سیشن خلاصوو. پدې توګه ، کله چې د کارګر پیل کول ، کله چې زموږ اجنټ پیل شي ، سمدلاسه به یوه ناسته پرانستل شي - یو ، د ټول وخت لپاره چې کارګر روان وي (یا څو ، که تاسو پیرامیټر بدل کړئ اتفاق د ډیفالټ واحد سره د اجنټ څخه).

بیا، موږ جریان تعقیب کوو (موږ پیغام په کې ځای په ځای کوو _، ځکه چې موږ ، پدې اجنټ کې ، زموږ د موضوع څخه د پیغامونو مینځپانګې ته پاملرنه نه کوو ، که دوی په اوسني آفسیټ کې شتون ولري ، که نه نو زموږ دوره به د دوی رارسیدو ته انتظار باسي. ښه، زموږ په لوپ کې دننه، موږ د پیغام رسید لاګ کوو، د فعال لیست ترلاسه کوو (get_securities یوازې د ډیفالټ له مخې فعاله راګرځي، د پیرودونکي کوډ وګورئ) امنیتونه او ډیټابیس ته یې خوندي کړئ، وګورئ چې د ورته ټیکر سره امنیت شتون لري او که نه. په ډیټابیس کې تبادله، که شتون ولري، نو دا (کاغذ) به په ساده ډول تازه شي.

راځئ چې زموږ جوړونه پیل کړو!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

د PS ځانګړتیاوې ویب برخه زه به په مقالو کې فاسټ په پام کې ونیسم، نو موږ مناسب بیرغ ترتیب کړ.

زموږ د لانچ کمانډ کې ، موږ فاسټ ته وویل چې چیرې د غوښتنلیک څیز په لټه کې شئ او د دې سره څه وکړئ (کارګر پیل کړئ) د معلوماتو لاګ محصول کچې سره. موږ لاندې محصول ترلاسه کوو:

ټوټه ټوټه

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

ژوندی دی!!!

راځئ چې د برخې برخې ته وګورو. لکه څنګه چې موږ لیدلی شو، یوه موضوع د هغه نوم سره جوړه شوې وه چې موږ یې په کوډ کې ډیزاین کړې، د ویشونو اصلي شمیره (8، له دې څخه اخیستل شوي. topic_partitions - د غوښتنلیک اعتراض پیرامیټر)، ځکه چې موږ د خپلې موضوع لپاره انفرادي ارزښت ندی مشخص کړی (د برخې له لارې). په کارګر کې پیل شوی اجنټ ټولې 8 برخې ګمارل شوي ، ځکه چې دا یوازې یو دی ، مګر دا به د کلستر کولو په برخه کې په ډیر تفصیل سره بحث وشي.

ښه، اوس موږ کولی شو بلې ټرمینل کړکۍ ته لاړ شو او زموږ موضوع ته یو خالي پیغام ولیږو:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS کارول @ موږ وښیو چې موږ د "collect_securities" په نوم یوې موضوع ته پیغام لیږو.

پدې حالت کې ، پیغام 6 برخې ته لاړ - تاسو کولی شئ دا د کافډراپ آن ته لاړشئ localhost:9000

زموږ د کارګر سره د ترمینل کړکۍ ته ځي، موږ به د خوښۍ پیغام وګورو چې د لوګورو په کارولو سره لیږل شوی:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

موږ کولی شو مونګو ته هم وګورو (د Robo3T یا Studio3T په کارولو سره) او وګورو چې امنیت په ډیټابیس کې دي:

زه ملیاردر نه یم، او له همدې امله موږ د لومړي لید اختیار څخه راضي یو.

په فاسټ کې د شالید دندې، دویمه برخه: اجنټان او ټیمونهپه فاسټ کې د شالید دندې، دویمه برخه: اجنټان او ټیمونه

خوښۍ او خوښۍ - لومړی اجنټ چمتو دی :)

اجنټ چمتو دی ، ژوندی دې وي نوی اجنټ!

هو ، ښاغلو ، موږ یوازې د دې مقالې لخوا چمتو شوې لاره 1/3 پوښلې ده ، مګر مایوسه مه کوئ ، ځکه چې اوس به اسانه وي.

نو اوس موږ یو اجنټ ته اړتیا لرو چې د میټا معلومات راټول کړي او د راټولولو سند کې یې واچوي:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

څرنګه چې دا اجنټ به د یو ځانګړي امنیت په اړه معلومات پروسس کړي، موږ باید په پیغام کې د دې امنیت ټیکر (سمبول) په ګوته کړو. د دې هدف لپاره په فاسټ کې شتون لري سوابق - هغه ټولګي چې د ایجنټ موضوع کې د پیغام سکیم اعلانوي.

په دې حالت کې، راځئ چې لاړ شو records.py او تشریح کړئ چې د دې موضوع لپاره پیغام باید څه ډول وي:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

لکه څنګه چې تاسو اټکل کړی وي، فاسټ د پیغام سکیما تشریح کولو لپاره د python ډول تشریح کاروي، له همدې امله د کتابتون لخوا ملاتړ شوی لږترلږه نسخه ده 3.6.

راځئ چې اجنټ ته راستون شو، ډولونه یې تنظیم کړئ او اضافه کړئ:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

لکه څنګه چې تاسو لیدلی شئ، موږ د سکیم سره د موضوع پیل کولو میتود ته یو نوی پیرامیټر انتقالوو - value_type. سربیره پردې ، هرڅه ورته سکیم تعقیبوي ، نو زه په بل څه کې د اوسیدو هیڅ ټکی نه ګورم.

ښه، وروستی ټچ د میټا معلوماتو راټولولو اجنټ ته د کلیک_سیکوریټیټس لپاره زنګ اضافه کول دي:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

موږ د پیغام لپاره دمخه اعلان شوی سکیم کاروو. پدې حالت کې ، ما د .cast میتود کارولی ځکه چې موږ اړتیا نلرو د اجنټ پایلې ته انتظار وباسو ، مګر دا د یادونې وړ ده لارې موضوع ته یو پیغام ولېږئ:

  1. کاسټ - بلاک نه کوي ځکه چې دا د پایلې تمه نلري. تاسو نشئ کولی پایلې بلې موضوع ته د پیغام په توګه واستوئ.

  2. لیږل - بلاک نه کوي ځکه چې دا د پایلې تمه نلري. تاسو کولی شئ په موضوع کې یو استازی مشخص کړئ چې پایله به یې لاړ شي.

  3. پوښتنه وکړئ - پایلې ته انتظار وکړئ. تاسو کولی شئ په موضوع کې یو استازی مشخص کړئ چې پایله به یې لاړ شي.

نو، دا ټول د نن ورځې لپاره د اجنټانو سره دي!

د خوب ټیم

وروستی شی چې ما په دې برخه کې د لیکلو ژمنه کړې وه حکمونه دي. لکه څنګه چې مخکې یادونه وشوه، په فاسټ کې کمانډونه د کلک شاوخوا یو ریپر دی. په حقیقت کې، فاسټ په ساده ډول زموږ دودیز کمانډ خپل انٹرفیس ته ضمیمه کوي کله چې د -A کیلي مشخص کوي

وروسته اعلان شوي اجنټان په کې agents.py د سینګار سره یو فنکشن اضافه کړئ app.commandطریقه غږول کاسټ у جمع_امنیتونه:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

په دې توګه، که موږ د قوماندې لیست ووایو، زموږ نوې کمانډ به پدې کې وي:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

موږ کولی شو دا د بل چا په څیر وکاروو، نو راځئ چې فاسټ کارګر بیا پیل کړو او د تضمیناتو بشپړ ټولګه پیل کړو:

> faust -A horton.agents start-collect-securities

بیا به څه کیږي؟

په راتلونکې برخه کې، د مثال په توګه د پاتې اجنټانو په کارولو سره، موږ به د کال لپاره د سوداګرۍ د تړلو نرخونو او د اجنټانو کرون لانچ کې د افراطیت لټون کولو لپاره د سنک میکانیزم په پام کې ونیسو.

دا ټول د نن ورځې لپاره دي! د لوستلو لپاره مننه :)

د دې برخې لپاره کوډ

په فاسټ کې د شالید دندې، دویمه برخه: اجنټان او ټیمونه

PS په وروستۍ برخه کې له ما څخه د فاسټ او متضاد کافکا په اړه پوښتنه وشوه (کنفرنس څه ځانګړتیاوې لري؟). داسې ښکاري چې کنفلونټ په ډیرو لارو کې ډیر فعال دی، مګر حقیقت دا دی چې فاسټ د کنفلونټ لپاره د مراجعینو بشپړ ملاتړ نلري - دا د دې څخه تعقیب کیږي. په سند کې د پیرودونکي محدودیتونو توضیحات.

سرچینه: www.habr.com

Add a comment