وظایف پیشینه فاوست، قسمت دوم: عوامل و تیم ها

وظایف پیشینه فاوست، قسمت دوم: عوامل و تیم ها

فهرست مندرجات

  1. بخش اول: مقدمه

  2. بخش دوم: عوامل و تیم ها

ما اینجا چه کار می کنیم؟

بنابراین، بنابراین، بخش دوم. همانطور که قبلاً نوشته شد، در آن موارد زیر را انجام خواهیم داد:

  1. بیایید یک کلاینت کوچک برای alphavantage در aiohttp با درخواست هایی برای نقاط پایانی مورد نیاز خود بنویسیم.

  2. بیایید یک عامل ایجاد کنیم که داده‌های مربوط به اوراق بهادار و متا اطلاعات مربوط به آنها را جمع‌آوری کند.

اما، این همان کاری است که ما برای خود پروژه انجام خواهیم داد، و از نظر تحقیقات فاست، نحوه نوشتن عواملی که رویدادها را از کافکا پردازش می کنند و همچنین نحوه نوشتن دستورات (کلیک wrapper) را در مورد ما یاد خواهیم گرفت - برای پیام‌های فشار دستی به موضوعی که عامل نظارت می‌کند.

پرورش

کلاینت AlphaVantage

ابتدا، بیایید یک مشتری aiohttp کوچک برای درخواست‌های alphavantage بنویسیم.

alphavantage.py

تباه کننده

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

در واقع، همه چیز از آن مشخص است:

  1. API AlphaVantage کاملاً ساده و زیبا طراحی شده است، بنابراین تصمیم گرفتم تمام درخواست ها را از طریق این روش انجام دهم construct_query جایی که به نوبه خود یک تماس http وجود دارد.

  2. من همه زمینه ها را به snake_case برای آسودگی.

  3. خوب، دکوراسیون logger.catch برای خروجی ردیابی زیبا و آموزنده.

PS فراموش نکنید که توکن alphavantage را به صورت محلی به config.yml اضافه کنید یا متغیر محیطی را صادر کنید. HORTON_SERVICE_APIKEY. ما یک نشانه دریافت می کنیم اینجا.

کلاس CRUD

ما یک مجموعه اوراق بهادار برای ذخیره اطلاعات متا در مورد اوراق بهادار خواهیم داشت.

پایگاه داده/security.py

به نظر من اینجا نیازی به توضیح چیزی نیست و خود کلاس پایه کاملاً ساده است.

آپلیکیشین را دریافت کن()

بیایید یک تابع برای ایجاد یک شی برنامه در آن اضافه کنیم app.py

تباه کننده

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

در حال حاضر ما ساده ترین ایجاد برنامه را خواهیم داشت، کمی بعد آن را گسترش خواهیم داد، اما برای اینکه شما را منتظر نگذاریم، در اینجا منابع به کلاس App. من همچنین به شما توصیه می کنم به کلاس تنظیمات نگاهی بیندازید، زیرا این کلاس مسئول اکثر تنظیمات است.

قسمت اصلی

عامل جمع آوری و نگهداری لیست اوراق بهادار

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

بنابراین، ابتدا شی برنامه faust را دریافت می کنیم - بسیار ساده است. در مرحله بعد، ما به صراحت یک موضوع را برای نماینده خود اعلام می کنیم... در اینجا لازم به ذکر است که چیست، پارامتر داخلی چیست و چگونه می توان این موضوع را متفاوت ترتیب داد.

  1. موضوعات در کافکا، اگر بخواهیم تعریف دقیق آن را بدانیم، بهتر است بخوانیم خاموش سند، یا می توانید بخوانید چکیده در Habré به زبان روسی، جایی که همه چیز نیز کاملاً دقیق منعکس شده است :)

  2. پارامتر داخلی، که به خوبی در faust doc توضیح داده شده است، به ما امکان می دهد موضوع را مستقیماً در کد پیکربندی کنیم، البته، این به معنای پارامترهای ارائه شده توسط توسعه دهندگان faust است، به عنوان مثال: حفظ، سیاست حفظ (به طور پیش فرض حذف، اما می توانید تنظیم کنید جمع و جورتعداد پارتیشن ها در هر موضوع (نمراتبرای انجام، برای مثال، کمتر از اهمیت جهانی برنامه های کاربردی فاست).

  3. به طور کلی، عامل می تواند یک موضوع مدیریت شده با مقادیر جهانی ایجاد کند، با این حال، من دوست دارم همه چیز را به صراحت اعلام کنم. علاوه بر این، برخی از پارامترها (به عنوان مثال، تعداد پارتیشن ها یا خط مشی حفظ) موضوع در تبلیغ نماینده قابل پیکربندی نیستند.

    در اینجا ممکن است بدون تعریف دستی موضوع به نظر برسد:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

خب، حالا بیایید توضیح دهیم که نماینده ما چه خواهد کرد :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

بنابراین، در ابتدای کارگزار، یک جلسه aiohttp برای درخواست ها از طریق مشتری خود باز می کنیم. بنابراین، هنگام راه‌اندازی یک Worker، هنگامی که عامل ما راه‌اندازی می‌شود، بلافاصله یک جلسه باز می‌شود - یک جلسه، برای تمام مدت زمانی که کارگر در حال اجرا است (یا چندین، اگر پارامتر را تغییر دهید. همزمانی از یک عامل با واحد پیش فرض).

در مرحله بعد، جریان را دنبال می کنیم (پیام را در آن قرار می دهیم _از آنجایی که ما در این عامل به محتوای) پیام‌های موضوع خود اهمیت نمی‌دهیم، در صورتی که در افست فعلی وجود داشته باشند، در غیر این صورت چرخه ما منتظر رسیدن آنها خواهد بود. خوب، در داخل حلقه خود، دریافت پیام را ثبت می کنیم، لیستی از اوراق بهادار فعال (get_securities فقط به طور پیش فرض فعال است، کد مشتری را ببینید) دریافت می کنیم و آن را در پایگاه داده ذخیره می کنیم، بررسی می کنیم که آیا امنیتی با همان تیک وجود دارد یا خیر و تبادل در پایگاه داده، اگر وجود داشته باشد، آن (کاغذ) به سادگی به روز می شود.

بیایید خلقت خود را راه اندازی کنیم!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

ویژگی های PS جزء وب من فاوست را در مقالات در نظر نخواهم گرفت، بنابراین ما پرچم مناسب را تعیین می کنیم.

در دستور راه‌اندازی، به فاست گفتیم که کجا به دنبال شی برنامه باشد و با آن چه کاری انجام دهد (کارگر را راه اندازی کند) با سطح خروجی گزارش اطلاعات. خروجی زیر را دریافت می کنیم:

تباه کننده

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

زنده است!!!

بیایید به مجموعه پارتیشن نگاه کنیم. همانطور که می بینیم، یک موضوع با نامی که در کد تعیین کردیم، تعداد پیش فرض پارتیشن ها ایجاد شد (8، برگرفته از موضوع_پارتیشن ها - پارامتر شی برنامه)، از آنجایی که ما یک مقدار جداگانه برای موضوع خود (از طریق پارتیشن ها) مشخص نکرده ایم. عامل راه‌اندازی شده در worker به هر 8 پارتیشن اختصاص داده می‌شود، زیرا تنها پارتیشن است، اما در قسمت مربوط به خوشه‌بندی با جزئیات بیشتر در مورد آن صحبت خواهد شد.

خوب، اکنون می توانیم به پنجره ترمینال دیگری برویم و یک پیام خالی به موضوع خود ارسال کنیم:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS با استفاده از @ ما نشان می‌دهیم که در حال ارسال پیام به موضوعی به نام "collect_securities" هستیم.

در این مورد، پیام به پارتیشن 6 رفت - می توانید با رفتن به kafdrop on این را بررسی کنید localhost:9000

با رفتن به پنجره ترمینال با کارگر خود، پیام شادی را خواهیم دید که با استفاده از loguru ارسال شده است:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

همچنین می‌توانیم به mongo نگاهی بیندازیم (با استفاده از Robo3T یا Studio3T) و ببینیم که اوراق بهادار در پایگاه داده هستند:

من میلیاردر نیستم و بنابراین به اولین گزینه مشاهده بسنده می کنیم.

وظایف پیشینه فاوست، قسمت دوم: عوامل و تیم هاوظایف پیشینه فاوست، قسمت دوم: عوامل و تیم ها

شادی و شادی - اولین نماینده آماده است :)

نماینده آماده است، زنده باد نماینده جدید!

بله، آقایان، ما فقط 1/3 از مسیری که در این مقاله تهیه شده است را طی کرده ایم، اما ناامید نباشید، زیرا اکنون آسانتر خواهد شد.

بنابراین اکنون ما به یک عامل نیاز داریم که اطلاعات متا را جمع آوری کرده و در یک سند مجموعه قرار دهد:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

از آنجایی که این عامل اطلاعات مربوط به یک امنیت خاص را پردازش می کند، باید علامت (نماد) این امنیت را در پیام نشان دهیم. برای این منظور در فاوست وجود دارد سوابق - کلاس هایی که طرح پیام را در موضوع عامل اعلام می کنند.

در این مورد، اجازه دهید به records.py و توضیح دهید که پیام این موضوع چگونه باید باشد:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

همانطور که ممکن است حدس بزنید، فاست از حاشیه نویسی نوع پایتون برای توصیف طرح پیام استفاده می کند، به همین دلیل است که حداقل نسخه پشتیبانی شده توسط کتابخانه 3.6.

بیایید به عامل برگردیم، انواع را تنظیم کرده و اضافه کنیم:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

همانطور که می بینید، ما یک پارامتر جدید را با یک طرح به روش مقداردهی_تاپیک ارسال می کنیم. علاوه بر این، همه چیز از همان طرح پیروی می کند، بنابراین من هیچ فایده ای در مورد چیز دیگری نمی بینم.

خوب، آخرین لمس اضافه کردن یک تماس به عامل جمع‌آوری اطلاعات متا برای collect_securitites است:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

ما از طرح اعلام شده قبلی برای پیام استفاده می کنیم. در این مورد از روش .cast استفاده کردم چون نیازی نیست منتظر نتیجه از طرف نماینده باشیم، اما قابل ذکر است که راه ها ارسال پیام به موضوع:

  1. بازیگران - مسدود نمی شود زیرا انتظار نتیجه را ندارد. شما نمی توانید نتیجه را به عنوان پیام به موضوع دیگری ارسال کنید.

  2. ارسال - مسدود نمی کند زیرا انتظار نتیجه را ندارد. شما می توانید یک عامل را در موضوع مشخص کنید که نتیجه به آن خواهد رفت.

  3. بپرس - منتظر نتیجه است. شما می توانید یک عامل را در موضوع مشخص کنید که نتیجه به آن خواهد رفت.

بنابراین، این همه با نمایندگان برای امروز!

تیم رویایی

آخرین چیزی که قول دادم در این قسمت بنویسم دستورات است. همانطور که قبلا ذکر شد، دستورات در faust یک بسته بندی در اطراف کلیک هستند. در واقع، فاست به سادگی دستور سفارشی ما را هنگام تعیین کلید -A به رابط خود متصل می کند

پس از اعلام عوامل در agents.py با دکوراتور یک تابع اضافه کنید app.commandفراخوانی روش انداختن у جمع آوری_ اوراق بهادار:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

بنابراین، اگر لیستی از دستورات را فراخوانی کنیم، دستور جدید ما در آن خواهد بود:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

ما می‌توانیم مانند هر کس دیگری از آن استفاده کنیم، بنابراین بیایید faust worker را مجدداً راه‌اندازی کنیم و مجموعه‌ای کامل از اوراق بهادار را شروع کنیم:

> faust -A horton.agents start-collect-securities

بعد از این چه خواهد شد؟

در قسمت بعدی با استفاده از عوامل باقیمانده به عنوان مثال، مکانیسم سینک برای جستجوی افراط در قیمت های پایانی معاملات برای سال و راه اندازی کرون نمایندگان را در نظر خواهیم گرفت.

برای امروز کافی است! ممنون که خواندید :)

کد این قسمت

وظایف پیشینه فاوست، قسمت دوم: عوامل و تیم ها

PS در قسمت آخر از من در مورد فاوست و کافکای متقابل پرسیده شد (همرو چه ویژگی هایی دارد؟). به نظر می رسد که confluent از بسیاری جهات کاربردی تر است، اما واقعیت این است که faust پشتیبانی کامل مشتری برای confluent ندارد - این نتیجه از شرح محدودیت های مشتری در سند.

منبع: www.habr.com

اضافه کردن نظر