فاسٹ پر پس منظر کے کام، حصہ دوم: ایجنٹ اور ٹیمیں۔

فاسٹ پر پس منظر کے کام، حصہ دوم: ایجنٹ اور ٹیمیں۔

مواد کی میز

  1. حصہ اول: تعارف

  2. حصہ دوم: ایجنٹ اور ٹیمیں۔

ہم یہاں کیا کر رہے ہیں؟

تو، تو، دوسرا حصہ. جیسا کہ پہلے لکھا گیا ہے، اس میں ہم درج ذیل کام کریں گے۔

  1. آئیے aiohttp پر الفاوانٹیج کے لیے ایک چھوٹا سا کلائنٹ لکھتے ہیں جن کی ہمیں ضرورت کے اختتامی پوائنٹس کی درخواستیں ہیں۔

  2. آئیے ایک ایسا ایجنٹ بنائیں جو سیکیورٹیز کا ڈیٹا اور ان پر میٹا معلومات اکٹھا کرے۔

لیکن، یہ وہی ہے جو ہم خود پروجیکٹ کے لیے کریں گے، اور تیز تحقیق کے لحاظ سے، ہم یہ سیکھیں گے کہ ایسے ایجنٹوں کو کیسے لکھنا ہے جو کافکا سے واقعات کو پروسیس کرتے ہیں، اور ساتھ ہی ہمارے معاملے میں کمانڈز (کلک ریپر) کو کیسے لکھنا ہے۔ اس موضوع پر دستی پیغامات کے لیے جس کی ایجنٹ نگرانی کر رہا ہے۔

ٹریننگ

الفا وینٹیج کلائنٹ

سب سے پہلے، آئیے ایک چھوٹا سا aiohttp کلائنٹ لکھتے ہیں کہ الفاوانٹیج کی درخواستوں کے لیے۔

alphavantage.py

بگاڑنے

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

دراصل، اس سے سب کچھ واضح ہے:

  1. AlphaVantage API کافی آسان اور خوبصورتی سے ڈیزائن کیا گیا ہے، اس لیے میں نے طریقہ کے ذریعے تمام درخواستیں کرنے کا فیصلہ کیا۔ construct_query جہاں بدلے میں ایک HTTP کال ہوتی ہے۔

  2. میں تمام کھیتوں کو لاتا ہوں۔ snake_case آرام کے لئے.

  3. ٹھیک ہے، خوبصورت اور معلوماتی ٹریس بیک آؤٹ پٹ کے لیے logger.catch کی سجاوٹ۔

PS مقامی طور پر config.yml میں الفاوانٹیج ٹوکن شامل کرنا نہ بھولیں، یا ماحولیاتی متغیر کو برآمد کریں۔ HORTON_SERVICE_APIKEY. ہمیں ایک ٹوکن ملتا ہے۔ یہاں.

CRUD کلاس

ہمارے پاس سیکیورٹیز کے بارے میں میٹا معلومات کو ذخیرہ کرنے کے لیے سیکیورٹیز کا مجموعہ ہوگا۔

database/security.py

میری رائے میں، یہاں کسی بھی چیز کی وضاحت کرنے کی ضرورت نہیں ہے، اور بیس کلاس خود بہت آسان ہے.

get_app()

آئیے میں ایپلی کیشن آبجیکٹ بنانے کے لیے ایک فنکشن شامل کریں۔ app.py

بگاڑنے

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

ابھی ہمارے پاس سب سے آسان ایپلی کیشن تخلیق ہوگی، تھوڑی دیر بعد ہم اسے بڑھا دیں گے، تاہم، آپ کو انتظار نہ کرنے کے لیے، یہاں حوالہ جات ایپ کلاس میں۔ میں ترتیبات کی کلاس پر ایک نظر ڈالنے کی بھی سفارش کرتا ہوں، کیونکہ یہ زیادہ تر ترتیبات کے لیے ذمہ دار ہے۔

اہم حصہ

سیکیورٹیز کی فہرست جمع کرنے اور برقرار رکھنے کے لیے ایجنٹ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

تو، سب سے پہلے ہمیں فاسٹ ایپلیکیشن آبجیکٹ ملتا ہے - یہ کافی آسان ہے۔ اس کے بعد، ہم اپنے ایجنٹ کے لیے واضح طور پر ایک موضوع کا اعلان کرتے ہیں... یہاں یہ بات قابل ذکر ہے کہ یہ کیا ہے، اندرونی پیرامیٹر کیا ہے اور اسے مختلف طریقے سے کیسے ترتیب دیا جا سکتا ہے۔

  1. کافکا میں عنوانات، اگر ہم صحیح تعریف جاننا چاہتے ہیں، تو اسے پڑھنا بہتر ہے۔ بند. دستاویز، یا آپ پڑھ سکتے ہیں۔ خلاصہ روسی میں Habré پر، جہاں ہر چیز بالکل درست طریقے سے جھلکتی ہے :)

  2. اندرونی پیرامیٹر, faust doc میں بہت اچھی طرح سے بیان کیا گیا ہے، ہمیں موضوع کو براہ راست کوڈ میں ترتیب دینے کی اجازت دیتا ہے، یقیناً، اس کا مطلب ہے فاسٹ ڈویلپرز کے فراہم کردہ پیرامیٹرز، مثال کے طور پر: برقرار رکھنے، برقرار رکھنے کی پالیسی (بطور ڈیفالٹ ڈیلیٹ، لیکن آپ سیٹ کر سکتے ہیں۔ کمپیکٹ) فی موضوع پارٹیشنز کی تعداد (سکورکرنا، مثال کے طور پر، اس سے کم عالمی اہمیت ایپلی کیشنز فاسٹ)۔

  3. عام طور پر، ایجنٹ عالمی اقدار کے ساتھ ایک منظم موضوع بنا سکتا ہے، تاہم، میں ہر چیز کا واضح طور پر اعلان کرنا چاہتا ہوں۔ اس کے علاوہ، ایجنٹ کے اشتہار میں موضوع کے کچھ پیرامیٹرز (مثال کے طور پر پارٹیشنز کی تعداد یا برقرار رکھنے کی پالیسی) کو ترتیب نہیں دیا جا سکتا۔

    موضوع کو دستی طور پر بیان کیے بغیر یہ کیسا نظر آتا ہے:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

ٹھیک ہے، اب ہم بیان کرتے ہیں کہ ہمارا ایجنٹ کیا کرے گا :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

لہذا، ایجنٹ کے آغاز میں، ہم اپنے کلائنٹ کے ذریعے درخواستوں کے لیے ایک aiohttp سیشن کھولتے ہیں۔ اس طرح، ورکر کو شروع کرتے وقت، جب ہمارا ایجنٹ لانچ کیا جائے گا، تو فوراً ایک سیشن کھول دیا جائے گا - ایک، اس پورے وقت کے لیے جب کارکن چل رہا ہے (یا کئی، اگر آپ پیرامیٹر کو تبدیل کرتے ہیں۔ اتفاق پہلے سے طے شدہ یونٹ والے ایجنٹ سے)۔

اگلا، ہم اس سلسلے کی پیروی کرتے ہیں (ہم پیغام کو اندر رکھتے ہیں۔ _چونکہ ہم، اس ایجنٹ میں، ہمارے موضوع کے پیغامات کے مواد کی پرواہ نہیں کرتے ہیں، اگر وہ موجودہ آفسیٹ پر موجود ہیں، بصورت دیگر ہمارا سائیکل ان کی آمد کا انتظار کرے گا۔ ٹھیک ہے، اپنے لوپ کے اندر، ہم پیغام کی رسید کو لاگ ان کرتے ہیں، ایکٹیو کی فہرست حاصل کرتے ہیں (get_securities صرف ڈیفالٹ کے طور پر فعال ہوتے ہیں، کلائنٹ کوڈ دیکھیں) سیکیورٹیز اور اسے ڈیٹا بیس میں محفوظ کرتے ہیں، یہ چیک کرتے ہیں کہ آیا اسی ٹکر کے ساتھ سیکیورٹی موجود ہے یا نہیں۔ ڈیٹا بیس میں تبادلہ، اگر وہاں ہے، تو یہ (کاغذ) آسانی سے اپ ڈیٹ کیا جائے گا.

آئیے اپنی تخلیق کا آغاز کریں!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

پی ایس کی خصوصیات ویب جزو میں مضامین میں فاسٹ پر غور نہیں کروں گا، لہذا ہم نے مناسب جھنڈا مقرر کیا۔

ہماری لانچ کمانڈ میں، ہم نے فاسٹ کو بتایا کہ ایپلی کیشن آبجیکٹ کو کہاں تلاش کرنا ہے اور اس کے ساتھ کیا کرنا ہے (ایک کارکن لانچ کریں) انفارمیشن لاگ آؤٹ پٹ لیول کے ساتھ۔ ہمیں درج ذیل آؤٹ پٹ ملتا ہے:

بگاڑنے

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

یہ زندہ ہے!!!

آئیے پارٹیشن سیٹ کو دیکھتے ہیں۔ جیسا کہ ہم دیکھ سکتے ہیں، ایک عنوان اس نام کے ساتھ بنایا گیا تھا جسے ہم نے کوڈ میں نامزد کیا تھا، پارٹیشنز کی ڈیفالٹ تعداد (8، سے لیا گیا topic_partitions - ایپلیکیشن آبجیکٹ پیرامیٹر)، چونکہ ہم نے اپنے موضوع کے لیے انفرادی قدر کی وضاحت نہیں کی (بذریعہ پارٹیشنز)۔ ورکر میں لانچ کردہ ایجنٹ کو تمام 8 پارٹیشنز تفویض کیے گئے ہیں، کیونکہ یہ صرف ایک ہے، لیکن اس پر کلسٹرنگ کے بارے میں مزید تفصیل سے بات کی جائے گی۔

ٹھیک ہے، اب ہم ایک اور ٹرمینل ونڈو پر جا سکتے ہیں اور اپنے موضوع پر ایک خالی پیغام بھیج سکتے ہیں:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

پی ایس کا استعمال کرتے ہوئے @ ہم یہ ظاہر کرتے ہیں کہ ہم "collect_securities" کے عنوان سے ایک پیغام بھیج رہے ہیں۔

اس صورت میں، پیغام پارٹیشن 6 پر چلا گیا - آپ کیف ڈراپ آن پر جا کر اسے چیک کر سکتے ہیں۔ localhost:9000

اپنے کارکن کے ساتھ ٹرمینل ونڈو پر جاتے ہوئے، ہم لوگورو کا استعمال کرتے ہوئے ایک خوش کن پیغام دیکھیں گے:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

ہم منگو کو بھی دیکھ سکتے ہیں (Robo3T یا Studio3T کا استعمال کرتے ہوئے) اور دیکھ سکتے ہیں کہ سیکیورٹیز ڈیٹا بیس میں ہیں:

میں ارب پتی نہیں ہوں، اور اس لیے ہم پہلے دیکھنے کے آپشن سے مطمئن ہیں۔

فاسٹ پر پس منظر کے کام، حصہ دوم: ایجنٹ اور ٹیمیں۔فاسٹ پر پس منظر کے کام، حصہ دوم: ایجنٹ اور ٹیمیں۔

خوشی اور خوشی - پہلا ایجنٹ تیار ہے :)

ایجنٹ تیار، نئے ایجنٹ زندہ باد!

جی ہاں، حضرات، ہم نے اس مضمون کے ذریعہ تیار کردہ راستے کا صرف 1/3 احاطہ کیا ہے، لیکن ہمت نہ ہاریں، کیونکہ اب یہ آسان ہو جائے گا۔

لہذا اب ہمیں ایک ایجنٹ کی ضرورت ہے جو میٹا معلومات جمع کرے اور اسے جمع کرنے والے دستاویز میں ڈالے:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

چونکہ یہ ایجنٹ ایک مخصوص سیکیورٹی کے بارے میں معلومات پر کارروائی کرے گا، اس لیے ہمیں پیغام میں اس سیکیورٹی کے ٹکر (علامت) کی نشاندہی کرنے کی ضرورت ہے۔ اس مقصد کے لیے فاسٹ میں موجود ہیں۔ ریکارڈز - وہ کلاسیں جو ایجنٹ کے عنوان میں پیغام اسکیم کا اعلان کرتی ہیں۔

اس صورت میں، چلو چلتے ہیں records.py اور بیان کریں کہ اس موضوع کے لیے پیغام کیسا نظر آنا چاہیے:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

جیسا کہ آپ نے اندازہ لگایا ہوگا، فاسٹ میسج اسکیما کو بیان کرنے کے لیے python ٹائپ تشریح کا استعمال کرتا ہے، یہی وجہ ہے کہ لائبریری کے ذریعہ تعاون یافتہ کم از کم ورژن 3.6.

آئیے ایجنٹ کی طرف لوٹتے ہیں، اقسام سیٹ کریں اور اسے شامل کریں:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

جیسا کہ آپ دیکھ سکتے ہیں، ہم ایک نیا پیرامیٹر ایک اسکیم کے ساتھ موضوع کے آغاز کے طریقہ کار میں منتقل کرتے ہیں - value_type۔ اس کے علاوہ، سب کچھ ایک ہی اسکیم کی پیروی کرتا ہے، لہذا مجھے کسی اور چیز پر غور کرنے کا کوئی فائدہ نظر نہیں آتا ہے۔

ٹھیک ہے، حتمی ٹچ میٹا معلومات جمع کرنے والے ایجنٹ کو collect_securitites کے لیے کال شامل کرنا ہے:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

ہم پیغام کے لیے پہلے اعلان کردہ اسکیم کا استعمال کرتے ہیں۔ اس معاملے میں، میں نے .cast طریقہ استعمال کیا کیونکہ ہمیں ایجنٹ کے نتیجے کا انتظار کرنے کی ضرورت نہیں ہے، لیکن یہ بات قابل ذکر ہے کہ طریقے موضوع پر پیغام بھیجیں:

  1. کاسٹ - بلاک نہیں کرتا کیونکہ اس سے نتیجہ کی توقع نہیں ہوتی ہے۔ آپ نتیجہ کسی دوسرے موضوع پر بطور پیغام نہیں بھیج سکتے۔

  2. send - بلاک نہیں کرتا کیونکہ اس سے نتیجہ کی توقع نہیں ہوتی۔ آپ اس موضوع میں ایک ایجنٹ کی وضاحت کر سکتے ہیں جس پر نتیجہ آئے گا۔

  3. پوچھیں - نتیجہ کا انتظار کریں۔ آپ اس موضوع میں ایک ایجنٹ کی وضاحت کر سکتے ہیں جس پر نتیجہ آئے گا۔

تو، آج کے لیے ایجنٹوں کے ساتھ بس اتنا ہی ہے!

خوابوں کی ٹیم

آخری چیز جس کا میں نے اس حصے میں لکھنے کا وعدہ کیا تھا وہ کمانڈز ہیں۔ جیسا کہ پہلے ذکر کیا گیا ہے، فاسٹ میں کمانڈز کلک کے ارد گرد ایک ریپر ہیں۔ درحقیقت، faust -A کلید کی وضاحت کرتے وقت ہماری کسٹم کمانڈ کو اس کے انٹرفیس سے جوڑتا ہے۔

اعلان کردہ ایجنٹوں کے بعد agents.py ڈیکوریٹر کے ساتھ ایک فنکشن شامل کریں۔ app.commandطریقہ کار کو کال کرنا ڈال у جمع_سیکیورٹائٹس:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

اس طرح، اگر ہم کمانڈز کی فہرست کو کہتے ہیں، تو ہماری نئی کمانڈ اس میں ہوگی:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

ہم اسے کسی اور کی طرح استعمال کر سکتے ہیں، تو آئیے فاسٹ ورکر کو دوبارہ شروع کریں اور سیکیورٹیز کا ایک مکمل مجموعہ شروع کریں:

> faust -A horton.agents start-collect-securities

آگے کیا ہوگا؟

اگلے حصے میں، بقیہ ایجنٹوں کو بطور مثال استعمال کرتے ہوئے، ہم سال کے لیے ٹریڈنگ کی اختتامی قیمتوں اور ایجنٹوں کے کرون لانچ میں غلو کو تلاش کرنے کے لیے سنک میکانزم پر غور کریں گے۔

آج کیلئے بس اتنا ہی! پڑھنے کا شکریہ :)

اس حصے کے لیے کوڈ

فاسٹ پر پس منظر کے کام، حصہ دوم: ایجنٹ اور ٹیمیں۔

PS آخری حصے کے تحت مجھ سے فاسٹ اور سنگم کافکا کے بارے میں پوچھا گیا تھا (سنگم میں کیا خصوصیات ہیں؟)۔ ایسا لگتا ہے کہ confluent بہت سے طریقوں سے زیادہ فعال ہے، لیکن حقیقت یہ ہے کہ faust کو confluent کے لیے کلائنٹ کی مکمل حمایت حاصل نہیں ہے - یہ مندرجہ ذیل ہے دستاویز میں کلائنٹ کی پابندیوں کی تفصیل.

ماخذ: www.habr.com

نیا تبصرہ شامل کریں