फ़ॉस्ट पर पृष्ठभूमि कार्य, भाग II: एजेंट और टीमें

फ़ॉस्ट पर पृष्ठभूमि कार्य, भाग II: एजेंट और टीमें

लेख-सूची

  1. भाग I: परिचय

  2. भाग II: एजेंट और टीमें

हम यहां क्या कर रहे हैं?

तो, तो, दूसरा भाग। जैसा कि पहले लिखा गया है, इसमें हम निम्नलिखित कार्य करेंगे:

  1. आइए aiohttp पर अल्फ़ावेंटेज के लिए एक छोटा क्लाइंट लिखें जिसमें हमें आवश्यक अंतिम बिंदुओं के अनुरोध हों।

  2. आइए एक एजेंट बनाएं जो प्रतिभूतियों पर डेटा और उन पर मेटा जानकारी एकत्र करेगा।

लेकिन, यह वही है जो हम प्रोजेक्ट के लिए करेंगे, और फॉस्ट रिसर्च के संदर्भ में, हम सीखेंगे कि एजेंटों को कैसे लिखना है जो काफ्का से स्ट्रीम घटनाओं को संसाधित करते हैं, साथ ही हमारे मामले में कमांड (क्लिक रैपर) कैसे लिखते हैं - उस विषय पर मैन्युअल पुश संदेशों के लिए जिसकी एजेंट निगरानी कर रहा है।

ट्रेनिंग

अल्फ़ावेंटेज क्लाइंट

सबसे पहले, अल्फावेटेज के अनुरोधों के लिए एक छोटा aiohttp क्लाइंट लिखें।

alphavantage.py

लुटेरा

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

दरअसल, इससे सब कुछ स्पष्ट है:

  1. अल्फ़ावेंटेज एपीआई काफी सरल और खूबसूरती से डिज़ाइन किया गया है, इसलिए मैंने इस विधि के माध्यम से सभी अनुरोध करने का निर्णय लिया construct_query जहां बदले में एक http कॉल होती है।

  2. मैं सभी फ़ील्ड लाता हूँ snake_case सहूलियत के लिए।

  3. खैर, सुंदर और जानकारीपूर्ण ट्रेसबैक आउटपुट के लिए लॉगर.कैच सजावट।

PS स्थानीय रूप से config.yml में अल्फ़ावांटेज टोकन जोड़ना या पर्यावरण चर निर्यात करना न भूलें HORTON_SERVICE_APIKEY. हमें एक टोकन मिलता है यहां.

सीआरयूडी वर्ग

प्रतिभूतियों के बारे में मेटा जानकारी संग्रहीत करने के लिए हमारे पास प्रतिभूतियों का संग्रह होगा।

डेटाबेस/सुरक्षा.py

मेरी राय में, यहां कुछ भी समझाने की आवश्यकता नहीं है, और बेस क्लास स्वयं काफी सरल है।

एप पाओ()

आइए एप्लिकेशन ऑब्जेक्ट बनाने के लिए एक फ़ंक्शन जोड़ें ऐप.पी.ई

लुटेरा

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

अभी हमारे पास सबसे सरल एप्लिकेशन निर्माण होगा, थोड़ी देर बाद हम इसका विस्तार करेंगे, हालांकि, आपको इंतजार न कराने के लिए, यहां प्रतिक्रिया दें संदर्भ ऐप-क्लास के लिए। मैं आपको सेटिंग क्लास पर एक नज़र डालने की भी सलाह देता हूं, क्योंकि यह अधिकांश सेटिंग्स के लिए जिम्मेदार है।

मुख्य भाग

प्रतिभूतियों की सूची एकत्र करने और बनाए रखने के लिए एजेंट

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

तो, सबसे पहले हमें फ़ॉस्ट एप्लिकेशन ऑब्जेक्ट मिलता है - यह काफी सरल है। इसके बाद, हम स्पष्ट रूप से अपने एजेंट के लिए एक विषय घोषित करते हैं... यहां यह उल्लेख करना उचित है कि यह क्या है, आंतरिक पैरामीटर क्या है और इसे अलग तरीके से कैसे व्यवस्थित किया जा सकता है।

  1. काफ्का में विषय, यदि हम सटीक परिभाषा जानना चाहते हैं, तो इसे पढ़ना बेहतर है बंद। दस्तावेज़, या आप पढ़ सकते हैं सारांश रूसी में हब पर, जहां सब कुछ काफी सटीक रूप से परिलक्षित होता है :)

  2. पैरामीटर आंतरिक, फ़ॉस्ट डॉक में काफी अच्छी तरह से वर्णित है, हमें विषय को सीधे कोड में कॉन्फ़िगर करने की अनुमति देता है, निश्चित रूप से, इसका मतलब है कि फ़ॉस्ट डेवलपर्स द्वारा प्रदान किए गए पैरामीटर, उदाहरण के लिए: अवधारण, अवधारण नीति (डिफ़ॉल्ट रूप से हटाएं, लेकिन आप सेट कर सकते हैं सघन), प्रति विषय विभाजन की संख्या (बीसियोंउदाहरण के लिए, इससे कम करना वैश्विक महत्व एप्लिकेशन फ़ॉस्ट)।

  3. सामान्य तौर पर, एजेंट वैश्विक मूल्यों के साथ एक प्रबंधित विषय बना सकता है, हालांकि, मैं सब कुछ स्पष्ट रूप से घोषित करना पसंद करता हूं। इसके अलावा, एजेंट विज्ञापन में विषय के कुछ पैरामीटर (उदाहरण के लिए, विभाजन की संख्या या अवधारण नीति) को कॉन्फ़िगर नहीं किया जा सकता है।

    विषय को मैन्युअल रूप से परिभाषित किए बिना यह कैसा दिख सकता है:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

खैर, अब आइए बताएं कि हमारा एजेंट क्या करेगा :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

इसलिए, एजेंट की शुरुआत में, हम अपने क्लाइंट के माध्यम से अनुरोधों के लिए एक aiohttp सत्र खोलते हैं। इस प्रकार, एक कार्यकर्ता शुरू करते समय, जब हमारा एजेंट लॉन्च किया जाता है, तो एक सत्र तुरंत खोला जाएगा - एक, पूरे समय कार्यकर्ता चल रहा है (या कई, यदि आप पैरामीटर बदलते हैं संगामिति एक डिफ़ॉल्ट इकाई वाले एजेंट से)।

इसके बाद, हम स्ट्रीम का अनुसरण करते हैं (हम संदेश डालते हैं _, चूंकि हम, इस एजेंट में, अपने विषय के संदेशों की सामग्री की परवाह नहीं करते हैं, यदि वे वर्तमान ऑफसेट पर मौजूद हैं, अन्यथा हमारा चक्र उनके आगमन की प्रतीक्षा करेगा। खैर, हमारे लूप के अंदर, हम संदेश की रसीद लॉग करते हैं, सक्रिय की एक सूची प्राप्त करते हैं (get_securities केवल डिफ़ॉल्ट रूप से सक्रिय होती है, क्लाइंट कोड देखें) प्रतिभूतियों और इसे डेटाबेस में सहेजते हैं, जांचते हैं कि क्या समान टिकर के साथ कोई सुरक्षा है और डेटाबेस में विनिमय, अगर वहाँ है, तो यह (कागज) बस अद्यतन किया जाएगा।

आइए अपनी रचना लॉन्च करें!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

पीएस सुविधाएँ वेब घटक मैं लेखों में फ़ॉस्ट पर विचार नहीं करूंगा, इसलिए हमने उपयुक्त ध्वज स्थापित किया है।

हमारे लॉन्च कमांड में, हमने फ़ॉस्ट को बताया कि एप्लिकेशन ऑब्जेक्ट को कहां देखना है और जानकारी लॉग आउटपुट स्तर के साथ इसके साथ क्या करना है (एक कार्यकर्ता को लॉन्च करना)। हमें निम्नलिखित आउटपुट मिलता है:

लुटेरा

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

यह जीवित है!!!

आइए विभाजन सेट को देखें. जैसा कि हम देख सकते हैं, एक विषय उस नाम से बनाया गया था जिसे हमने कोड में निर्दिष्ट किया था, विभाजन की डिफ़ॉल्ट संख्या (8, से ली गई) विषय_विभाजन - एप्लिकेशन ऑब्जेक्ट पैरामीटर), चूंकि हमने अपने विषय के लिए (विभाजन के माध्यम से) कोई व्यक्तिगत मान निर्दिष्ट नहीं किया है। वर्कर में लॉन्च किए गए एजेंट को सभी 8 विभाजन सौंपे गए हैं, क्योंकि यह एकमात्र है, लेकिन क्लस्टरिंग के बारे में भाग में इस पर अधिक विस्तार से चर्चा की जाएगी।

खैर, अब हम दूसरे टर्मिनल विंडो पर जा सकते हैं और अपने विषय पर एक खाली संदेश भेज सकते हैं:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

पी.एस. का उपयोग कर रहा हूँ @ हम दिखाते हैं कि हम "collect_securities" नामक विषय पर एक संदेश भेज रहे हैं।

इस स्थिति में, संदेश पार्टीशन 6 पर चला गया - आप इसे kafdrop on पर जाकर जांच सकते हैं localhost:9000

अपने कार्यकर्ता के साथ टर्मिनल विंडो पर जाकर, हम लॉगुरू का उपयोग करके भेजा गया एक सुखद संदेश देखेंगे:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

हम मोंगो (रोबो3टी या स्टूडियो3टी का उपयोग करके) पर भी गौर कर सकते हैं और देख सकते हैं कि प्रतिभूतियाँ डेटाबेस में हैं:

मैं अरबपति नहीं हूं, और इसलिए हम पहले देखने के विकल्प से संतुष्ट हैं।

फ़ॉस्ट पर पृष्ठभूमि कार्य, भाग II: एजेंट और टीमेंफ़ॉस्ट पर पृष्ठभूमि कार्य, भाग II: एजेंट और टीमें

खुशी और आनंद - पहला एजेंट तैयार है :)

एजेंट तैयार, नये एजेंट जिंदाबाद!

हां, सज्जनों, हमने इस लेख द्वारा तैयार किए गए पथ का केवल 1/3 भाग ही कवर किया है, लेकिन निराश न हों, क्योंकि अब यह आसान हो जाएगा।

तो अब हमें एक ऐसे एजेंट की आवश्यकता है जो मेटा जानकारी एकत्र करे और उसे एक संग्रह दस्तावेज़ में रखे:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

चूँकि यह एजेंट एक विशिष्ट सुरक्षा के बारे में जानकारी संसाधित करेगा, इसलिए हमें संदेश में इस सुरक्षा के टिकर (प्रतीक) को इंगित करना होगा। इस प्रयोजन के लिए फ़ॉस्ट में हैं अभिलेख - कक्षाएं जो एजेंट विषय में संदेश योजना घोषित करती हैं।

इस मामले में, आइए चलते हैं रिकॉर्ड्स.py और वर्णन करें कि इस विषय के लिए संदेश कैसा दिखना चाहिए:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

जैसा कि आपने अनुमान लगाया होगा, फ़ॉस्ट संदेश स्कीमा का वर्णन करने के लिए पायथन प्रकार के एनोटेशन का उपयोग करता है, यही कारण है कि लाइब्रेरी द्वारा समर्थित न्यूनतम संस्करण है 3.6.

आइए एजेंट पर वापस लौटें, प्रकार सेट करें और इसे जोड़ें:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

जैसा कि आप देख सकते हैं, हम विषय आरंभीकरण विधि में एक योजना के साथ एक नया पैरामीटर पास करते हैं - value_type। इसके अलावा, हर चीज़ एक ही योजना का अनुसरण करती है, इसलिए मुझे किसी और चीज़ पर ध्यान देने का कोई मतलब नहीं दिखता।

खैर, अंतिम स्पर्श मेटा सूचना संग्रह एजेंट को Collect_securitites में एक कॉल जोड़ना है:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

हम संदेश के लिए पूर्व घोषित योजना का उपयोग करते हैं। इस मामले में, मैंने .कास्ट विधि का उपयोग किया क्योंकि हमें एजेंट से परिणाम की प्रतीक्षा करने की आवश्यकता नहीं है, लेकिन यह उल्लेख करने योग्य है способов विषय पर एक संदेश भेजें:

  1. कास्ट - ब्लॉक नहीं करता क्योंकि यह किसी परिणाम की अपेक्षा नहीं करता है। आप परिणाम को किसी अन्य विषय पर संदेश के रूप में नहीं भेज सकते.

  2. भेजें - ब्लॉक नहीं करता क्योंकि यह परिणाम की अपेक्षा नहीं करता है। आप उस विषय में एक एजेंट निर्दिष्ट कर सकते हैं जिस पर परिणाम जाएगा।

  3. पूछना - परिणाम की प्रतीक्षा करता है। आप उस विषय में एक एजेंट निर्दिष्ट कर सकते हैं जिस पर परिणाम जाएगा।

तो, आज एजेंटों के लिए बस इतना ही!

सर्वश्रेष्ठ टीम

आखिरी चीज़ जो मैंने इस भाग में लिखने का वादा किया था वह है आदेश। जैसा कि पहले उल्लेख किया गया है, फ़ॉस्ट में कमांड क्लिक के चारों ओर एक आवरण हैं। वास्तव में, -A कुंजी निर्दिष्ट करते समय faust बस हमारे कस्टम कमांड को इसके इंटरफ़ेस से जोड़ देता है

एजेंटों की घोषणा के बाद Agents.py डेकोरेटर के साथ एक फ़ंक्शन जोड़ें ऐप.कमांडविधि को कॉल करना डालना у कलेक्ट_सिक्योरिटीज़:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

इस प्रकार, यदि हम आदेशों की सूची कहते हैं, तो हमारा नया आदेश उसमें होगा:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

हम इसे किसी अन्य की तरह उपयोग कर सकते हैं, तो आइए फ़ॉस्ट वर्कर को पुनः आरंभ करें और प्रतिभूतियों का पूर्ण संग्रह शुरू करें:

> faust -A horton.agents start-collect-securities

आगे क्या होगा?

अगले भाग में, शेष एजेंटों को एक उदाहरण के रूप में उपयोग करते हुए, हम वर्ष के लिए व्यापार की समापन कीमतों और एजेंटों के क्रॉन लॉन्च में चरम सीमाओं की खोज के लिए सिंक तंत्र पर विचार करेंगे।

यह सभी आज के लिए है! पढ़ने के लिए धन्यवाद :)

इस भाग के लिए कोड

फ़ॉस्ट पर पृष्ठभूमि कार्य, भाग II: एजेंट और टीमें

पीएस पिछले भाग के तहत मुझसे फॉस्ट और कंफ्लुएंट काफ्का के बारे में पूछा गया था (कंफ्लुएंट में क्या विशेषताएं हैं?). ऐसा लगता है कि कंफ्लुएंट कई मायनों में अधिक कार्यात्मक है, लेकिन तथ्य यह है कि फॉस्ट के पास कंफ्लुएंट के लिए पूर्ण ग्राहक समर्थन नहीं है - यह इस प्रकार है दस्तावेज़ में ग्राहक प्रतिबंधों का विवरण.

स्रोत: www.habr.com

एक टिप्पणी जोड़ें