بنابراین، بنابراین، بخش دوم. همانطور که قبلاً نوشته شد، در آن موارد زیر را انجام خواهیم داد:
بیایید یک کلاینت کوچک برای alphavantage در aiohttp با درخواست هایی برای نقاط پایانی مورد نیاز خود بنویسیم.
بیایید یک عامل ایجاد کنیم که دادههای مربوط به اوراق بهادار و متا اطلاعات مربوط به آنها را جمعآوری کند.
اما، این همان کاری است که ما برای خود پروژه انجام خواهیم داد، و از نظر تحقیقات فاست، نحوه نوشتن عواملی که رویدادها را از کافکا پردازش می کنند و همچنین نحوه نوشتن دستورات (کلیک wrapper) را در مورد ما یاد خواهیم گرفت - برای پیامهای فشار دستی به موضوعی که عامل نظارت میکند.
پرورش
کلاینت AlphaVantage
ابتدا، بیایید یک مشتری aiohttp کوچک برای درخواستهای alphavantage بنویسیم.
API AlphaVantage کاملاً ساده و زیبا طراحی شده است، بنابراین تصمیم گرفتم تمام درخواست ها را از طریق این روش انجام دهم construct_query جایی که به نوبه خود یک تماس http وجود دارد.
من همه زمینه ها را به snake_case برای آسودگی.
خوب، دکوراسیون logger.catch برای خروجی ردیابی زیبا و آموزنده.
PS فراموش نکنید که توکن alphavantage را به صورت محلی به config.yml اضافه کنید یا متغیر محیطی را صادر کنید. HORTON_SERVICE_APIKEY. ما یک نشانه دریافت می کنیم اینجا.
کلاس CRUD
ما یک مجموعه اوراق بهادار برای ذخیره اطلاعات متا در مورد اوراق بهادار خواهیم داشت.
در حال حاضر ما ساده ترین ایجاد برنامه را خواهیم داشت، کمی بعد آن را گسترش خواهیم داد، اما برای اینکه شما را منتظر نگذاریم، در اینجا منابع به کلاس App. من همچنین به شما توصیه می کنم به کلاس تنظیمات نگاهی بیندازید، زیرا این کلاس مسئول اکثر تنظیمات است.
بنابراین، ابتدا شی برنامه faust را دریافت می کنیم - بسیار ساده است. در مرحله بعد، ما به صراحت یک موضوع را برای نماینده خود اعلام می کنیم... در اینجا لازم به ذکر است که چیست، پارامتر داخلی چیست و چگونه می توان این موضوع را متفاوت ترتیب داد.
موضوعات در کافکا، اگر بخواهیم تعریف دقیق آن را بدانیم، بهتر است بخوانیم خاموش سند، یا می توانید بخوانید چکیده در Habré به زبان روسی، جایی که همه چیز نیز کاملاً دقیق منعکس شده است :)
پارامتر داخلی، که به خوبی در faust doc توضیح داده شده است، به ما امکان می دهد موضوع را مستقیماً در کد پیکربندی کنیم، البته، این به معنای پارامترهای ارائه شده توسط توسعه دهندگان faust است، به عنوان مثال: حفظ، سیاست حفظ (به طور پیش فرض حذف، اما می توانید تنظیم کنید جمع و جورتعداد پارتیشن ها در هر موضوع (نمراتبرای انجام، برای مثال، کمتر از اهمیت جهانی برنامه های کاربردی فاست).
به طور کلی، عامل می تواند یک موضوع مدیریت شده با مقادیر جهانی ایجاد کند، با این حال، من دوست دارم همه چیز را به صراحت اعلام کنم. علاوه بر این، برخی از پارامترها (به عنوان مثال، تعداد پارتیشن ها یا خط مشی حفظ) موضوع در تبلیغ نماینده قابل پیکربندی نیستند.
در اینجا ممکن است بدون تعریف دستی موضوع به نظر برسد:
خب، حالا بیایید توضیح دهیم که نماینده ما چه خواهد کرد :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
بنابراین، در ابتدای کارگزار، یک جلسه aiohttp برای درخواست ها از طریق مشتری خود باز می کنیم. بنابراین، هنگام راهاندازی یک Worker، هنگامی که عامل ما راهاندازی میشود، بلافاصله یک جلسه باز میشود - یک جلسه، برای تمام مدت زمانی که کارگر در حال اجرا است (یا چندین، اگر پارامتر را تغییر دهید. همزمانی از یک عامل با واحد پیش فرض).
در مرحله بعد، جریان را دنبال می کنیم (پیام را در آن قرار می دهیم _از آنجایی که ما در این عامل به محتوای) پیامهای موضوع خود اهمیت نمیدهیم، در صورتی که در افست فعلی وجود داشته باشند، در غیر این صورت چرخه ما منتظر رسیدن آنها خواهد بود. خوب، در داخل حلقه خود، دریافت پیام را ثبت می کنیم، لیستی از اوراق بهادار فعال (get_securities فقط به طور پیش فرض فعال است، کد مشتری را ببینید) دریافت می کنیم و آن را در پایگاه داده ذخیره می کنیم، بررسی می کنیم که آیا امنیتی با همان تیک وجود دارد یا خیر و تبادل در پایگاه داده، اگر وجود داشته باشد، آن (کاغذ) به سادگی به روز می شود.
بیایید خلقت خود را راه اندازی کنیم!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
ویژگی های PS جزء وب من فاوست را در مقالات در نظر نخواهم گرفت، بنابراین ما پرچم مناسب را تعیین می کنیم.
در دستور راهاندازی، به فاست گفتیم که کجا به دنبال شی برنامه باشد و با آن چه کاری انجام دهد (کارگر را راه اندازی کند) با سطح خروجی گزارش اطلاعات. خروجی زیر را دریافت می کنیم:
بیایید به مجموعه پارتیشن نگاه کنیم. همانطور که می بینیم، یک موضوع با نامی که در کد تعیین کردیم، تعداد پیش فرض پارتیشن ها ایجاد شد (8، برگرفته از موضوع_پارتیشن ها - پارامتر شی برنامه)، از آنجایی که ما یک مقدار جداگانه برای موضوع خود (از طریق پارتیشن ها) مشخص نکرده ایم. عامل راهاندازی شده در worker به هر 8 پارتیشن اختصاص داده میشود، زیرا تنها پارتیشن است، اما در قسمت مربوط به خوشهبندی با جزئیات بیشتر در مورد آن صحبت خواهد شد.
خوب، اکنون می توانیم به پنجره ترمینال دیگری برویم و یک پیام خالی به موضوع خود ارسال کنیم:
PS با استفاده از @ ما نشان میدهیم که در حال ارسال پیام به موضوعی به نام "collect_securities" هستیم.
در این مورد، پیام به پارتیشن 6 رفت - می توانید با رفتن به kafdrop on این را بررسی کنید localhost:9000
با رفتن به پنجره ترمینال با کارگر خود، پیام شادی را خواهیم دید که با استفاده از loguru ارسال شده است:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
همچنین میتوانیم به mongo نگاهی بیندازیم (با استفاده از Robo3T یا Studio3T) و ببینیم که اوراق بهادار در پایگاه داده هستند:
من میلیاردر نیستم و بنابراین به اولین گزینه مشاهده بسنده می کنیم.
شادی و شادی - اولین نماینده آماده است :)
نماینده آماده است، زنده باد نماینده جدید!
بله، آقایان، ما فقط 1/3 از مسیری که در این مقاله تهیه شده است را طی کرده ایم، اما ناامید نباشید، زیرا اکنون آسانتر خواهد شد.
بنابراین اکنون ما به یک عامل نیاز داریم که اطلاعات متا را جمع آوری کرده و در یک سند مجموعه قرار دهد:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
از آنجایی که این عامل اطلاعات مربوط به یک امنیت خاص را پردازش می کند، باید علامت (نماد) این امنیت را در پیام نشان دهیم. برای این منظور در فاوست وجود دارد سوابق - کلاس هایی که طرح پیام را در موضوع عامل اعلام می کنند.
در این مورد، اجازه دهید به records.pyو توضیح دهید که پیام این موضوع چگونه باید باشد:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
همانطور که ممکن است حدس بزنید، فاست از حاشیه نویسی نوع پایتون برای توصیف طرح پیام استفاده می کند، به همین دلیل است که حداقل نسخه پشتیبانی شده توسط کتابخانه 3.6.
بیایید به عامل برگردیم، انواع را تنظیم کرده و اضافه کنیم:
همانطور که می بینید، ما یک پارامتر جدید را با یک طرح به روش مقداردهی_تاپیک ارسال می کنیم. علاوه بر این، همه چیز از همان طرح پیروی می کند، بنابراین من هیچ فایده ای در مورد چیز دیگری نمی بینم.
خوب، آخرین لمس اضافه کردن یک تماس به عامل جمعآوری اطلاعات متا برای collect_securitites است:
ما از طرح اعلام شده قبلی برای پیام استفاده می کنیم. در این مورد از روش .cast استفاده کردم چون نیازی نیست منتظر نتیجه از طرف نماینده باشیم، اما قابل ذکر است که راه ها ارسال پیام به موضوع:
بازیگران - مسدود نمی شود زیرا انتظار نتیجه را ندارد. شما نمی توانید نتیجه را به عنوان پیام به موضوع دیگری ارسال کنید.
ارسال - مسدود نمی کند زیرا انتظار نتیجه را ندارد. شما می توانید یک عامل را در موضوع مشخص کنید که نتیجه به آن خواهد رفت.
بپرس - منتظر نتیجه است. شما می توانید یک عامل را در موضوع مشخص کنید که نتیجه به آن خواهد رفت.
بنابراین، این همه با نمایندگان برای امروز!
تیم رویایی
آخرین چیزی که قول دادم در این قسمت بنویسم دستورات است. همانطور که قبلا ذکر شد، دستورات در faust یک بسته بندی در اطراف کلیک هستند. در واقع، فاست به سادگی دستور سفارشی ما را هنگام تعیین کلید -A به رابط خود متصل می کند
پس از اعلام عوامل در agents.py با دکوراتور یک تابع اضافه کنید app.commandفراخوانی روش انداختن у جمع آوری_ اوراق بهادار:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
بنابراین، اگر لیستی از دستورات را فراخوانی کنیم، دستور جدید ما در آن خواهد بود:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
ما میتوانیم مانند هر کس دیگری از آن استفاده کنیم، بنابراین بیایید faust worker را مجدداً راهاندازی کنیم و مجموعهای کامل از اوراق بهادار را شروع کنیم:
> faust -A horton.agents start-collect-securities
بعد از این چه خواهد شد؟
در قسمت بعدی با استفاده از عوامل باقیمانده به عنوان مثال، مکانیسم سینک برای جستجوی افراط در قیمت های پایانی معاملات برای سال و راه اندازی کرون نمایندگان را در نظر خواهیم گرفت.
PS در قسمت آخر از من در مورد فاوست و کافکای متقابل پرسیده شد (همرو چه ویژگی هایی دارد؟). به نظر می رسد که confluent از بسیاری جهات کاربردی تر است، اما واقعیت این است که faust پشتیبانی کامل مشتری برای confluent ندارد - این نتیجه از شرح محدودیت های مشتری در سند.