ProHoster > وبلاگ > اداره > آپاچی کافکا و پردازش داده های جریانی با Spark Streaming
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming
سلام، هابر! امروز سیستمی خواهیم ساخت که جریان های پیام آپاچی کافکا را با استفاده از Spark Streaming پردازش کرده و نتایج پردازش را در پایگاه داده ابری AWS RDS می نویسد.
بیایید تصور کنیم که یک مؤسسه اعتباری خاص وظیفه پردازش تراکنشهای دریافتی را «در حال پرواز» در همه شعبههایش برای ما تعیین میکند. این می تواند به منظور محاسبه سریع موقعیت ارز باز برای خزانه، محدودیت ها یا نتایج مالی معاملات و غیره انجام شود.
نحوه اجرای این مورد بدون استفاده از جادو و طلسم جادویی - زیر برش بخوانید! برو!
البته پردازش حجم زیادی از داده ها در زمان واقعی فرصت های فراوانی را برای استفاده در سیستم های مدرن فراهم می کند. یکی از محبوبترین ترکیبها برای این کار، پشت سر هم آپاچی کافکا و اسپارک استریمینگ است، جایی که کافکا جریانی از بستههای پیام ورودی را ایجاد میکند و Spark Streaming این بستهها را در یک بازه زمانی معین پردازش میکند.
برای افزایش تحمل خطای اپلیکیشن، از چک پوینت ها استفاده می کنیم. با این مکانیسم، زمانی که موتور Spark Streaming نیاز به بازیابی اطلاعات از دست رفته دارد، فقط باید به آخرین بازرسی برگردد و محاسبات را از آنجا از سر بگیرد.
معماری سیستم توسعه یافته
اجزای مورد استفاده:
آپاچی کافکا یک سیستم پیام رسانی انتشار-اشتراک توزیع شده است. مناسب برای مصرف پیام های آفلاین و آنلاین. برای جلوگیری از از دست دادن داده ها، پیام های کافکا روی دیسک ذخیره می شوند و در کلاستر تکرار می شوند. سیستم کافکا در بالای سرویس همگام سازی ZooKeeper ساخته شده است.
Apache Spark Streaming - جزء جرقه برای پردازش داده های جریان. ماژول Spark Streaming با استفاده از معماری میکرو دسته ای ساخته شده است، جایی که جریان داده به عنوان یک دنباله پیوسته از بسته های داده کوچک تفسیر می شود. Spark Streaming داده ها را از منابع مختلف می گیرد و آن ها را در بسته های کوچک ترکیب می کند. بسته های جدید در فواصل زمانی معین ایجاد می شوند. در ابتدای هر بازه زمانی، یک بسته جدید ایجاد می شود و هر داده ای که در آن بازه زمانی دریافت می شود در بسته گنجانده می شود. در پایان بازه، رشد بسته متوقف می شود. اندازه فاصله توسط پارامتری به نام فاصله دسته ای تعیین می شود.
Apache Spark SQL - ترکیبی از پردازش رابطه ای با برنامه نویسی تابعی Spark. دادههای ساختاریافته به معنای دادههایی است که دارای یک طرحواره هستند، یعنی مجموعهای از فیلدها برای همه رکوردها. Spark SQL از ورودیهای مختلف منابع داده ساختاریافته پشتیبانی میکند و به لطف در دسترس بودن اطلاعات طرحواره، میتواند به طور موثر فقط فیلدهای مورد نیاز رکوردها را بازیابی کند و همچنین APIهای DataFrame را ارائه میکند.
AWS RDS یک پایگاه داده رابطهای مبتنی بر ابر نسبتاً ارزان، سرویس وب است که راهاندازی، عملیات و مقیاسبندی را ساده میکند و مستقیماً توسط آمازون مدیریت میشود.
نصب و راه اندازی سرور کافکا
قبل از استفاده مستقیم از کافکا، باید مطمئن شوید که جاوا دارید، زیرا... JVM برای کارهای زیر استفاده می شود:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
مرحله بعدی اختیاری است. واقعیت این است که تنظیمات پیش فرض به شما امکان استفاده کامل از تمام ویژگی های آپاچی کافکا را نمی دهد. به عنوان مثال، یک موضوع، دسته، گروهی را که می توان پیام ها را برای آنها منتشر کرد حذف کنید. برای تغییر این، اجازه دهید فایل پیکربندی را ویرایش کنیم:
vim ~/kafka/config/server.properties
موارد زیر را به انتهای فایل اضافه کنید:
delete.topic.enable = true
قبل از راه اندازی سرور کافکا، باید سرور ZooKeeper را راه اندازی کنید؛ ما از اسکریپت کمکی همراه با توزیع کافکا استفاده خواهیم کرد:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
پس از اینکه ZooKeeper با موفقیت راه اندازی شد، سرور کافکا را در یک ترمینال جداگانه راه اندازی کنید:
بیایید لحظه های آزمایش تولید کننده و مصرف کننده برای موضوع جدید ایجاد شده را از دست بدهیم. جزئیات بیشتر در مورد نحوه آزمایش ارسال و دریافت پیام در اسناد رسمی نوشته شده است - چند پیام بفرست. خوب، ما به نوشتن یک تولید کننده در پایتون با استفاده از KafkaProducer API می رویم.
نگارش تهیه کننده
تولید کننده داده های تصادفی تولید می کند - 100 پیام در هر ثانیه. منظور ما از داده های تصادفی یک فرهنگ لغت متشکل از سه قسمت است:
شاخه - نام محل فروش موسسه اعتباری؛
واحد پول - ارز معامله؛
میزان - مقدار تراکنش. مبلغ در صورت خرید ارز توسط بانک عدد مثبت و در صورت فروش عدد منفی خواهد بود.
در مرحله بعد، با استفاده از روش ارسال، پیامی را به سرور، به موضوع مورد نیاز خود، با فرمت JSON ارسال می کنیم:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
هنگام اجرای اسکریپت، پیام های زیر را در ترمینال دریافت می کنیم:
این بدان معنی است که همه چیز همانطور که ما می خواستیم کار می کند - تولید کننده پیام هایی را به موضوع مورد نیاز ما تولید می کند و ارسال می کند.
مرحله بعدی نصب Spark و پردازش این جریان پیام است.
نصب آپاچی اسپارک
جرقه آپاچی یک پلت فرم محاسباتی خوشه ای جهانی و با کارایی بالا است.
Spark بهتر از پیادهسازیهای محبوب مدل MapReduce عمل میکند در حالی که از طیف وسیعتری از انواع محاسبات، از جمله پرس و جوهای تعاملی و پردازش جریانی پشتیبانی میکند. سرعت در پردازش مقادیر زیادی داده نقش مهمی ایفا می کند، زیرا سرعت است که به شما امکان می دهد بدون صرف دقیقه یا ساعت انتظار به صورت تعاملی کار کنید. یکی از بزرگترین نقاط قوت Spark که آن را بسیار سریع می کند، توانایی آن در انجام محاسبات درون حافظه است.
این فریم ورک در اسکالا نوشته شده است، بنابراین ابتدا باید آن را نصب کنید:
پس از ایجاد تغییرات در bashrc دستور زیر را اجرا کنید:
source ~/.bashrc
استقرار AWS PostgreSQL
تنها چیزی که باقی می ماند این است که پایگاه داده را مستقر کنیم که در آن اطلاعات پردازش شده را از جریان ها بارگذاری کنیم. برای این کار از سرویس AWS RDS استفاده خواهیم کرد.
به کنسول AWS -> AWS RDS -> پایگاههای داده -> ایجاد پایگاه داده بروید:
PostgreSQL را انتخاب کرده و روی Next کلیک کنید:
زیرا این مثال فقط برای اهداف آموزشی است؛ ما از یک سرور رایگان "حداقل" (سطح رایگان) استفاده خواهیم کرد:
در مرحله بعد، یک تیک در بلوک Free Tier قرار می دهیم و پس از آن به طور خودکار نمونه ای از کلاس t2.micro به ما پیشنهاد می شود - اگرچه ضعیف است، اما رایگان است و برای کار ما کاملاً مناسب است:
موارد بسیار مهمی هستند: نام نمونه پایگاه داده، نام کاربر اصلی و رمز عبور او. بیایید نمونه را نامگذاری کنیم: myHabrTest، کاربر اصلی: هبر، کلمه عبور: habr12345 و روی دکمه Next کلیک کنید:
در صفحه بعد پارامترهایی وجود دارد که مسئول دسترسی به سرور پایگاه داده ما از خارج (دسترسی عمومی) و در دسترس بودن پورت هستند:
بیایید یک تنظیم جدید برای گروه امنیتی VPC ایجاد کنیم، که اجازه دسترسی خارجی به سرور پایگاه داده ما از طریق پورت 5432 (PostgreSQL) را می دهد.
بیایید در یک پنجره مرورگر جداگانه به کنسول AWS برویم به داشبورد VPC -> گروههای امنیتی -> بخش ایجاد گروه امنیتی:
ما نام گروه Security - PostgreSQL را تعیین می کنیم، یک توضیح، نشان می دهد که این گروه باید با کدام VPC مرتبط باشد و روی دکمه ایجاد کلیک کنید:
همانطور که در تصویر زیر نشان داده شده است، قوانین ورودی پورت 5432 را برای گروه تازه ایجاد شده پر کنید. شما نمی توانید پورت را به صورت دستی تعیین کنید، اما PostgreSQL را از لیست کشویی Type انتخاب کنید.
به بیان دقیق، مقدار ::/0 به معنای در دسترس بودن ترافیک ورودی به سرور از سرتاسر جهان است، که بطور متعارف کاملاً درست نیست، اما برای تجزیه و تحلیل مثال، اجازه دهید از این رویکرد استفاده کنیم:
ما به صفحه مرورگر باز می گردیم، جایی که "پیکربندی تنظیمات پیشرفته" را باز کرده ایم و در بخش گروه های امنیتی VPC -> انتخاب گروه های امنیتی VPC موجود -> PostgreSQL را انتخاب می کنیم:
بعد، در گزینه های پایگاه داده -> نام پایگاه داده -> نام را تنظیم کنید - habrDB.
میتوانیم پارامترهای باقیمانده را به استثنای غیرفعال کردن پشتیبانگیری (دوره نگهداری پشتیبان - 0 روز)، نظارت و بینش عملکرد، بهطور پیشفرض رها کنیم. روی دکمه کلیک کنید پایگاه داده ایجاد کنید:
کنترل کننده نخ
مرحله آخر توسعه یک کار اسپارک است که داده های جدیدی را که از کافکا هر دو ثانیه یکبار پردازش می کند و نتیجه را در پایگاه داده وارد می کند.
همانطور که در بالا ذکر شد، نقاط بازرسی مکانیزم اصلی در SparkStreaming هستند که باید برای اطمینان از تحمل خطا پیکربندی شوند. ما از چک پوینتها استفاده میکنیم و در صورت عدم موفقیت رویه، ماژول Spark Streaming فقط باید به آخرین چکپوینت برگردد و محاسبات را از سر بگیرد تا اطلاعات از دست رفته را بازیابی کند.
چک پوینت را می توان با تنظیم دایرکتوری روی یک سیستم فایل قابل تحمل و قابل اعتماد (مانند HDFS، S3 و غیره) که در آن اطلاعات ایست بازرسی ذخیره می شود، فعال کرد. این کار با استفاده از موارد زیر انجام می شود:
streamingContext.checkpoint(checkpointDirectory)
در مثال ما از روش زیر استفاده خواهیم کرد، یعنی اگر checkpointDirectory وجود داشته باشد، پس زمینه از داده های چک پوینت دوباره ایجاد می شود. اگر دایرکتوری وجود نداشته باشد (به عنوان مثال برای اولین بار اجرا می شود)، سپس functionToCreateContext برای ایجاد یک زمینه جدید و پیکربندی DStreams فراخوانی می شود:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
ما یک شی DirectStream برای اتصال به مبحث تراکنش با استفاده از متد createDirectStream کتابخانه KafkaUtils ایجاد می کنیم:
با استفاده از Spark SQL، یک گروه بندی ساده انجام می دهیم و نتیجه را در کنسول نمایش می دهیم:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
و سپس داده های جمع آوری شده را در جدولی در AWS RDS ذخیره می کنیم. برای ذخیره نتایج تجمیع در جدول پایگاه داده، از روش نوشتن شی DataFrame استفاده می کنیم:
چند کلمه در مورد راه اندازی اتصال به AWS RDS. ما کاربر و رمز عبور آن را در مرحله «استقرار AWS PostgreSQL» ایجاد کردیم. شما باید از Endpoint به عنوان url سرور پایگاه داده استفاده کنید که در بخش Connectivity & Security نمایش داده می شود:
برای اتصال صحیح اسپارک و کافکا، باید کار را از طریق smark-submit با استفاده از آرتیفکت اجرا کنید. جرقه-استریم-کافکا-0-8_2.11. علاوه بر این، ما همچنین از یک مصنوع برای تعامل با پایگاه داده PostgreSQL استفاده خواهیم کرد؛ ما آنها را از طریق بسته های -- منتقل خواهیم کرد.
برای انعطافپذیری اسکریپت، نام سرور پیام و موضوعی را که میخواهیم از آن داده دریافت کنیم، به عنوان پارامترهای ورودی نیز درج میکنیم.
بنابراین، زمان راه اندازی و بررسی عملکرد سیستم است:
همه چیز درست شد! همانطور که در تصویر زیر می بینید، در حالی که برنامه در حال اجرا است، نتایج جمع آوری جدید هر 2 ثانیه یکبار خروجی می شود، زیرا زمانی که شی StreamingContext را ایجاد کردیم، فاصله دسته بندی را روی 2 ثانیه تنظیم کردیم:
در مرحله بعد، یک پرس و جو ساده به پایگاه داده برای بررسی وجود رکوردها در جدول انجام می دهیم تراکنش_جریان:
نتیجه
این مقاله به نمونهای از پردازش جریانی اطلاعات با استفاده از Spark Streaming در ارتباط با Apache Kafka و PostgreSQL پرداخته است. با رشد دادهها از منابع مختلف، برآورد ارزش عملی Spark Streaming برای ایجاد برنامههای پخش جریانی و بلادرنگ دشوار است.
شما می توانید کد منبع کامل را در مخزن من پیدا کنید GitHub.
خوشحالم که در مورد این مقاله بحث می کنم، منتظر نظرات شما هستم و همچنین امیدوارم انتقاد سازنده همه خوانندگان دلسوز را دریافت کنم.
برای شما آرزوی موفقیت دارم!
ص. در ابتدا قرار بود از یک پایگاه داده محلی PostgreSQL استفاده شود، اما با توجه به عشق من به AWS، تصمیم گرفتم پایگاه داده را به ابر منتقل کنم. در مقاله بعدی در مورد این موضوع، نحوه پیاده سازی کل سیستم توضیح داده شده در بالا در AWS با استفاده از AWS Kinesis و AWS EMR را نشان خواهم داد. اخبار را دنبال کنید!