مهام الخلفية في Faust ، الجزء الثاني: الوكلاء والفرق

مهام الخلفية في Faust ، الجزء الثاني: الوكلاء والفرق

جدول المحتويات

  1. الجزء الأول: المقدمة

  2. الجزء الثاني: الوكلاء والفرق

ماذا نفعل هنا؟

إذن الجزء الثاني. كما كتبنا سابقًا، سنقوم بما يلي:

  1. لنكتب عميلًا صغيرًا لـ alphavantage على aiohttp مع طلبات لنقاط النهاية التي نحتاجها.

  2. لنقم بإنشاء وكيل يقوم بجمع البيانات حول الأوراق المالية والمعلومات التعريفية الخاصة بها.

ولكن، هذا ما سنفعله للمشروع نفسه، وفيما يتعلق بالبحث الدقيق، سنتعلم كيفية كتابة الوكلاء الذين يعالجون أحداث الدفق من كافكا، وكذلك كيفية كتابة الأوامر (انقر فوق المجمع)، في حالتنا - لرسائل الدفع اليدوية إلى الموضوع الذي يراقبه الوكيل.

تدريب

عميل AlphaVantage

أولاً، لنكتب عميل aiohttp صغيرًا لطلبات alphavantage.

alphavantage.py

المفسد

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

في الواقع كل شيء واضح منه:

  1. تم تصميم AlphaVantage API بكل بساطة وبشكل جميل، لذلك قررت تقديم جميع الطلبات من خلال هذه الطريقة construct_query حيث يوجد بدوره اتصال http.

  2. أحمل كل الحقول إلى snake_case للراحة.

  3. حسنًا، زخرفة logger.catch للحصول على نتائج تتبع جميلة وغنية بالمعلومات.

ملحوظة: لا تنس إضافة رمز alphavantage محليًا إلى config.yml، أو تصدير متغير البيئة HORTON_SERVICE_APIKEY. نحصل على رمز مميز هنا.

فئة الخام

سيكون لدينا مجموعة من الأوراق المالية لتخزين المعلومات الوصفية حول الأوراق المالية.

قاعدة البيانات/security.py

في رأيي، ليست هناك حاجة لشرح أي شيء هنا، والطبقة الأساسية نفسها بسيطة للغاية.

get_app()

دعونا نضيف وظيفة لإنشاء كائن تطبيق فيه app.py

المفسد

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

في الوقت الحالي، سيكون لدينا أبسط إنشاء تطبيق، وبعد قليل سنقوم بتوسيعه، ومع ذلك، حتى لا نجعلك تنتظر، هنا مراجع إلى فئة التطبيق. أنصحك أيضًا بإلقاء نظرة على فئة الإعدادات، لأنها مسؤولة عن معظم الإعدادات.

الجزء الرئيسي

وكيل لجمع والحفاظ على قائمة الأوراق المالية

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

لذا، أولاً نحصل على كائن تطبيق فاوست - إنه بسيط جدًا. بعد ذلك، نعلن بوضوح عن موضوع لوكيلنا... هنا تجدر الإشارة إلى ما هو عليه، وما هي المعلمة الداخلية وكيف يمكن ترتيب ذلك بشكل مختلف.

  1. موضوعات في كافكا، إذا أردنا معرفة التعريف الدقيق، فمن الأفضل أن نقرأها عن. وثيقة، أو يمكنك القراءة خلاصة وافية على حبري باللغة الروسية، حيث ينعكس كل شيء أيضًا بدقة تامة :)

  2. المعلمة داخلية، الموصوف جيدًا في مستند Faust، يسمح لنا بتكوين الموضوع مباشرة في الكود، وهذا يعني بالطبع المعلمات التي يوفرها مطورو Faust، على سبيل المثال: الاحتفاظ، وسياسة الاحتفاظ (حذف افتراضي، ولكن يمكنك تعيين اتفاق)، عدد الأقسام لكل موضوع (عشراتللقيام، على سبيل المثال، أقل من أهمية عالمية التطبيقات فاوست).

  3. بشكل عام، يمكن للوكيل إنشاء موضوع مُدار بقيم عالمية، ومع ذلك، أحب أن أعلن كل شيء بشكل صريح. بالإضافة إلى ذلك، لا يمكن تكوين بعض المعلمات (على سبيل المثال، عدد الأقسام أو سياسة الاستبقاء) للموضوع في إعلان الوكيل.

    إليك ما قد يبدو عليه دون تحديد الموضوع يدويًا:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

حسنًا، لنصف الآن ما سيفعله وكيلنا :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

لذلك، في بداية الوكيل، نفتح جلسة aiohttp للطلبات من خلال عميلنا. وبالتالي، عند بدء عامل، عند تشغيل وكيلنا، سيتم فتح جلسة على الفور - جلسة واحدة طوال الوقت الذي يعمل فيه العامل (أو عدة جلسات، إذا قمت بتغيير المعلمة التزامن من وكيل بوحدة افتراضية).

بعد ذلك، نتبع الدفق (نضع الرسالة في _، نظرًا لأننا في هذا الوكيل لا نهتم بمحتوى) الرسائل من موضوعنا، إذا كانت موجودة في الإزاحة الحالية، وإلا فستنتظر دورتنا وصولها. حسنًا، داخل حلقتنا، نقوم بتسجيل استلام الرسالة، والحصول على قائمة بالأوراق المالية النشطة (get_securities تُرجع فقط النشطة افتراضيًا، راجع رمز العميل) وحفظها في قاعدة البيانات، والتحقق مما إذا كان هناك أمان بنفس المؤشر و تبادل في قاعدة البيانات، إذا كان هناك، فسيتم تحديثه (الورقة) ببساطة.

دعونا نطلق خلقنا!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

ميزات PS مكون الويب لن أعتبر فاوست في المقالات، لذلك قمنا بوضع العلم المناسب.

في أمر الإطلاق، أخبرنا فاوست بمكان البحث عن كائن التطبيق وما يجب فعله به (تشغيل عامل) بمستوى إخراج سجل المعلومات. نحصل على الإخراج التالي:

المفسد

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

انه حي!!!

دعونا نلقي نظرة على مجموعة الأقسام. كما نرى، تم إنشاء موضوع بالاسم الذي حددناه في الكود، العدد الافتراضي للأقسام (8، مأخوذ من theme_partitions - معلمة كائن التطبيق)، لأننا لم نحدد قيمة فردية لموضوعنا (عبر الأقسام). يتم تعيين كافة الأقسام الثمانية للوكيل الذي تم إطلاقه في العامل، نظرًا لأنه الوحيد، ولكن سيتم مناقشة ذلك بمزيد من التفصيل في الجزء الخاص بالتجميع.

حسنًا، يمكننا الآن الانتقال إلى نافذة طرفية أخرى وإرسال رسالة فارغة إلى موضوعنا:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

ملاحظة باستخدام @ نظهر أننا نرسل رسالة إلى موضوع اسمه "collect_securities".

في هذه الحالة، انتقلت الرسالة إلى القسم 6 - يمكنك التحقق من ذلك بالانتقال إلى kafdrop on localhost:9000

بالذهاب إلى نافذة المحطة مع العامل لدينا، سنرى رسالة سعيدة مرسلة باستخدام loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

يمكننا أيضًا النظر في mongo (باستخدام Robo3T أو Studio3T) ونرى أن الأوراق المالية موجودة في قاعدة البيانات:

أنا لست مليارديرا، وبالتالي نحن راضون عن خيار المشاهدة الأول.

مهام الخلفية في Faust ، الجزء الثاني: الوكلاء والفرقمهام الخلفية في Faust ، الجزء الثاني: الوكلاء والفرق

السعادة والفرح - الوكيل الأول جاهز :)

الوكيل جاهز، يعيش الوكيل الجديد!

نعم أيها السادة، لقد غطينا ثلث المسار الذي أعدته هذه المقالة فقط، لكن لا تثبطوا، لأن الأمر الآن سيكون أسهل.

إذن نحن الآن بحاجة إلى وكيل يجمع المعلومات التعريفية ويضعها في مستند التجميع:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

نظرًا لأن هذا الوكيل سيقوم بمعالجة معلومات حول أمان معين، فنحن بحاجة إلى الإشارة إلى شريط (رمز) هذا الأمان في الرسالة. لهذا الغرض في فاوست هناك تسجيل — الفئات التي تعلن عن مخطط الرسائل في موضوع الوكيل.

في هذه الحالة، دعنا نذهب إلى Records.py ووصف الشكل الذي يجب أن تبدو عليه الرسالة الخاصة بهذا الموضوع:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

كما كنت قد خمنت، يستخدم فاوست التعليق التوضيحي من نوع python لوصف مخطط الرسالة، ولهذا السبب فإن الحد الأدنى للإصدار الذي تدعمه المكتبة هو 3.6.

دعنا نعود إلى الوكيل ونضبط الأنواع ونضيفها:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

كما ترون، نقوم بتمرير معلمة جديدة بمخطط إلى طريقة تهيئة الموضوع - value_type. علاوة على ذلك، كل شيء يتبع نفس المخطط، لذلك لا أرى أي فائدة في الخوض في أي شيء آخر.

حسنًا، اللمسة الأخيرة هي إضافة استدعاء إلى وكيل جمع المعلومات التعريفية لـ Collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

نستخدم المخطط المعلن مسبقًا للرسالة. في هذه الحالة استخدمت طريقة .cast لأننا لا نحتاج لانتظار النتيجة من الوكيل، لكن الجدير بالذكر أن طرق أرسل رسالة إلى الموضوع:

  1. يلقي - لا يمنع لأنه لا يتوقع نتيجة. لا يمكنك إرسال النتيجة إلى موضوع آخر كرسالة.

  2. إرسال - لا يحظر لأنه لا يتوقع نتيجة. يمكنك تحديد وكيل في الموضوع الذي ستنتقل إليه النتيجة.

  3. اسأل - ينتظر النتيجة. يمكنك تحديد وكيل في الموضوع الذي ستنتقل إليه النتيجة.

هذا كل ما في الأمر مع الوكلاء لهذا اليوم!

فريق الحلم

آخر شيء وعدت بكتابته في هذا الجزء هو الأوامر. كما ذكرنا سابقًا، فإن الأوامر الموجودة في فاوست عبارة عن غلاف حول النقر. في الواقع، يقوم فاوست ببساطة بإرفاق أمرنا المخصص بالواجهة الخاصة به عند تحديد المفتاح -A

بعد إعلان الوكلاء في agents.py إضافة وظيفة مع الديكور app.commandاستدعاء الطريقة ألقى у Collect_Securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

وبالتالي، إذا استدعينا قائمة الأوامر، فسيكون أمرنا الجديد فيها:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

يمكننا استخدامه مثل أي شخص آخر، لذلك دعونا نعيد تشغيل العامل الفاسد ونبدأ في مجموعة كاملة من الأوراق المالية:

> faust -A horton.agents start-collect-securities

ماذا سيحدث بعد ذلك؟

في الجزء التالي، باستخدام الوكلاء المتبقين كمثال، سننظر في آلية الحوض للبحث عن الحدود المتطرفة في أسعار إغلاق التداول لهذا العام وإطلاق الوكلاء.

هذا كل شيء لهذا اليوم! شكرا للقراءة :)

رمز لهذا الجزء

مهام الخلفية في Faust ، الجزء الثاني: الوكلاء والفرق

ملاحظة: في الجزء الأخير سُئلت عن كافكا فاوست ومتكدس (ما هي الميزات لا متموجة لديها؟). يبدو أن المتكدسة أكثر وظيفية في العديد من النواحي، ولكن الحقيقة هي أن فاوست لا يتمتع بدعم كامل من قبل العميل - وهذا يتبع من وصف قيود العميل في الوثيقة.

المصدر: www.habr.com

إضافة تعليق