إذن الجزء الثاني. كما كتبنا سابقًا، سنقوم بما يلي:
لنكتب عميلًا صغيرًا لـ alphavantage على aiohttp مع طلبات لنقاط النهاية التي نحتاجها.
لنقم بإنشاء وكيل يقوم بجمع البيانات حول الأوراق المالية والمعلومات التعريفية الخاصة بها.
ولكن، هذا ما سنفعله للمشروع نفسه، وفيما يتعلق بالبحث الدقيق، سنتعلم كيفية كتابة الوكلاء الذين يعالجون أحداث الدفق من كافكا، وكذلك كيفية كتابة الأوامر (انقر فوق المجمع)، في حالتنا - لرسائل الدفع اليدوية إلى الموضوع الذي يراقبه الوكيل.
في الوقت الحالي، سيكون لدينا أبسط إنشاء تطبيق، وبعد قليل سنقوم بتوسيعه، ومع ذلك، حتى لا نجعلك تنتظر، هنا مراجع إلى فئة التطبيق. أنصحك أيضًا بإلقاء نظرة على فئة الإعدادات، لأنها مسؤولة عن معظم الإعدادات.
لذا، أولاً نحصل على كائن تطبيق فاوست - إنه بسيط جدًا. بعد ذلك، نعلن بوضوح عن موضوع لوكيلنا... هنا تجدر الإشارة إلى ما هو عليه، وما هي المعلمة الداخلية وكيف يمكن ترتيب ذلك بشكل مختلف.
موضوعات في كافكا، إذا أردنا معرفة التعريف الدقيق، فمن الأفضل أن نقرأها عن. وثيقة، أو يمكنك القراءة خلاصة وافية على حبري باللغة الروسية، حيث ينعكس كل شيء أيضًا بدقة تامة :)
المعلمة داخلية، الموصوف جيدًا في مستند Faust، يسمح لنا بتكوين الموضوع مباشرة في الكود، وهذا يعني بالطبع المعلمات التي يوفرها مطورو Faust، على سبيل المثال: الاحتفاظ، وسياسة الاحتفاظ (حذف افتراضي، ولكن يمكنك تعيين اتفاق)، عدد الأقسام لكل موضوع (عشراتللقيام، على سبيل المثال، أقل من أهمية عالمية التطبيقات فاوست).
بشكل عام، يمكن للوكيل إنشاء موضوع مُدار بقيم عالمية، ومع ذلك، أحب أن أعلن كل شيء بشكل صريح. بالإضافة إلى ذلك، لا يمكن تكوين بعض المعلمات (على سبيل المثال، عدد الأقسام أو سياسة الاستبقاء) للموضوع في إعلان الوكيل.
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
لذلك، في بداية الوكيل، نفتح جلسة aiohttp للطلبات من خلال عميلنا. وبالتالي، عند بدء عامل، عند تشغيل وكيلنا، سيتم فتح جلسة على الفور - جلسة واحدة طوال الوقت الذي يعمل فيه العامل (أو عدة جلسات، إذا قمت بتغيير المعلمة التزامن من وكيل بوحدة افتراضية).
بعد ذلك، نتبع الدفق (نضع الرسالة في _، نظرًا لأننا في هذا الوكيل لا نهتم بمحتوى) الرسائل من موضوعنا، إذا كانت موجودة في الإزاحة الحالية، وإلا فستنتظر دورتنا وصولها. حسنًا، داخل حلقتنا، نقوم بتسجيل استلام الرسالة، والحصول على قائمة بالأوراق المالية النشطة (get_securities تُرجع فقط النشطة افتراضيًا، راجع رمز العميل) وحفظها في قاعدة البيانات، والتحقق مما إذا كان هناك أمان بنفس المؤشر و تبادل في قاعدة البيانات، إذا كان هناك، فسيتم تحديثه (الورقة) ببساطة.
دعونا نطلق خلقنا!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
ميزات PS مكون الويب لن أعتبر فاوست في المقالات، لذلك قمنا بوضع العلم المناسب.
في أمر الإطلاق، أخبرنا فاوست بمكان البحث عن كائن التطبيق وما يجب فعله به (تشغيل عامل) بمستوى إخراج سجل المعلومات. نحصل على الإخراج التالي:
دعونا نلقي نظرة على مجموعة الأقسام. كما نرى، تم إنشاء موضوع بالاسم الذي حددناه في الكود، العدد الافتراضي للأقسام (8، مأخوذ من theme_partitions - معلمة كائن التطبيق)، لأننا لم نحدد قيمة فردية لموضوعنا (عبر الأقسام). يتم تعيين كافة الأقسام الثمانية للوكيل الذي تم إطلاقه في العامل، نظرًا لأنه الوحيد، ولكن سيتم مناقشة ذلك بمزيد من التفصيل في الجزء الخاص بالتجميع.
حسنًا، يمكننا الآن الانتقال إلى نافذة طرفية أخرى وإرسال رسالة فارغة إلى موضوعنا:
ملاحظة باستخدام @ نظهر أننا نرسل رسالة إلى موضوع اسمه "collect_securities".
في هذه الحالة، انتقلت الرسالة إلى القسم 6 - يمكنك التحقق من ذلك بالانتقال إلى kafdrop on localhost:9000
بالذهاب إلى نافذة المحطة مع العامل لدينا، سنرى رسالة سعيدة مرسلة باستخدام loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
يمكننا أيضًا النظر في mongo (باستخدام Robo3T أو Studio3T) ونرى أن الأوراق المالية موجودة في قاعدة البيانات:
أنا لست مليارديرا، وبالتالي نحن راضون عن خيار المشاهدة الأول.
السعادة والفرح - الوكيل الأول جاهز :)
الوكيل جاهز، يعيش الوكيل الجديد!
نعم أيها السادة، لقد غطينا ثلث المسار الذي أعدته هذه المقالة فقط، لكن لا تثبطوا، لأن الأمر الآن سيكون أسهل.
إذن نحن الآن بحاجة إلى وكيل يجمع المعلومات التعريفية ويضعها في مستند التجميع:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
نظرًا لأن هذا الوكيل سيقوم بمعالجة معلومات حول أمان معين، فنحن بحاجة إلى الإشارة إلى شريط (رمز) هذا الأمان في الرسالة. لهذا الغرض في فاوست هناك تسجيل — الفئات التي تعلن عن مخطط الرسائل في موضوع الوكيل.
في هذه الحالة، دعنا نذهب إلى Records.pyووصف الشكل الذي يجب أن تبدو عليه الرسالة الخاصة بهذا الموضوع:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
كما كنت قد خمنت، يستخدم فاوست التعليق التوضيحي من نوع python لوصف مخطط الرسالة، ولهذا السبب فإن الحد الأدنى للإصدار الذي تدعمه المكتبة هو 3.6.
كما ترون، نقوم بتمرير معلمة جديدة بمخطط إلى طريقة تهيئة الموضوع - value_type. علاوة على ذلك، كل شيء يتبع نفس المخطط، لذلك لا أرى أي فائدة في الخوض في أي شيء آخر.
حسنًا، اللمسة الأخيرة هي إضافة استدعاء إلى وكيل جمع المعلومات التعريفية لـ Collect_securitites:
نستخدم المخطط المعلن مسبقًا للرسالة. في هذه الحالة استخدمت طريقة .cast لأننا لا نحتاج لانتظار النتيجة من الوكيل، لكن الجدير بالذكر أن طرق أرسل رسالة إلى الموضوع:
يلقي - لا يمنع لأنه لا يتوقع نتيجة. لا يمكنك إرسال النتيجة إلى موضوع آخر كرسالة.
إرسال - لا يحظر لأنه لا يتوقع نتيجة. يمكنك تحديد وكيل في الموضوع الذي ستنتقل إليه النتيجة.
اسأل - ينتظر النتيجة. يمكنك تحديد وكيل في الموضوع الذي ستنتقل إليه النتيجة.
هذا كل ما في الأمر مع الوكلاء لهذا اليوم!
فريق الحلم
آخر شيء وعدت بكتابته في هذا الجزء هو الأوامر. كما ذكرنا سابقًا، فإن الأوامر الموجودة في فاوست عبارة عن غلاف حول النقر. في الواقع، يقوم فاوست ببساطة بإرفاق أمرنا المخصص بالواجهة الخاصة به عند تحديد المفتاح -A
بعد إعلان الوكلاء في agents.py إضافة وظيفة مع الديكور app.commandاستدعاء الطريقة ألقى у Collect_Securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
وبالتالي، إذا استدعينا قائمة الأوامر، فسيكون أمرنا الجديد فيها:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
يمكننا استخدامه مثل أي شخص آخر، لذلك دعونا نعيد تشغيل العامل الفاسد ونبدأ في مجموعة كاملة من الأوراق المالية:
> faust -A horton.agents start-collect-securities
ماذا سيحدث بعد ذلك؟
في الجزء التالي، باستخدام الوكلاء المتبقين كمثال، سننظر في آلية الحوض للبحث عن الحدود المتطرفة في أسعار إغلاق التداول لهذا العام وإطلاق الوكلاء.
ملاحظة: في الجزء الأخير سُئلت عن كافكا فاوست ومتكدس (ما هي الميزات لا متموجة لديها؟). يبدو أن المتكدسة أكثر وظيفية في العديد من النواحي، ولكن الحقيقة هي أن فاوست لا يتمتع بدعم كامل من قبل العميل - وهذا يتبع من وصف قيود العميل في الوثيقة.