Faust၊ အပိုင်း II- အေးဂျင့်များနှင့် အဖွဲ့များအတွက် နောက်ခံတာဝန်များ

Faust၊ အပိုင်း II- အေးဂျင့်များနှင့် အဖွဲ့များအတွက် နောက်ခံတာဝန်များ

မာတိကာ

  1. အပိုင်း ၁- နိဒါန်း

  2. အပိုင်း II- အေးဂျင့်များနှင့် အဖွဲ့များ

ငါတို့ ဒီမှာ ဘာလုပ်နေတာလဲ။

ဒီတော့ ဒုတိယပိုင်းပေါ့။ အထက်တွင်ရေးခဲ့သည့်အတိုင်း၊ ၎င်းတွင်ကျွန်ုပ်တို့သည်အောက်ပါအတိုင်းလုပ်ဆောင်လိမ့်မည်။

  1. ကျွန်ုပ်တို့လိုအပ်သော အဆုံးမှတ်များအတွက် တောင်းဆိုမှုများဖြင့် aiohttp တွင် alphavantage အတွက် သေးငယ်သော client တစ်ခုကို ရေးကြပါစို့။

  2. လုံခြုံရေးနှင့် မက်တာအချက်အလက်ဆိုင်ရာ အချက်အလက်များကို စုဆောင်းမည့် ကိုယ်စားလှယ်တစ်ဦးကို ဖန်တီးကြပါစို့။

သို့သော်၊ ဤသည်မှာ ပရောဂျက်အတွက် ကျွန်ုပ်တို့လုပ်ဆောင်ရမည့်အရာဖြစ်ပြီး faust သုတေသနအရ၊ kafka မှ အဖြစ်အပျက်များကို ထုတ်လွှင့်သည့် အေးဂျင့်များကို မည်သို့ရေးရမည်နည်းအပြင် အမိန့်ပေးနည်း (click wrapper) ကို မည်သို့ရေးရမည်ကို ကျွန်ုပ်တို့ လေ့လာပါမည်။ အေးဂျင့်က စောင့်ကြည့်နေသည့် ခေါင်းစဉ်ဆီသို့ လူကိုယ်တိုင် တွန်းပို့သည့် မက်ဆေ့ချ်များအတွက်။

လေ့ကျင့်ရေး

AlphaVantage ဖောက်သည်

ပထမဦးစွာ၊ alphavantage ကိုတောင်းဆိုမှုများအတွက် aiohttp client လေးတစ်ခုရေးလိုက်ရအောင်။

alphavantage.py

လုယူသောသူ

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

တကယ်တော့ အရာအားလုံးက ရှင်းပါတယ်။

  1. AlphaVantage API သည် ရိုးရိုးရှင်းရှင်းနှင့် လှပစွာ ဒီဇိုင်းထုတ်ထားသောကြောင့် တောင်းဆိုမှုအားလုံးကို နည်းလမ်းဖြင့် ပြုလုပ်ရန် ဆုံးဖြတ်ခဲ့သည်။ construct_query အဲဒီ့မှာ http call တစ်ခုရှိတယ်။

  2. လယ်တွေအကုန် ယူလာပေးတယ်။ snake_case အဆင်ပြေစေရန်။

  3. ကောင်းပြီ၊ လှပပြီး သတင်းအချက်အလတ် ကောက်ကြောင်းထွက်ရှိမှုအတွက် logger.catch အလှဆင်ခြင်း။

PS သည် config.yml သို့ စက်တွင်းရှိ alphavantage တိုကင်ကို ထည့်ရန် မမေ့ပါနှင့်၊ သို့မဟုတ် ပတ်ဝန်းကျင် ပြောင်းလဲနိုင်သော ပြောင်းလဲနိုင်သောကို တင်ပို့ပါ HORTON_SERVICE_APIKEY. ကျွန်ုပ်တို့သည် တိုကင်တစ်ခုကို လက်ခံရရှိပါသည်။ ဒီမှာ.

CRUD အတန်း

လုံခြုံရေးဆိုင်ရာ မက်တာအချက်အလက်ကို သိမ်းဆည်းရန် ကျွန်ုပ်တို့တွင် ငွေချေးသက်သေခံလက်မှတ် စုဆောင်းမှုတစ်ခု ရှိပါမည်။

database/security.py

ကျွန်တော့်အမြင်အရတော့ ဒီနေရာမှာ ဘာမှရှင်းပြစရာ မလိုပါဘူး၊ အခြေခံလူတန်းစားကိုယ်တိုင်က တော်တော်ရိုးရှင်းပါတယ်။

get_app()

အက်ပလီကေးရှင်းအရာဝတ္ထုတစ်ခု ဖန်တီးရန်အတွက် လုပ်ဆောင်ချက်တစ်ခုကို ထည့်လိုက်ကြပါစို့ app.py

လုယူသောသူ

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

ယခုတွင် ကျွန်ုပ်တို့သည် အရိုးရှင်းဆုံးသော အက်ပ်လီကေးရှင်းဖန်တီးမှုကို လုပ်ဆောင်နိုင်မည်ဖြစ်ပြီး အနည်းငယ်အကြာတွင် သင့်အား စောင့်မနေစေရန်အတွက် ၎င်းကို ချဲ့ထွင်ပါမည်။ အကိုးအကား App-class သို့။ ဆက်တင်များ အများစုအတွက် တာဝန်ရှိသောကြောင့် ဆက်တင်များ အတန်းကို ကြည့်ပါ။

အဓိက

ငွေချေးစာရင်းကို စုဆောင်းထိန်းသိမ်းခြင်းအတွက် ကိုယ်စားလှယ်

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

ထို့ကြောင့်၊ ဦးစွာ faust application object ကိုကျွန်ုပ်တို့ရရှိသည် - ၎င်းသည်အတော်လေးရိုးရှင်းပါသည်။ ထို့နောက် ကျွန်ုပ်တို့သည် ကျွန်ုပ်တို့၏အေးဂျင့်အတွက် ခေါင်းစဉ်တစ်ခုကို အတိအလင်းကြေညာလိုက်သည်... ဤနေရာတွင် ၎င်းသည် အဘယ်အရာဖြစ်သည်၊ အတွင်းဘောင်သတ်မှတ်ချက်ဟူသည် အဘယ်နည်း၊ ၎င်းကို ကွဲပြားစွာစီစဉ်နိုင်ပုံကို ဖော်ပြသွားပါမည်။

  1. kafka ရှိ အကြောင်းအရာများ အတိအကျ အဓိပ္ပါယ် သိချင်ပါက ဖတ်ရန် ပိုကောင်းပါတယ်။ off စာရွက်စာတမ်းဒါမှမဟုတ် ဖတ်နိုင်ပါတယ်။ ပါဒီယမ် ရုရှလို Habré တွင် အရာအားလုံးသည် တိကျစွာ ထင်ဟပ်နေပါသည်။)

  2. ကန့်သတ်ချက်အတွင်းပိုင်းfaust doc တွင် ကောင်းစွာဖော်ပြထားသော၊ ကုဒ်တွင် ခေါင်းစဉ်ကို တိုက်ရိုက် configure လုပ်ခွင့်ပေးသည်၊ ဆိုလိုသည်မှာ faust developer များမှ ပေးထားသော parameters များ ဥပမာ- retention၊ retention policy (default အားဖြင့် delete သော်လည်း၊ သင်သည် သတ်မှတ်နိုင်သည်။ ကျစ်လစ်သော) ခေါင်းစဉ်တစ်ခုစီ၏ partitions အရေအတွက် (partitions ကိုဥပမာ ထက်နည်းအောင် လုပ်ရမယ်။ ကမ္ဘာလုံးဆိုင်ရာ အရေးပါမှု လျှောက်လွှာများ faust) ။

  3. ယေဘုယျအားဖြင့်၊ အေးဂျင့်သည် ကမ္ဘာလုံးဆိုင်ရာတန်ဖိုးများဖြင့် စီမံခန့်ခွဲသည့်အကြောင်းအရာကို ဖန်တီးနိုင်သော်လည်း၊ ကျွန်ုပ်သည် အရာအားလုံးကို အတိအလင်းကြေညာလိုပါသည်။ ထို့အပြင်၊ အချို့သော ကန့်သတ်ချက်များ (ဥပမာ၊ အခန်းကန့်အရေအတွက် သို့မဟုတ် ထိန်းသိမ်းမှုမူဝါဒ) ကို အေးဂျင့်ကြော်ငြာတွင် ထည့်သွင်း၍မရပါ။

    ဤအရာသည် ခေါင်းစဉ်ကို ကိုယ်တိုင်မသတ်မှတ်ဘဲ ၎င်းသည် မည်သို့သောပုံသဏ္ဌာန်ရှိနိုင်မည်နည်း။

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

ကဲ၊ ငါတို့ရဲ့ အေးဂျင့်က ဘာလုပ်မယ်ဆိုတာကို အခုပြောပြလိုက်ရအောင်။ :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

ထို့ကြောင့်၊ အေးဂျင့်၏အစတွင်၊ ကျွန်ုပ်တို့သည် ကျွန်ုပ်တို့၏ဖောက်သည်မှတစ်ဆင့် တောင်းဆိုချက်များအတွက် aiohttp စက်ရှင်ကိုဖွင့်ပါသည်။ ထို့ကြောင့်၊ အလုပ်သမားတစ်ဦးကို စတင်သောအခါ၊ ကျွန်ုပ်တို့၏ အေးဂျင့်ကို စတင်လိုက်သောအခါ၊ စက်ရှင်တစ်ခုသည် ချက်ချင်းဖွင့်လှစ်ပါမည် - တစ်ခု၊ အလုပ်သမားသည် အလုပ်လုပ်နေချိန်တစ်ခုလုံးအတွက် (သို့မဟုတ် ကန့်သတ်ဘောင်ကို သင်ပြောင်းလဲပါက များစွာ၊ တစ်ပြိုင်နက်တည်း ပုံသေယူနစ်တစ်ခုရှိသော အေးဂျင့်ထံမှ)။

ထို့နောက် ကျွန်ုပ်တို့သည် စမ်းချောင်းကို လိုက်လျှောက်ပါ (မက်ဆေ့ချ်ကို ထည့်ပါ။ _ကျွန်ုပ်တို့၊ ဤအေးဂျင့်တွင် ကျွန်ုပ်တို့သည် ကျွန်ုပ်တို့၏အကြောင်းအရာမှ မက်ဆေ့ချ်များ) ၏အကြောင်းအရာကို ဂရုမစိုက်သောကြောင့်၊ ၎င်းတို့သည် လက်ရှိ offset တွင်ရှိနေပါက၊ သို့မဟုတ်ပါက ကျွန်ုပ်တို့၏စက်ဝန်းသည် ၎င်းတို့၏ရောက်ရှိလာခြင်းကို စောင့်ဆိုင်းနေမည်ဖြစ်ပါသည်။ ကောင်းပြီ၊ ကျွန်ုပ်တို့၏စက်ဝိုင်းအတွင်းတွင်၊ ကျွန်ုပ်တို့သည် မက်ဆေ့ချ်လက်ခံဖြတ်ပိုင်းကို မှတ်တမ်းရယူပြီး၊ တက်ကြွသောစာရင်းတစ်ခုရယူပြီး (get_securities သည် ပုံမှန်အတိုင်းသာ တက်ကြွစွာပြန်တက်လာသည်၊ client ကုဒ်ကိုကြည့်ပါ) လုံခြုံရေးများကို ဒေတာဘေ့စ်တွင်သိမ်းဆည်းကာ တူညီသော ticker နှင့် လုံခြုံရေးရှိမရှိစစ်ဆေးခြင်း၊ ဒေတာဘေ့စ်တွင်လဲလှယ် ၊ ရှိလျှင်၎င်းသည် (စာရွက်) ကိုရိုးရှင်းစွာမွမ်းမံလိမ့်မည်။

ကျွန်ုပ်တို့၏ဖန်တီးမှုကို စတင်လိုက်ရအောင်။

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS အင်္ဂါရပ်များ ဝဘ်အစိတ်အပိုင်း ဆောင်းပါးများတွင် faust ကို ကျွန်တော် ထည့်သွင်းစဉ်းစားမည်မဟုတ်ပါ၊ ထို့ကြောင့် ကျွန်ုပ်တို့သည် သင့်လျော်သောအလံကို သတ်မှတ်ပါသည်။

ကျွန်ုပ်တို့၏ launch command တွင်၊ faust သည် အပလီကေးရှင်းအရာဝတ္တုကို မည်သည့်နေရာတွင်ရှာရမည်နှင့် ၎င်းနှင့်မည်သို့လုပ်ဆောင်ရမည်ကို အချက်အလက်မှတ်တမ်းအထွက်အဆင့်ဖြင့် faust အား ပြောကြားခဲ့ပါသည်။ ကျွန်ုပ်တို့သည် အောက်ပါ output ကိုရရှိသည်-

လုယူသောသူ

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

အသက်​ရှင်​တယ်​!!!

Partition Set ကို ကြည့်ရအောင်။ ကျွန်ုပ်တို့မြင်နိုင်သည်အတိုင်း၊ ကုဒ်တွင်သတ်မှတ်ထားသောအမည်၊ အပိုင်းပိုင်းများ၏မူလနံပါတ် (8၊ မှယူသည်) ဖြင့်ခေါင်းစဉ်တစ်ခုကိုဖန်တီးခဲ့သည် topic_partitions - အပလီကေးရှင်းအရာဝတ္ထု ကန့်သတ်ချက်များ)၊ ကျွန်ုပ်တို့သည် ကျွန်ုပ်တို့၏အကြောင်းအရာအတွက် တစ်ဦးချင်းတန်ဖိုးကို မသတ်မှတ်ထားသောကြောင့် (အပိုင်းခွဲများမှတစ်ဆင့်)။ တစ်ခုတည်းသော အပိုင်းဖြစ်သောကြောင့် အလုပ်သမားတွင် စတင်ထုတ်လုပ်သည့် အေးဂျင့်အား အပိုင်း 8 ခုစလုံးကို တာဝန်ပေးအပ်ထားသော်လည်း ၎င်းကို အစုအဝေးပြုလုပ်ခြင်းဆိုင်ရာ အပိုင်းတွင် အသေးစိတ် ဆွေးနွေးပါမည်။

ယခု ကျွန်ုပ်တို့သည် အခြားသော terminal window သို့သွား၍ ကျွန်ုပ်တို့၏ခေါင်းစဉ်သို့ အလွတ်မက်ဆေ့ခ်ျတစ်စောင် ပေးပို့နိုင်သည်-

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS သုံးတယ်။ @ ကျွန်ုပ်တို့သည် “collect_securities” အမည်ရှိ ခေါင်းစဉ်တစ်ခုထံသို့ မက်ဆေ့ချ်ပို့နေကြောင်း ကျွန်ုပ်တို့ပြသပါသည်။

ဤကိစ္စတွင်၊ မက်ဆေ့ချ်သည် partition 6 သို့သွားသည် - kafdrop ကိုသွားခြင်းဖြင့်၎င်းကိုသင်စစ်ဆေးနိုင်သည်။ localhost:9000

ကျွန်ုပ်တို့၏အလုပ်သမားနှင့် terminal window သို့သွားပါက loguru ကိုအသုံးပြု၍ ပျော်ရွှင်သောမက်ဆေ့ချ်တစ်ခုကို ကျွန်ုပ်တို့တွေ့မြင်ရပါမည်-

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

ကျွန်ုပ်တို့သည် mongo (Robo3T သို့မဟုတ် Studio3T ကိုသုံး၍) ကိုလည်း mongo သို့ကြည့်ရှုနိုင်ပြီး လုံခြုံရေးများသည် ဒေတာဘေ့စ်ထဲတွင် ရှိနေကြောင်းတွေ့နိုင်ပါသည်။

ကျွန်ုပ်သည် ဘီလျံနာတစ်ဦးမဟုတ်ပါ၊ ထို့ကြောင့် ကျွန်ုပ်တို့သည် ပထမဆုံးကြည့်ရှုခြင်းရွေးချယ်မှုကို ကျေနပ်ပါသည်။

Faust၊ အပိုင်း II- အေးဂျင့်များနှင့် အဖွဲ့များအတွက် နောက်ခံတာဝန်များFaust၊ အပိုင်း II- အေးဂျင့်များနှင့် အဖွဲ့များအတွက် နောက်ခံတာဝန်များ

ပျော်ရွှင်မှုနှင့် ပျော်ရွှင်မှု - ပထမဆုံး အေးဂျင့် အဆင်သင့်ဖြစ်နေပါပြီ :)

အေးဂျင့် အဆင်သင့်ဖြစ်ပြီ အေးဂျင့်အသစ် အသက်ရှည်ပါစေ။

ဟုတ်ကဲ့ လူကြီးမင်းတို့၊ ဒီဆောင်းပါးမှာ ပြင်ဆင်ထားတဲ့ လမ်းကြောင်းရဲ့ 1/3 ကိုသာ ခြုံငုံမိပေမယ့် အခုက ပိုလွယ်လာတဲ့အတွက် စိတ်ဓာတ်မကျပါနဲ့။

ထို့ကြောင့် ယခု ကျွန်ုပ်တို့သည် မက်တာအချက်အလက်ကို စုဆောင်းပြီး ၎င်းကို စုစည်းမှုစာတမ်းတစ်ခုအဖြစ် ထည့်သွင်းသည့် အေးဂျင့်တစ်ဦး လိုအပ်ပါသည်။

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

ဤအေးဂျင့်သည် သီးခြားလုံခြုံရေးတစ်ခုနှင့်ပတ်သက်သည့် အချက်အလက်များကို လုပ်ဆောင်ပေးမည်ဖြစ်သောကြောင့် မက်ဆေ့ချ်တွင် ဤလုံခြုံရေး၏ လက်မှတ် (သင်္ကေတ) ကို ညွှန်ပြရန် လိုအပ်ပါသည်။ ဒီရည်ရွယ်ချက်အတွက် faust မှာရှိတယ်။ မှတ်တမ်း — အေးဂျင့်အကြောင်းအရာရှိ မက်ဆေ့ချ်အစီအစဉ်ကို ကြေညာသည့်အတန်းများ။

ဒီနေရာမှာ သွားကြည့်ရအောင် records.py နှင့် ဤအကြောင်းအရာအတွက် မက်ဆေ့ချ်သည် မည်ကဲ့သို့ ဖြစ်သင့်သည်ကို ဖော်ပြပါ-

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

သင်မှန်းဆထားသည့်အတိုင်း faust သည် မက်ဆေ့ချ်အစီအစဉ်ကိုဖော်ပြရန် python အမျိုးအစားမှတ်ချက်ကိုအသုံးပြုသည်၊ ထို့ကြောင့် စာကြည့်တိုက်မှပံ့ပိုးပေးသည့် အနိမ့်ဆုံးဗားရှင်းဖြစ်သည်။ 3.6.

အေးဂျင့်ထံ ပြန်သွားရအောင်၊ အမျိုးအစားများကို သတ်မှတ်ပြီး ထည့်ကြပါစို့။

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

သင်တွေ့မြင်ရသည့်အတိုင်း၊ ကျွန်ုပ်တို့သည် ခေါင်းစဉ်စတင်ခြင်းနည်းလမ်း - value_type သို့ အစီအစဉ်တစ်ခုဖြင့် ဘောင်အသစ်တစ်ခုကို ဖြတ်သန်းပါသည်။ ထို့အပြင်၊ အရာရာတိုင်းသည် တူညီသောအစီအစဥ်အတိုင်း လုပ်ဆောင်နေသောကြောင့် အခြားမည်သည့်အရာကိုမျှ မှီခိုအားထားခြင်းကို ကျွန်ုပ်မမြင်ပါ။

ကောင်းပြီ၊ နောက်ဆုံးထိတွေ့မှုမှာ collect_securitites သို့ meta အချက်အလက်စုဆောင်းမှု အေးဂျင့်သို့ ခေါ်ဆိုမှုတစ်ခု ထည့်ရန်ဖြစ်သည်-

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

မက်ဆေ့ချ်အတွက် ယခင်က ကြေညာထားသော အစီအစဉ်ကို ကျွန်ုပ်တို့ အသုံးပြုပါသည်။ ဤကိစ္စတွင်၊ ကျွန်ုပ်သည် အေးဂျင့်ထံမှရလဒ်ကိုစောင့်ဆိုင်းရန်မလိုအပ်သောကြောင့် .cast နည်းလမ်းကိုအသုံးပြုခဲ့သည်၊ သို့သော်၎င်းကိုဖော်ပြရကျိုးနပ်ပါသည်။ နည်းလမ်းများ ခေါင်းစဉ်သို့ မက်ဆေ့ခ်ျပို့ပါ-

  1. cast - ရလဒ်ကိုမမျှော်လင့်ထားသောကြောင့်ပိတ်ဆို့မထားပေ။ ရလဒ်ကို မက်ဆေ့ချ်အနေဖြင့် အခြားအကြောင်းအရာသို့ ပေးပို့၍မရပါ။

  2. send - ရလဒ်ကိုမျှော်လင့်မထားသောကြောင့်ပိတ်ဆို့မထားပေ။ ရလဒ်ထွက်မည့်အကြောင်းအရာတွင် ကိုယ်စားလှယ်တစ်ဦးကို သင်သတ်မှတ်နိုင်သည်။

  3. မေး - ရလဒ်ကိုစောင့်ပါ။ ရလဒ်ထွက်မည့်အကြောင်းအရာတွင် ကိုယ်စားလှယ်တစ်ဦးကို သင်သတ်မှတ်နိုင်သည်။

ဒီတော့ ဒီနေ့အတွက် အေးဂျင့်တွေနဲ့ ဒီလောက်ပါပဲ။

အိပ်မက်အဖွဲ့

ဒီအပိုင်းမှာ ရေးဖို့ ကတိပြုထားတဲ့ နောက်ဆုံးအချက်က အမိန့်တွေပါ။ အစောပိုင်းတွင်ဖော်ပြခဲ့သည့်အတိုင်း faust ရှိ command များသည် click ပတ်ပတ်လည်တွင်ခြုံငုံသုံးသပ်ချက်ဖြစ်သည်။ အမှန်မှာ၊ faust သည် -A သော့ကို သတ်မှတ်သောအခါတွင် ကျွန်ုပ်တို့၏ စိတ်ကြိုက် command ကို ၎င်း၏ interface တွင် ပူးတွဲထားသည်။

ကြေငြာပြီးရင် အေးဂျင့်တွေ အတွက် agents.py အလှဆင်ကိရိယာဖြင့် လုပ်ဆောင်ချက်တစ်ခုကို ထည့်ပါ။ app.commandmethod ကိုခေါ်သည်။ သွန်းပုံ у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

ထို့ကြောင့်၊ ကျွန်ုပ်တို့သည် commands များစာရင်းကိုခေါ်ဆိုပါက၊ ကျွန်ုပ်တို့၏ command အသစ်သည် ၎င်းတွင်ရှိနေမည်ဖြစ်သည်။

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

ကျွန်ုပ်တို့သည် ၎င်းကို အခြားသူများကဲ့သို့ သုံးနိုင်သည်၊ ထို့ကြောင့် faust အလုပ်သမားကို ပြန်လည်စတင်ပြီး ပြည့်စုံသော လုံခြုံရေးစုဆောင်းမှုကို စတင်ကြပါစို့။

> faust -A horton.agents start-collect-securities

နောက်ဘာဆက်ဖြစ်မလဲ။

နောက်အပိုင်းတွင်၊ ကျန်ရှိသောအေးဂျင့်များကိုဥပမာတစ်ခုအနေဖြင့်အသုံးပြုကာ၊ နှစ်အတွက်ကုန်သွယ်ခြင်း၏အပိတ်စျေးနှုန်းများနှင့်အေးဂျင့်များ၏ cron စတင်ခြင်းအတွက်အလွန်အကျွံရှာဖွေခြင်းအတွက်နစ်မြုပ်မှုယန္တရားကိုကျွန်ုပ်တို့ကြည့်ရှုပါမည်။

ဒီနေ့အတွက် ဒီလောက်ပါပဲ။ ဖတ်ရှုပေးတဲ့အတွက် ကျေးဇူးတင်ပါတယ် :)

ဤအပိုင်းအတွက် ကုဒ်

Faust၊ အပိုင်း II- အေးဂျင့်များနှင့် အဖွဲ့များအတွက် နောက်ခံတာဝန်များ

PS နောက်ဆုံးအပိုင်းမှာတော့ faust နဲ့ confluent kafka (ပေါင်းစပ်ပါဝင်မှုတွင် အဘယ်အင်္ဂါရပ်များ ရှိသနည်း။) confluent သည် နည်းလမ်းများစွာဖြင့် ပိုမိုလုပ်ဆောင်နိုင်သည်ဟု ထင်ရသော်လည်း အမှန်မှာ faust သည် confluent အတွက် client support အပြည့်အဝမရှိခြင်းဖြစ်သည် - ၎င်းမှ အောက်ပါအတိုင်းဖြစ်သည်။ doc တွင် client ကန့်သတ်ချက်များအကြောင်း ဖော်ပြချက်.

source: www.habr.com

မှတ်ချက် Add