فاسٽ تي پس منظر جا ڪم، حصو II: ايجنٽ ۽ ٽيمون

فاسٽ تي پس منظر جا ڪم، حصو II: ايجنٽ ۽ ٽيمون

مضمونن جو جدول

  1. حصو I: تعارف

  2. حصو II: ايجنٽ ۽ ٽيمون

اسان هتي ڇا ڪري رهيا آهيون؟

تنهن ڪري، ٻيو حصو. جيئن اڳ ۾ لکيو ويو آهي، ان ۾ اسين هيٺيان ڪنداسين:

  1. اچو ته aiohttp تي alphavantage لاءِ ھڪڙو ننڍڙو ڪلائنٽ لکون جن جي اسان کي ضرورت آھي آخري پوائنٽن جي درخواستن سان.

  2. اچو ته هڪ ايجنٽ ٺاهيو جيڪو انهن تي سيڪيورٽيز ۽ ميٽا معلومات تي ڊيٽا گڏ ڪندو.

پر، اھو اھو آھي جيڪو اسان پاڻ پروجيڪٽ لاءِ ڪنداسين، ۽ فاسٽ ريسرچ جي لحاظ کان، اسين ڄاڻنداسين ته ايجنٽ ڪيئن لکجي جيڪي ڪافڪا کان واقعن کي اسٽريم ڪن ٿا، ۽ ان سان گڏ ڪمانڊ ڪيئن لکجن (ڪِلڪ ريپر)، اسان جي ڪيس ۾ - دستي پش پيغامن لاءِ انهي موضوع تي جيڪو ايجنٽ مانيٽر ڪري رهيو آهي.

جي تياري

AlphaVantage ڪلائنٽ

پهرين، اچو ته هڪ ننڍڙو aiohttp ڪلائنٽ لکون درخواستن لاءِ alphavantage.

alphavantage.py

اسپيلر

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

حقيقت ۾، ان مان هر شيء واضح آهي:

  1. AlphaVantage API بلڪل سادو ۽ خوبصورت طور تي ٺهيل آهي، تنهنڪري مون طريقي سان سڀني درخواستن کي ٺاهڻ جو فيصلو ڪيو construct_query جتي موڙ ۾ هڪ http ڪال آهي.

  2. مان سڀني شعبن کي آڻيان ٿو snake_case سهولت لاءِ.

  3. خير، logger.catch سينگار لاءِ خوبصورت ۽ معلوماتي ٽريڪ بيڪ آئوٽ.

PS config.yml ۾ مقامي طور تي الفاوانٽيج ٽوڪن شامل ڪرڻ نه وساريو، يا ماحولياتي متغير برآمد ڪريو HORTON_SERVICE_APIKEY. اسان هڪ ٽوڪن وصول ڪندا آهيون هتي.

CRUD ڪلاس

اسان وٽ سيڪيورٽيز جو مجموعو هوندو جيڪو سيڪيورٽيز بابت ميٽا معلومات کي محفوظ ڪرڻ لاءِ.

database/security.py

منهنجي خيال ۾، هتي ڪجهه به وضاحت ڪرڻ جي ڪا ضرورت ناهي، ۽ بنيادي ڪلاس پاڻ بلڪل سادو آهي.

get_app()

اچو ته هڪ ايپليڪيشن شئي ٺاهڻ لاءِ هڪ فنڪشن شامل ڪريون app.py

اسپيلر

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

ھاڻي اسان وٽ آسان ترين ايپليڪيشن ٺاھڻ ھوندي، ٿوري دير کان پوءِ اسين ان کي وڌائينداسين، پر توھان کي انتظار ۾ رکڻ لاءِ، ھتي حوالا ايپ ڪلاس تائين. مان توهان کي صلاح ڏيان ٿو ته هڪ نظر وٺو سيٽنگون ڪلاس، ڇاڪاڻ ته اهو اڪثر سيٽنگون لاء ذميوار آهي.

مکيه حصو

سيڪيورٽيز جي لسٽ گڏ ڪرڻ ۽ برقرار رکڻ لاءِ ايجنٽ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

تنهن ڪري، پهرين اسان حاصل ڪيو faust ايپليڪيشن اعتراض - اهو بلڪل سادو آهي. اڳيون، اسان واضح طور تي اسان جي ايجنٽ لاءِ هڪ موضوع جو اعلان ڪريون ٿا... هتي اهو ذڪر ڪرڻ جي قابل آهي ته اهو ڇا آهي، اندروني پيٽرولر ڇا آهي ۽ ان کي مختلف طريقي سان ڪيئن ترتيب ڏئي سگهجي ٿو.

  1. ڪافڪا ۾ عنوان، جيڪڏهن اسان صحيح تعريف ڄاڻڻ چاهيون ٿا، اهو پڙهڻ بهتر آهي بند دستاويز، يا توهان پڙهي سگهو ٿا خلاصو روسي ۾ Habré تي، جتي هر شيء بلڪل صحيح طور تي ظاهر ٿئي ٿي :)

  2. اندروني پيٽرولر, faust doc ۾ چڱي طرح بيان ڪيو ويو آهي، اسان کي موضوع کي سڌو سنئون ڪوڊ ۾ ترتيب ڏيڻ جي اجازت ڏئي ٿو، يقينا، هن جو مطلب آهي فاسٽ ڊولپرز پاران مهيا ڪيل پيٽرولر، مثال طور: برقرار رکڻ، برقرار رکڻ واري پاليسي (ڊفالٽ طور تي حذف ڪريو، پر توهان سيٽ ڪري سگهو ٿا. (توريت)) في موضوع جي ورهاڱي جو تعداد (ڀاڱاڪرڻ، مثال طور، کان گهٽ عالمي اهميت ايپليڪيشن فاسٽ).

  3. عام طور تي، ايجنٽ عالمي قدرن سان منظم موضوع ٺاهي سگھي ٿو، جڏهن ته، مان هر شي کي واضح طور تي بيان ڪرڻ چاهيان ٿو. ان کان علاوه، ايجنٽ جي اشتهار ۾ موضوع جا ڪجهه پيرا ميٽرز (مثال طور، ورهاڱي جو تعداد يا برقرار رکڻ واري پاليسي) ترتيب نه ٿي سگھي.

    ھتي اھو آھي جيڪو دستي طور تي موضوع جي وضاحت ڪرڻ کان سواء نظر اچي سگھي ٿو:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

چڱو، هاڻي اچو ته بيان ڪريون ته اسان جو ايجنٽ ڇا ڪندو :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

تنهن ڪري، ايجنٽ جي شروعات ۾، اسان پنهنجي ڪلائنٽ جي ذريعي درخواستن لاء هڪ aiohttp سيشن کوليو. اهڙيء طرح، جڏهن هڪ ڪم ڪندڙ شروع ڪيو ويندو، جڏهن اسان جو ايجنٽ شروع ڪيو ويندو، هڪ سيشن فوري طور تي کوليو ويندو - هڪ، سڄي وقت لاء ڪم ڪندڙ هلائي رهيو آهي (يا ڪيترائي، جيڪڏهن توهان پيٽرولر کي تبديل ڪندا آهيو. مطابقت هڪ ايجنٽ کان ڊفالٽ يونٽ سان).

اڳيون، اسان وهڪرو جي پيروي ڪندا آهيون (اسان پيغام رکون ٿا _، جيئن ته اسان، هن ايجنٽ ۾، اسان جي موضوع جي پيغامن جي مواد جي پرواهه ناهي، جيڪڏهن اهي موجوده آفيس تي موجود آهن، ٻي صورت ۾ اسان جو چڪر انهن جي اچڻ جو انتظار ڪندو. خير، اسان جي لوپ جي اندر، اسان پيغام جي رسيد کي لاگ ان ڪريون ٿا، فعال جي فهرست حاصل ڪريو (get_securities صرف ڊفالٽ طور تي فعال طور تي، ڏسو ڪلائنٽ ڪوڊ ڏسو) سيڪيورٽيز ۽ ان کي ڊيٽابيس ۾ محفوظ ڪريو، چيڪ ڪريو ته ڇا ساڳئي ٽڪر سان سيڪيورٽي آهي ۽ ڊيٽابيس ۾ مٽائي، جيڪڏهن اتي آهي، پوء اهو (پيپر) صرف اپڊيٽ ڪيو ويندو.

اچو ته اسان جي تخليق کي شروع ڪريون!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

پي ايس خاصيتون ويب جزو آئون آرٽيڪل ۾ فاسٽ تي غور نه ڪندس، تنهنڪري اسان مناسب پرچم مقرر ڪيو.

اسان جي لانچ ڪمانڊ ۾، اسان فاسٽ کي ٻڌايو ته ايپليڪيشن اعتراض کي ڪٿي ڳولڻ ۽ ان سان ڇا ڪجي (هڪ ورڪر لانچ ڪريو) انفارميشن لاگ آئوٽ ليول سان. اسان هيٺ ڏنل پيداوار حاصل ڪندا آهيون:

اسپيلر

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

اهو زنده آهي !!!

اچو ته ورهاڱي جي سيٽ کي ڏسو. جيئن اسان ڏسي سگهون ٿا، هڪ موضوع ٺاهيو ويو نالي سان جنهن کي اسان ڪوڊ ۾ نامزد ڪيو آهي، ڀاڱن جو ڊفالٽ نمبر (8، مان ورتو ويو. topic_partitions - ايپليڪيشن اعتراض پيٽرولر)، ڇاڪاڻ ته اسان اسان جي موضوع لاء هڪ انفرادي قدر بيان نه ڪيو آهي (پارٽيشن ذريعي). ڪم ڪندڙ ۾ شروع ٿيل ايجنٽ سڀني 8 ورهاڱي کي مقرر ڪيو ويو آهي، ڇاڪاڻ ته اهو صرف هڪ آهي، پر اهو ڪلستر جي باري ۾ وڌيڪ تفصيل سان بحث ڪيو ويندو.

خير، هاڻي اسان ٻئي ٽرمينل ونڊو ڏانهن وڃو ۽ اسان جي موضوع تي هڪ خالي پيغام موڪليو:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

پي ايس استعمال ڪندي @ اسان ڏيکاريون ٿا ته اسان "collect_securities" نالي هڪ موضوع ڏانهن پيغام موڪلي رهيا آهيون.

انهي صورت ۾، پيغام ورهاڱي 6 ڏانهن ويو - توهان هن کي چيڪ ڪري سگهو ٿا ڪيفڊراپ تي وڃڻ سان localhost:9000

اسان جي ڪم ڪندڙ سان ٽرمينل ونڊو ڏانهن وڃو، اسان ڏسندا سين ته هڪ خوشيء جو پيغام موڪليو ويو آهي لوگو استعمال ڪندي:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

اسان مونگو ۾ پڻ ڏسي سگهون ٿا (Robo3T يا Studio3T استعمال ڪندي) ۽ ڏسو ته سيڪيورٽيز ڊيٽابيس ۾ آهن:

مان هڪ ارب پتي نه آهيان، ۽ تنهنڪري اسان پهرين ڏسڻ جي اختيار سان مطمئن آهيون.

فاسٽ تي پس منظر جا ڪم، حصو II: ايجنٽ ۽ ٽيمونفاسٽ تي پس منظر جا ڪم، حصو II: ايجنٽ ۽ ٽيمون

خوشي ۽ خوشي - پهريون ايجنٽ تيار آهي :)

ايجنٽ تيار، نئين ايجنٽ ڊگھي رهو!

ها، حضرات، اسان هن آرٽيڪل پاران تيار ڪيل رستي جو صرف 1/3 ڍڪيو آهي، پر مايوس نه ٿيو، ڇو ته هاڻي اهو آسان ٿي ويندو.

تنهن ڪري هاڻي اسان کي هڪ ايجنٽ جي ضرورت آهي جيڪا ميٽا معلومات گڏ ڪري ۽ ان کي گڏ ڪرڻ واري دستاويز ۾ وجهي:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

جيئن ته هي ايجنٽ هڪ مخصوص سيڪيورٽي بابت معلومات تي عمل ڪندو، اسان کي پيغام ۾ هن سيڪيورٽي جي ٽڪر (علامت) کي ظاهر ڪرڻ جي ضرورت آهي. هن مقصد لاء، اتي موجود آهن رڪارڊ - طبقن جو اعلان ڪري ٿو پيغام اسڪيم کي ايجنٽ موضوع ۾.

انهي حالت ۾، اچو ته وڃو records.py ۽ بيان ڪريو ته هن موضوع لاءِ پيغام ڇا ڏسڻ گهرجي:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

جئين توهان اندازو لڳايو هوندو، فاسٽ پيغام اسڪيما کي بيان ڪرڻ لاءِ پٿون قسم جي تشريح استعمال ڪري ٿو، اهو ئي سبب آهي ته لائبريري طرفان سپورٽ ڪيل گهٽ ۾ گهٽ ورزن 3.6.

اچو ته ايجنٽ ڏانھن واپس وڃو، قسم مقرر ڪريو ۽ ان کي شامل ڪريو:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

جئين توهان ڏسي سگهو ٿا، اسان هڪ اسڪيم سان هڪ نئون پيٽرولر پاس ڪريون ٿا موضوع جي شروعات واري طريقي سان - value_type. ان کان علاوه، هر شيء ساڳئي اسڪيم جي پٺيان آهي، تنهنڪري مون کي ڪنهن به شيء تي رهڻ ۾ ڪو به نقطو نظر نٿو اچي.

خير، آخري رابطي ميٽا معلومات گڏ ڪرڻ واري ايجنٽ کي ڪال شامل ڪرڻ آهي collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

اسان پيغام لاءِ اڳئين اعلان ڪيل اسڪيم استعمال ڪريون ٿا. انهي صورت ۾، مون .cast طريقو استعمال ڪيو ڇو ته اسان کي ايجنٽ جي نتيجن جو انتظار ڪرڻ جي ضرورت ناهي، پر اهو قابل ذڪر آهي ته طريقا موضوع ڏانهن پيغام موڪليو:

  1. cast - بلاڪ نٿو ڪري ڇاڪاڻ ته اهو نتيجو جي اميد نٿو رکي. توهان نتيجو نه موڪلي سگهو ٿا ٻئي موضوع ڏانهن پيغام جي طور تي.

  2. موڪليو - بلاڪ نٿو ڪري ڇاڪاڻ ته اهو نتيجو جي اميد نٿو رکي. توھان ھڪڙي ايجنٽ کي موضوع ۾ بيان ڪري سگھو ٿا جنھن جو نتيجو ٿيندو.

  3. پڇو - نتيجي جو انتظار. توھان ھڪڙي ايجنٽ کي موضوع ۾ بيان ڪري سگھو ٿا جنھن جو نتيجو ٿيندو.

سو، اهو سڀ ڪجهه اڄ جي ايجنٽن سان آهي!

خوابن جي ٽيم

آخري شيء مون هن حصي ۾ لکڻ جو واعدو ڪيو آهي حڪم. جيئن اڳ ذڪر ڪيو ويو آهي، فاسٽ ۾ ڪمانڊ ڪلڪ جي چوڌاري لفافي آهن. حقيقت ۾، faust صرف اسان جي ڪسٽم ڪمان کي ان جي انٽرفيس سان ڳنڍيندو آهي جڏهن -A ڪيچ کي بيان ڪندي

اعلانيل ايجنٽن کان پوءِ agents.py آرائشي سان گڏ فنڪشن شامل ڪريو app.commandسڏڻ جو طريقو وڌائين у جمع_سيڪيورٽيٽس:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

ان ڪري، جيڪڏهن اسان حڪمن جي فهرست کي سڏيندا آهيون، اسان جو نئون حڪم ان ۾ هوندو:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

اسان ان کي ڪنهن ٻئي وانگر استعمال ڪري سگهون ٿا، تنهنڪري اچو ته فاسٽ ورڪر کي ٻيهر شروع ڪريون ۽ سيڪيورٽيز جو هڪ مڪمل مجموعو شروع ڪريون:

> faust -A horton.agents start-collect-securities

اڳتي ڇا ٿيندو؟

ايندڙ حصي ۾، باقي ايجنٽن کي مثال طور استعمال ڪندي، اسان سنڪ ميڪنزم تي غور ڪنداسين ته انتها کي ڳولڻ لاءِ سال جي واپار جي بند قيمتن ۽ ايجنٽن جي ڪرون لانچ ۾.

اهو سڀ ڪجهه اڄ لاء آهي! پڙهڻ لاء مهرباني :)

هن حصي لاء ڪوڊ

فاسٽ تي پس منظر جا ڪم، حصو II: ايجنٽ ۽ ٽيمون

پي ايس جي آخري حصي ۾ مون کان فاسٽ ۽ سنگم ڪافڪا بابت پڇيو ويو (سنگم ۾ ڪهڙيون خاصيتون آهن؟). اهو لڳي ٿو ته ڪنفلوئنٽ ڪيترن ئي طريقن سان وڌيڪ ڪارائتو آهي، پر حقيقت اها آهي ته فاسٽ کي سنگم لاءِ مڪمل ڪلائنٽ سپورٽ نه آهي - هي هيٺ ڏنل آهي دستاويز ۾ ڪلائنٽ جي پابندين جي وضاحت.

جو ذريعو: www.habr.com

تبصرو شامل ڪريو