ಫೌಸ್ಟ್‌ನಲ್ಲಿ ಹಿನ್ನೆಲೆ ಕಾರ್ಯಗಳು, ಭಾಗ II: ಏಜೆಂಟ್‌ಗಳು ಮತ್ತು ತಂಡಗಳು

ಫೌಸ್ಟ್‌ನಲ್ಲಿ ಹಿನ್ನೆಲೆ ಕಾರ್ಯಗಳು, ಭಾಗ II: ಏಜೆಂಟ್‌ಗಳು ಮತ್ತು ತಂಡಗಳು

ಪರಿವಿಡಿ

  1. ಭಾಗ I: ಪರಿಚಯ

  2. ಭಾಗ II: ಏಜೆಂಟ್‌ಗಳು ಮತ್ತು ತಂಡಗಳು

ನಾವು ಇಲ್ಲಿ ಏನು ಮಾಡುತ್ತಿದ್ದೇವೆ?

ಆದ್ದರಿಂದ, ಆದ್ದರಿಂದ, ಎರಡನೇ ಭಾಗ. ಮೊದಲೇ ಬರೆದಂತೆ, ಅದರಲ್ಲಿ ನಾವು ಈ ಕೆಳಗಿನವುಗಳನ್ನು ಮಾಡುತ್ತೇವೆ:

  1. ನಮಗೆ ಅಗತ್ಯವಿರುವ ಅಂತಿಮ ಬಿಂದುಗಳಿಗಾಗಿ ವಿನಂತಿಗಳೊಂದಿಗೆ aiohttp ನಲ್ಲಿ ಆಲ್ಫಾವಾಂಟೇಜ್‌ಗಾಗಿ ಸಣ್ಣ ಕ್ಲೈಂಟ್ ಅನ್ನು ಬರೆಯೋಣ.

  2. ಸೆಕ್ಯುರಿಟೀಸ್ ಮತ್ತು ಅವುಗಳ ಮೇಲಿನ ಮೆಟಾ ಮಾಹಿತಿಯನ್ನು ಸಂಗ್ರಹಿಸುವ ಏಜೆಂಟ್ ಅನ್ನು ರಚಿಸೋಣ.

ಆದರೆ, ಪ್ರಾಜೆಕ್ಟ್‌ಗಾಗಿ ನಾವು ಇದನ್ನು ಮಾಡುತ್ತೇವೆ ಮತ್ತು ಫಸ್ಟ್ ಸಂಶೋಧನೆಯ ವಿಷಯದಲ್ಲಿ, ಕಾಫ್ಕಾದಿಂದ ಈವೆಂಟ್‌ಗಳನ್ನು ಸ್ಟ್ರೀಮ್ ಮಾಡುವ ಏಜೆಂಟ್‌ಗಳನ್ನು ಹೇಗೆ ಬರೆಯುವುದು, ಹಾಗೆಯೇ ಆಜ್ಞೆಗಳನ್ನು ಹೇಗೆ ಬರೆಯುವುದು (ರೇಪರ್ ಕ್ಲಿಕ್ ಮಾಡಿ), ನಮ್ಮ ಸಂದರ್ಭದಲ್ಲಿ ನಾವು ಕಲಿಯುತ್ತೇವೆ - ಏಜೆಂಟ್ ಮೇಲ್ವಿಚಾರಣೆ ಮಾಡುತ್ತಿರುವ ವಿಷಯಕ್ಕೆ ಹಸ್ತಚಾಲಿತ ಪುಶ್ ಸಂದೇಶಗಳಿಗಾಗಿ.

ತರಬೇತಿ

ಆಲ್ಫಾವಾಂಟೇಜ್ ಕ್ಲೈಂಟ್

ಮೊದಲಿಗೆ, ಆಲ್ಫಾವಾಂಟೇಜ್‌ಗೆ ವಿನಂತಿಗಳಿಗಾಗಿ ಸಣ್ಣ aiohttp ಕ್ಲೈಂಟ್ ಅನ್ನು ಬರೆಯೋಣ.

alphavantage.py

ಸ್ಪಾಯ್ಲರ್

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

ವಾಸ್ತವವಾಗಿ, ಅದರಿಂದ ಎಲ್ಲವೂ ಸ್ಪಷ್ಟವಾಗಿದೆ:

  1. AlphaVantage API ಅನ್ನು ಸರಳವಾಗಿ ಮತ್ತು ಸುಂದರವಾಗಿ ವಿನ್ಯಾಸಗೊಳಿಸಲಾಗಿದೆ, ಆದ್ದರಿಂದ ನಾನು ವಿಧಾನದ ಮೂಲಕ ಎಲ್ಲಾ ವಿನಂತಿಗಳನ್ನು ಮಾಡಲು ನಿರ್ಧರಿಸಿದೆ construct_query ಅಲ್ಲಿ ಪ್ರತಿಯಾಗಿ ಒಂದು http ಕರೆ ಇರುತ್ತದೆ.

  2. ನಾನು ಎಲ್ಲಾ ಕ್ಷೇತ್ರಗಳನ್ನು ತರುತ್ತೇನೆ snake_case ಸೌಕರ್ಯಕ್ಕಾಗಿ.

  3. ಒಳ್ಳೆಯದು, ಸುಂದರವಾದ ಮತ್ತು ತಿಳಿವಳಿಕೆ ನೀಡುವ ಟ್ರೇಸ್‌ಬ್ಯಾಕ್ ಔಟ್‌ಪುಟ್‌ಗಾಗಿ ಲಾಗರ್.ಕ್ಯಾಚ್ ಅಲಂಕಾರ.

PS ಸ್ಥಳೀಯವಾಗಿ config.yml ಗೆ ಆಲ್ಫಾವಾಂಟೇಜ್ ಟೋಕನ್ ಅನ್ನು ಸೇರಿಸಲು ಮರೆಯಬೇಡಿ, ಅಥವಾ ಪರಿಸರ ವೇರಿಯಬಲ್ ಅನ್ನು ರಫ್ತು ಮಾಡಿ HORTON_SERVICE_APIKEY. ನಾವು ಟೋಕನ್ ಸ್ವೀಕರಿಸುತ್ತೇವೆ ಇಲ್ಲಿ.

CRUD ವರ್ಗ

ಸೆಕ್ಯುರಿಟಿಗಳ ಬಗ್ಗೆ ಮೆಟಾ ಮಾಹಿತಿಯನ್ನು ಸಂಗ್ರಹಿಸಲು ನಾವು ಸೆಕ್ಯುರಿಟೀಸ್ ಸಂಗ್ರಹವನ್ನು ಹೊಂದಿದ್ದೇವೆ.

ಡೇಟಾಬೇಸ್/security.py

ನನ್ನ ಅಭಿಪ್ರಾಯದಲ್ಲಿ, ಇಲ್ಲಿ ಏನನ್ನೂ ವಿವರಿಸುವ ಅಗತ್ಯವಿಲ್ಲ, ಮತ್ತು ಮೂಲ ವರ್ಗವು ತುಂಬಾ ಸರಳವಾಗಿದೆ.

get_app()

ಅಪ್ಲಿಕೇಶನ್ ವಸ್ತುವನ್ನು ರಚಿಸಲು ಕಾರ್ಯವನ್ನು ಸೇರಿಸೋಣ app.py

ಸ್ಪಾಯ್ಲರ್

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

ಸದ್ಯಕ್ಕೆ ನಾವು ಸರಳವಾದ ಅಪ್ಲಿಕೇಶನ್ ರಚನೆಯನ್ನು ಹೊಂದಿದ್ದೇವೆ, ಸ್ವಲ್ಪ ಸಮಯದ ನಂತರ ನಾವು ಅದನ್ನು ವಿಸ್ತರಿಸುತ್ತೇವೆ, ಆದಾಗ್ಯೂ, ನಿಮ್ಮನ್ನು ಕಾಯದಂತೆ ಮಾಡಲು, ಇಲ್ಲಿ ಉಲ್ಲೇಖಗಳು ಅಪ್ಲಿಕೇಶನ್-ವರ್ಗಕ್ಕೆ. ಸೆಟ್ಟಿಂಗ್‌ಗಳ ವರ್ಗವನ್ನು ನೋಡಲು ನಾನು ನಿಮಗೆ ಸಲಹೆ ನೀಡುತ್ತೇನೆ, ಏಕೆಂದರೆ ಇದು ಹೆಚ್ಚಿನ ಸೆಟ್ಟಿಂಗ್‌ಗಳಿಗೆ ಕಾರಣವಾಗಿದೆ.

ಮುಖ್ಯ ದೇಹ

ಸೆಕ್ಯೂರಿಟಿಗಳ ಪಟ್ಟಿಯನ್ನು ಸಂಗ್ರಹಿಸುವ ಮತ್ತು ನಿರ್ವಹಿಸುವ ಏಜೆಂಟ್

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

ಆದ್ದರಿಂದ, ಮೊದಲು ನಾವು ಫಾಸ್ಟ್ ಅಪ್ಲಿಕೇಶನ್ ವಸ್ತುವನ್ನು ಪಡೆಯುತ್ತೇವೆ - ಇದು ತುಂಬಾ ಸರಳವಾಗಿದೆ. ಮುಂದೆ, ನಾವು ನಮ್ಮ ಏಜೆಂಟರಿಗೆ ವಿಷಯವನ್ನು ಸ್ಪಷ್ಟವಾಗಿ ಘೋಷಿಸುತ್ತೇವೆ ... ಇಲ್ಲಿ ಅದು ಏನು, ಆಂತರಿಕ ಪ್ಯಾರಾಮೀಟರ್ ಏನು ಮತ್ತು ಇದನ್ನು ಹೇಗೆ ವಿಭಿನ್ನವಾಗಿ ಜೋಡಿಸಬಹುದು ಎಂಬುದನ್ನು ನಮೂದಿಸುವುದು ಯೋಗ್ಯವಾಗಿದೆ.

  1. ಕಾಫ್ಕಾದಲ್ಲಿನ ವಿಷಯಗಳು, ನಾವು ನಿಖರವಾದ ವ್ಯಾಖ್ಯಾನವನ್ನು ತಿಳಿಯಲು ಬಯಸಿದರೆ, ಅದನ್ನು ಓದುವುದು ಉತ್ತಮ ಆರಿಸಿ. ದಾಖಲೆ, ಅಥವಾ ನೀವು ಓದಬಹುದು ಸಂಕಲನ ರಷ್ಯನ್ ಭಾಷೆಯಲ್ಲಿ ಹಬ್ರೆಯಲ್ಲಿ, ಅಲ್ಲಿ ಎಲ್ಲವೂ ಸಾಕಷ್ಟು ನಿಖರವಾಗಿ ಪ್ರತಿಫಲಿಸುತ್ತದೆ :)

  2. ಪ್ಯಾರಾಮೀಟರ್ ಆಂತರಿಕ, ಫಾಸ್ಟ್ ಡಾಕ್‌ನಲ್ಲಿ ಉತ್ತಮವಾಗಿ ವಿವರಿಸಲಾಗಿದೆ, ವಿಷಯವನ್ನು ನೇರವಾಗಿ ಕೋಡ್‌ನಲ್ಲಿ ಕಾನ್ಫಿಗರ್ ಮಾಡಲು ನಮಗೆ ಅನುಮತಿಸುತ್ತದೆ, ಸಹಜವಾಗಿ, ಇದರರ್ಥ ಫಾಸ್ಟ್ ಡೆವಲಪರ್‌ಗಳು ಒದಗಿಸಿದ ನಿಯತಾಂಕಗಳು, ಉದಾಹರಣೆಗೆ: ಧಾರಣ, ಧಾರಣ ನೀತಿ (ಡೀಫಾಲ್ಟ್ ಅಳಿಸಿ, ಆದರೆ ನೀವು ಹೊಂದಿಸಬಹುದು ಕಾಂಪ್ಯಾಕ್ಟ್), ಪ್ರತಿ ವಿಷಯಕ್ಕೆ ವಿಭಾಗಗಳ ಸಂಖ್ಯೆ (ವಿಭಾಗಗಳುಮಾಡಲು, ಉದಾಹರಣೆಗೆ, ಕಡಿಮೆ ಜಾಗತಿಕ ಪ್ರಾಮುಖ್ಯತೆ ಅಪ್ಲಿಕೇಶನ್ಗಳು ಫಾಸ್ಟ್).

  3. ಸಾಮಾನ್ಯವಾಗಿ, ಏಜೆಂಟ್ ಜಾಗತಿಕ ಮೌಲ್ಯಗಳೊಂದಿಗೆ ನಿರ್ವಹಿಸಲಾದ ವಿಷಯವನ್ನು ರಚಿಸಬಹುದು, ಆದಾಗ್ಯೂ, ನಾನು ಎಲ್ಲವನ್ನೂ ಸ್ಪಷ್ಟವಾಗಿ ಘೋಷಿಸಲು ಇಷ್ಟಪಡುತ್ತೇನೆ. ಹೆಚ್ಚುವರಿಯಾಗಿ, ಏಜೆಂಟ್ ಜಾಹೀರಾತಿನಲ್ಲಿನ ವಿಷಯದ ಕೆಲವು ನಿಯತಾಂಕಗಳನ್ನು (ಉದಾಹರಣೆಗೆ, ವಿಭಾಗಗಳ ಸಂಖ್ಯೆ ಅಥವಾ ಧಾರಣ ನೀತಿ) ಕಾನ್ಫಿಗರ್ ಮಾಡಲಾಗುವುದಿಲ್ಲ.

    ವಿಷಯವನ್ನು ಹಸ್ತಚಾಲಿತವಾಗಿ ವ್ಯಾಖ್ಯಾನಿಸದೆ ಅದು ಹೇಗಿರಬಹುದು ಎಂಬುದು ಇಲ್ಲಿದೆ:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

ಸರಿ, ಈಗ ನಮ್ಮ ಏಜೆಂಟ್ ಏನು ಮಾಡಬೇಕೆಂದು ವಿವರಿಸೋಣ :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

ಆದ್ದರಿಂದ, ಏಜೆಂಟ್‌ನ ಆರಂಭದಲ್ಲಿ, ನಮ್ಮ ಕ್ಲೈಂಟ್ ಮೂಲಕ ವಿನಂತಿಗಳಿಗಾಗಿ ನಾವು aiohttp ಸೆಶನ್ ಅನ್ನು ತೆರೆಯುತ್ತೇವೆ. ಹೀಗಾಗಿ, ಕೆಲಸಗಾರನನ್ನು ಪ್ರಾರಂಭಿಸುವಾಗ, ನಮ್ಮ ಏಜೆಂಟ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸಿದಾಗ, ತಕ್ಷಣವೇ ಒಂದು ಅಧಿವೇಶನವನ್ನು ತೆರೆಯಲಾಗುತ್ತದೆ - ಒಂದು, ಸಂಪೂರ್ಣ ಸಮಯಕ್ಕೆ ಕೆಲಸಗಾರ ಚಾಲನೆಯಲ್ಲಿದೆ (ಅಥವಾ ಹಲವಾರು, ನೀವು ನಿಯತಾಂಕವನ್ನು ಬದಲಾಯಿಸಿದರೆ ಏಕಕಾಲೀನತೆ ಡೀಫಾಲ್ಟ್ ಘಟಕವನ್ನು ಹೊಂದಿರುವ ಏಜೆಂಟ್‌ನಿಂದ).

ಮುಂದೆ, ನಾವು ಸ್ಟ್ರೀಮ್ ಅನ್ನು ಅನುಸರಿಸುತ್ತೇವೆ (ನಾವು ಸಂದೇಶವನ್ನು ಇರಿಸುತ್ತೇವೆ _, ನಾವು, ಈ ಏಜೆಂಟ್‌ನಲ್ಲಿ, ನಮ್ಮ ವಿಷಯದ) ವಿಷಯದ ಬಗ್ಗೆ ಕಾಳಜಿ ವಹಿಸುವುದಿಲ್ಲವಾದ್ದರಿಂದ, ಪ್ರಸ್ತುತ ಆಫ್‌ಸೆಟ್‌ನಲ್ಲಿ ಅವು ಅಸ್ತಿತ್ವದಲ್ಲಿದ್ದರೆ, ಇಲ್ಲದಿದ್ದರೆ ನಮ್ಮ ಸೈಕಲ್ ಅವರ ಆಗಮನಕ್ಕಾಗಿ ಕಾಯುತ್ತದೆ. ಸರಿ, ನಮ್ಮ ಲೂಪ್ ಒಳಗೆ, ನಾವು ಸಂದೇಶದ ರಸೀದಿಯನ್ನು ಲಾಗ್ ಮಾಡುತ್ತೇವೆ, ಸಕ್ರಿಯ (get_securities ಡೀಫಾಲ್ಟ್ ಆಗಿ ಮಾತ್ರ ಸಕ್ರಿಯವಾಗಿ ಹಿಂತಿರುಗುತ್ತದೆ, ಕ್ಲೈಂಟ್ ಕೋಡ್ ಅನ್ನು ನೋಡಿ) ಸೆಕ್ಯುರಿಟಿಗಳ ಪಟ್ಟಿಯನ್ನು ಪಡೆಯಿರಿ ಮತ್ತು ಅದನ್ನು ಡೇಟಾಬೇಸ್‌ಗೆ ಉಳಿಸಿ, ಅದೇ ಟಿಕರ್‌ನೊಂದಿಗೆ ಭದ್ರತೆ ಇದೆಯೇ ಎಂದು ಪರಿಶೀಲಿಸುತ್ತದೆ ಮತ್ತು ಡೇಟಾಬೇಸ್‌ನಲ್ಲಿ ವಿನಿಮಯ , ಇದ್ದರೆ, ಅದನ್ನು (ಕಾಗದ) ಸರಳವಾಗಿ ನವೀಕರಿಸಲಾಗುತ್ತದೆ.

ನಮ್ಮ ಸೃಷ್ಟಿಯನ್ನು ಪ್ರಾರಂಭಿಸೋಣ!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

ಪಿಎಸ್ ವೈಶಿಷ್ಟ್ಯಗಳು ವೆಬ್ ಘಟಕ ನಾನು ಲೇಖನಗಳಲ್ಲಿ ಫೌಸ್ಟ್ ಅನ್ನು ಪರಿಗಣಿಸುವುದಿಲ್ಲ, ಆದ್ದರಿಂದ ನಾವು ಸೂಕ್ತವಾದ ಧ್ವಜವನ್ನು ಹೊಂದಿಸಿದ್ದೇವೆ.

ನಮ್ಮ ಲಾಂಚ್ ಕಮಾಂಡ್‌ನಲ್ಲಿ, ಮಾಹಿತಿ ಲಾಗ್ ಔಟ್‌ಪುಟ್ ಮಟ್ಟದೊಂದಿಗೆ ಅಪ್ಲಿಕೇಶನ್ ಆಬ್ಜೆಕ್ಟ್ ಅನ್ನು ಎಲ್ಲಿ ನೋಡಬೇಕು ಮತ್ತು ಅದರೊಂದಿಗೆ ಏನು ಮಾಡಬೇಕೆಂದು (ಕಾರ್ಮಿಕನನ್ನು ಪ್ರಾರಂಭಿಸಿ) ನಾವು ಫಾಸ್ಟ್‌ಗೆ ಹೇಳಿದ್ದೇವೆ. ನಾವು ಈ ಕೆಳಗಿನ ಔಟ್ಪುಟ್ ಅನ್ನು ಪಡೆಯುತ್ತೇವೆ:

ಸ್ಪಾಯ್ಲರ್

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

ಅದು ಜೀವಂತವಾಗಿದೆ!!!

ವಿಭಜನೆಯ ಸೆಟ್ ಅನ್ನು ನೋಡೋಣ. ನಾವು ನೋಡುವಂತೆ, ನಾವು ಕೋಡ್‌ನಲ್ಲಿ ಗೊತ್ತುಪಡಿಸಿದ ಹೆಸರಿನೊಂದಿಗೆ ವಿಷಯವನ್ನು ರಚಿಸಲಾಗಿದೆ, ವಿಭಾಗಗಳ ಡೀಫಾಲ್ಟ್ ಸಂಖ್ಯೆ (8, ತೆಗೆದುಕೊಳ್ಳಲಾಗಿದೆ ವಿಷಯ_ವಿಭಾಗಗಳು - ಅಪ್ಲಿಕೇಶನ್ ಆಬ್ಜೆಕ್ಟ್ ಪ್ಯಾರಾಮೀಟರ್), ನಾವು ನಮ್ಮ ವಿಷಯಕ್ಕೆ ಪ್ರತ್ಯೇಕ ಮೌಲ್ಯವನ್ನು ನಿರ್ದಿಷ್ಟಪಡಿಸದ ಕಾರಣ (ವಿಭಾಗಗಳ ಮೂಲಕ). ಕೆಲಸಗಾರನಲ್ಲಿ ಪ್ರಾರಂಭಿಸಲಾದ ಏಜೆಂಟ್ ಎಲ್ಲಾ 8 ವಿಭಾಗಗಳನ್ನು ನಿಯೋಜಿಸಲಾಗಿದೆ, ಏಕೆಂದರೆ ಇದು ಒಂದೇ ಒಂದು, ಆದರೆ ಕ್ಲಸ್ಟರಿಂಗ್ ಬಗ್ಗೆ ಭಾಗದಲ್ಲಿ ಇದನ್ನು ಹೆಚ್ಚು ವಿವರವಾಗಿ ಚರ್ಚಿಸಲಾಗುವುದು.

ಸರಿ, ಈಗ ನಾವು ಇನ್ನೊಂದು ಟರ್ಮಿನಲ್ ವಿಂಡೋಗೆ ಹೋಗಬಹುದು ಮತ್ತು ನಮ್ಮ ವಿಷಯಕ್ಕೆ ಖಾಲಿ ಸಂದೇಶವನ್ನು ಕಳುಹಿಸಬಹುದು:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

ಪಿಎಸ್ ಬಳಸುತ್ತಿದೆ @ "collect_securities" ಹೆಸರಿನ ವಿಷಯಕ್ಕೆ ನಾವು ಸಂದೇಶವನ್ನು ಕಳುಹಿಸುತ್ತಿದ್ದೇವೆ ಎಂದು ನಾವು ತೋರಿಸುತ್ತೇವೆ.

ಈ ಸಂದರ್ಭದಲ್ಲಿ, ಸಂದೇಶವು ವಿಭಾಗ 6 ಕ್ಕೆ ಹೋಯಿತು - ನೀವು kafdrop on ಗೆ ಹೋಗುವ ಮೂಲಕ ಇದನ್ನು ಪರಿಶೀಲಿಸಬಹುದು localhost:9000

ನಮ್ಮ ಕೆಲಸಗಾರರೊಂದಿಗೆ ಟರ್ಮಿನಲ್ ವಿಂಡೋಗೆ ಹೋಗುವಾಗ, ಲೋಗುರು ಬಳಸಿ ಕಳುಹಿಸಲಾದ ಸಂತೋಷದ ಸಂದೇಶವನ್ನು ನಾವು ನೋಡುತ್ತೇವೆ:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

ನಾವು ಮೊಂಗೊವನ್ನು (Robo3T ಅಥವಾ Studio3T ಬಳಸಿ) ನೋಡಬಹುದು ಮತ್ತು ಸೆಕ್ಯುರಿಟಿಗಳು ಡೇಟಾಬೇಸ್‌ನಲ್ಲಿವೆ ಎಂದು ನೋಡಬಹುದು:

ನಾನು ಬಿಲಿಯನೇರ್ ಅಲ್ಲ, ಮತ್ತು ಆದ್ದರಿಂದ ನಾವು ಮೊದಲ ವೀಕ್ಷಣೆ ಆಯ್ಕೆಯೊಂದಿಗೆ ತೃಪ್ತರಾಗಿದ್ದೇವೆ.

ಫೌಸ್ಟ್‌ನಲ್ಲಿ ಹಿನ್ನೆಲೆ ಕಾರ್ಯಗಳು, ಭಾಗ II: ಏಜೆಂಟ್‌ಗಳು ಮತ್ತು ತಂಡಗಳುಫೌಸ್ಟ್‌ನಲ್ಲಿ ಹಿನ್ನೆಲೆ ಕಾರ್ಯಗಳು, ಭಾಗ II: ಏಜೆಂಟ್‌ಗಳು ಮತ್ತು ತಂಡಗಳು

ಸಂತೋಷ ಮತ್ತು ಸಂತೋಷ - ಮೊದಲ ಏಜೆಂಟ್ ಸಿದ್ಧವಾಗಿದೆ :)

ಏಜೆಂಟ್ ಸಿದ್ಧವಾಗಿದೆ, ಹೊಸ ಏಜೆಂಟ್ ದೀರ್ಘಕಾಲ ಬದುಕಲಿ!

ಹೌದು, ಮಹನೀಯರೇ, ಈ ಲೇಖನವು ಸಿದ್ಧಪಡಿಸಿದ ಮಾರ್ಗದ 1/3 ಅನ್ನು ಮಾತ್ರ ನಾವು ಆವರಿಸಿದ್ದೇವೆ, ಆದರೆ ನಿರುತ್ಸಾಹಗೊಳಿಸಬೇಡಿ, ಏಕೆಂದರೆ ಈಗ ಅದು ಸುಲಭವಾಗುತ್ತದೆ.

ಆದ್ದರಿಂದ ಈಗ ನಮಗೆ ಮೆಟಾ ಮಾಹಿತಿಯನ್ನು ಸಂಗ್ರಹಿಸುವ ಮತ್ತು ಸಂಗ್ರಹಣೆ ಡಾಕ್ಯುಮೆಂಟ್‌ನಲ್ಲಿ ಇರಿಸುವ ಏಜೆಂಟ್ ಅಗತ್ಯವಿದೆ:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

ಈ ಏಜೆಂಟ್ ನಿರ್ದಿಷ್ಟ ಭದ್ರತೆಯ ಕುರಿತು ಮಾಹಿತಿಯನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವುದರಿಂದ, ನಾವು ಸಂದೇಶದಲ್ಲಿ ಈ ಭದ್ರತೆಯ ಟಿಕ್ಕರ್ (ಚಿಹ್ನೆ) ಅನ್ನು ಸೂಚಿಸಬೇಕಾಗಿದೆ. ಈ ಉದ್ದೇಶಕ್ಕಾಗಿ ಫಾಸ್ಟ್ ಇವೆ ದಾಖಲೆಗಳು - ಏಜೆಂಟ್ ವಿಷಯದಲ್ಲಿ ಸಂದೇಶ ಯೋಜನೆಯನ್ನು ಘೋಷಿಸುವ ತರಗತಿಗಳು.

ಈ ಸಂದರ್ಭದಲ್ಲಿ, ನಾವು ಹೋಗೋಣ ದಾಖಲೆಗಳು.py ಮತ್ತು ಈ ವಿಷಯದ ಸಂದೇಶವು ಹೇಗಿರಬೇಕು ಎಂಬುದನ್ನು ವಿವರಿಸಿ:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

ನೀವು ಊಹಿಸಿದಂತೆ, ಸಂದೇಶ ಸ್ಕೀಮಾವನ್ನು ವಿವರಿಸಲು ಫೌಸ್ಟ್ ಪೈಥಾನ್ ಪ್ರಕಾರದ ಟಿಪ್ಪಣಿಯನ್ನು ಬಳಸುತ್ತದೆ, ಅದಕ್ಕಾಗಿಯೇ ಲೈಬ್ರರಿಯು ಬೆಂಬಲಿಸುವ ಕನಿಷ್ಠ ಆವೃತ್ತಿಯಾಗಿದೆ 3.6.

ನಾವು ಏಜೆಂಟ್‌ಗೆ ಹಿಂತಿರುಗಿ, ಪ್ರಕಾರಗಳನ್ನು ಹೊಂದಿಸಿ ಮತ್ತು ಅದನ್ನು ಸೇರಿಸಿ:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

ನೀವು ನೋಡುವಂತೆ, ನಾವು ಹೊಸ ಪ್ಯಾರಾಮೀಟರ್ ಅನ್ನು ಸ್ಕೀಮ್ನೊಂದಿಗೆ ವಿಷಯದ ಪ್ರಾರಂಭದ ವಿಧಾನಕ್ಕೆ ರವಾನಿಸುತ್ತೇವೆ - value_type. ಇದಲ್ಲದೆ, ಎಲ್ಲವೂ ಒಂದೇ ಯೋಜನೆಯನ್ನು ಅನುಸರಿಸುತ್ತದೆ, ಆದ್ದರಿಂದ ನಾನು ಬೇರೆ ಯಾವುದರ ಬಗ್ಗೆಯೂ ವಾಸಿಸುವುದರಲ್ಲಿ ಯಾವುದೇ ಅರ್ಥವನ್ನು ಕಾಣುವುದಿಲ್ಲ.

ಸರಿ, ಕಲೆಕ್ಟ್_ಸೆಕ್ಯುರಿಟೈಟ್‌ಗಳಿಗೆ ಮೆಟಾ ಮಾಹಿತಿ ಸಂಗ್ರಹಣೆ ಏಜೆಂಟ್‌ಗೆ ಕರೆಯನ್ನು ಸೇರಿಸುವುದು ಅಂತಿಮ ಸ್ಪರ್ಶವಾಗಿದೆ:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

ಸಂದೇಶಕ್ಕಾಗಿ ನಾವು ಹಿಂದೆ ಘೋಷಿಸಿದ ಯೋಜನೆಯನ್ನು ಬಳಸುತ್ತೇವೆ. ಈ ಸಂದರ್ಭದಲ್ಲಿ, ನಾನು .cast ವಿಧಾನವನ್ನು ಬಳಸಿದ್ದೇನೆ ಏಕೆಂದರೆ ನಾವು ಏಜೆಂಟ್‌ನಿಂದ ಫಲಿತಾಂಶಕ್ಕಾಗಿ ಕಾಯಬೇಕಾಗಿಲ್ಲ, ಆದರೆ ಅದನ್ನು ನಮೂದಿಸುವುದು ಯೋಗ್ಯವಾಗಿದೆ ಮಾರ್ಗಗಳು ವಿಷಯಕ್ಕೆ ಸಂದೇಶವನ್ನು ಕಳುಹಿಸಿ:

  1. ಎರಕಹೊಯ್ದ - ನಿರ್ಬಂಧಿಸುವುದಿಲ್ಲ ಏಕೆಂದರೆ ಅದು ಫಲಿತಾಂಶವನ್ನು ನಿರೀಕ್ಷಿಸುವುದಿಲ್ಲ. ನೀವು ಫಲಿತಾಂಶವನ್ನು ಮತ್ತೊಂದು ವಿಷಯಕ್ಕೆ ಸಂದೇಶದಂತೆ ಕಳುಹಿಸಲಾಗುವುದಿಲ್ಲ.

  2. ಕಳುಹಿಸು - ನಿರ್ಬಂಧಿಸುವುದಿಲ್ಲ ಏಕೆಂದರೆ ಅದು ಫಲಿತಾಂಶವನ್ನು ನಿರೀಕ್ಷಿಸುವುದಿಲ್ಲ. ಫಲಿತಾಂಶವು ಹೋಗುವ ವಿಷಯದಲ್ಲಿ ನೀವು ಏಜೆಂಟ್ ಅನ್ನು ನಿರ್ದಿಷ್ಟಪಡಿಸಬಹುದು.

  3. ಕೇಳಿ - ಫಲಿತಾಂಶಕ್ಕಾಗಿ ಕಾಯುತ್ತಿದೆ. ಫಲಿತಾಂಶವು ಹೋಗುವ ವಿಷಯದಲ್ಲಿ ನೀವು ಏಜೆಂಟ್ ಅನ್ನು ನಿರ್ದಿಷ್ಟಪಡಿಸಬಹುದು.

ಆದ್ದರಿಂದ, ಇಂದಿನ ಏಜೆಂಟ್‌ಗಳೊಂದಿಗೆ ಅಷ್ಟೆ!

ಕನಸಿನ ತಂಡ

ಈ ಭಾಗದಲ್ಲಿ ಬರೆಯಲು ನಾನು ಭರವಸೆ ನೀಡಿದ ಕೊನೆಯ ವಿಷಯವೆಂದರೆ ಆಜ್ಞೆಗಳು. ಮೊದಲೇ ಹೇಳಿದಂತೆ, ಫಾಸ್ಟ್‌ನಲ್ಲಿನ ಆಜ್ಞೆಗಳು ಕ್ಲಿಕ್‌ನ ಸುತ್ತ ಸುತ್ತುತ್ತವೆ. ವಾಸ್ತವವಾಗಿ, -A ಕೀಲಿಯನ್ನು ನಿರ್ದಿಷ್ಟಪಡಿಸುವಾಗ ಫೌಸ್ಟ್ ನಮ್ಮ ಕಸ್ಟಮ್ ಆಜ್ಞೆಯನ್ನು ಅದರ ಇಂಟರ್ಫೇಸ್‌ಗೆ ಸರಳವಾಗಿ ಜೋಡಿಸುತ್ತದೆ

ಘೋಷಿಸಿದ ಏಜೆಂಟ್ ನಂತರ agents.py ಡೆಕೋರೇಟರ್ನೊಂದಿಗೆ ಕಾರ್ಯವನ್ನು ಸೇರಿಸಿ app.commandವಿಧಾನವನ್ನು ಕರೆಯುವುದು ಎರಕಹೊಯ್ದ у ಸಂಗ್ರಹ_ಸೆಕ್ಯುರಿಟಿಗಳು:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

ಹೀಗಾಗಿ, ನಾವು ಆಜ್ಞೆಗಳ ಪಟ್ಟಿಯನ್ನು ಕರೆದರೆ, ನಮ್ಮ ಹೊಸ ಆಜ್ಞೆಯು ಅದರಲ್ಲಿರುತ್ತದೆ:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

ನಾವು ಅದನ್ನು ಬೇರೆಯವರಂತೆ ಬಳಸಬಹುದು, ಆದ್ದರಿಂದ ನಾವು ಫಾಸ್ಟ್ ವರ್ಕರ್ ಅನ್ನು ಮರುಪ್ರಾರಂಭಿಸೋಣ ಮತ್ತು ಭದ್ರತೆಗಳ ಪೂರ್ಣ ಪ್ರಮಾಣದ ಸಂಗ್ರಹವನ್ನು ಪ್ರಾರಂಭಿಸೋಣ:

> faust -A horton.agents start-collect-securities

ಮುಂದೆ ಏನಾಗುತ್ತದೆ?

ಮುಂದಿನ ಭಾಗದಲ್ಲಿ, ಉಳಿದ ಏಜೆಂಟ್‌ಗಳನ್ನು ಉದಾಹರಣೆಯಾಗಿ ಬಳಸಿ, ವರ್ಷಕ್ಕೆ ವ್ಯಾಪಾರದ ಮುಕ್ತಾಯದ ಬೆಲೆಗಳು ಮತ್ತು ಏಜೆಂಟರ ಕ್ರಾನ್ ಉಡಾವಣೆಯಲ್ಲಿ ವಿಪರೀತಗಳನ್ನು ಹುಡುಕುವ ಸಿಂಕ್ ಕಾರ್ಯವಿಧಾನವನ್ನು ನಾವು ಪರಿಗಣಿಸುತ್ತೇವೆ.

ಇವತ್ತಿಗೂ ಅಷ್ಟೆ! ಓದಿದ್ದಕ್ಕೆ ಧನ್ಯವಾದಗಳು :)

ಈ ಭಾಗಕ್ಕೆ ಕೋಡ್

ಫೌಸ್ಟ್‌ನಲ್ಲಿ ಹಿನ್ನೆಲೆ ಕಾರ್ಯಗಳು, ಭಾಗ II: ಏಜೆಂಟ್‌ಗಳು ಮತ್ತು ತಂಡಗಳು

PS ಕೊನೆಯ ಭಾಗದ ಅಡಿಯಲ್ಲಿ ನನಗೆ ಫೌಸ್ಟ್ ಮತ್ತು ಕನ್ಫ್ಯೂಯೆಂಟ್ ಕಾಫ್ಕಾ ಬಗ್ಗೆ ಕೇಳಲಾಯಿತು (ಸಂಗಮವು ಯಾವ ವೈಶಿಷ್ಟ್ಯಗಳನ್ನು ಹೊಂದಿದೆ?) ಸಂಗಮವು ಅನೇಕ ವಿಧಗಳಲ್ಲಿ ಹೆಚ್ಚು ಕ್ರಿಯಾತ್ಮಕವಾಗಿದೆ ಎಂದು ತೋರುತ್ತದೆ, ಆದರೆ ವಾಸ್ತವವೆಂದರೆ ಫೌಸ್ಟ್ ಸಂಗಮಕ್ಕೆ ಸಂಪೂರ್ಣ ಕ್ಲೈಂಟ್ ಬೆಂಬಲವನ್ನು ಹೊಂದಿಲ್ಲ - ಇದು ಅನುಸರಿಸುತ್ತದೆ ಡಾಕ್‌ನಲ್ಲಿ ಕ್ಲೈಂಟ್ ನಿರ್ಬಂಧಗಳ ವಿವರಣೆಗಳು.

ಮೂಲ: www.habr.com

ಕಾಮೆಂಟ್ ಅನ್ನು ಸೇರಿಸಿ