ProHoster > بلوق > إدارة > أباتشي كافكا وتدفق البيانات مع Spark Streaming
أباتشي كافكا وتدفق البيانات مع Spark Streaming
يا هبر! سنقوم اليوم ببناء نظام يعالج تدفقات رسائل Apache Kafka باستخدام Spark Streaming وكتابة نتيجة المعالجة إلى قاعدة بيانات AWS RDS السحابية.
لنتخيل أن مؤسسة ائتمانية معينة تضع أمامنا مهمة معالجة المعاملات الواردة "سريعًا" لجميع فروعها. يمكن القيام بذلك لغرض الحساب السريع لمركز العملة المفتوح للخزانة أو الحدود أو النتيجة المالية للمعاملات ، إلخ.
كيفية تنفيذ هذه الحالة دون استخدام السحر والتعاويذ السحرية - اقرأ تحت القص! يذهب!
بالطبع ، توفر المعالجة في الوقت الفعلي لكمية كبيرة من البيانات فرصًا كبيرة للاستخدام في الأنظمة الحديثة. أحد أكثر التركيبات شيوعًا لهذا هو ترادف Apache Kafka و Spark Streaming ، حيث ينشئ كافكا دفقًا من حزم الرسائل الواردة ، ويقوم Spark Streaming بمعالجة هذه الحزم في فترة زمنية محددة.
لتحسين التسامح مع الخطأ في التطبيق ، سوف نستخدم نقاط التفتيش - نقاط التفتيش. باستخدام هذه الآلية ، عندما تحتاج وحدة Spark Streaming إلى استعادة البيانات المفقودة ، فإنها تحتاج فقط إلى العودة إلى آخر نقطة تفتيش واستئناف الحسابات من هناك.
هندسة النظام المتطور
المكونات المستخدمة:
اباتشي كافكا هو نظام مراسلة موزع للنشر والاشتراك. مناسب لكل من استهلاك الرسائل عبر الإنترنت وغير متصل. لمنع فقدان البيانات ، يتم تخزين رسائل كافكا على القرص وتكرارها داخل الكتلة. تم بناء نظام كافكا على رأس خدمة مزامنة ZooKeeper ؛
اباتشي سبارك الجري - مكون شرارة لمعالجة تدفق البيانات. تم تصميم وحدة Spark Streaming باستخدام بنية الدُفعات الصغيرة ، عندما يتم تفسير دفق البيانات على أنه تسلسل مستمر لحزم البيانات الصغيرة. يأخذ Spark Streaming البيانات من مصادر مختلفة ويجمعها في دفعات صغيرة. يتم إنشاء حزم جديدة على فترات منتظمة. في بداية كل فترة زمنية ، يتم إنشاء حزمة جديدة ، ويتم تضمين أي بيانات يتم تلقيها خلال تلك الفترة في الحزمة. في نهاية الفاصل الزمني ، يتوقف نمو الحزمة. يتم تحديد حجم الفاصل الزمني بواسطة معلمة تسمى الفاصل الزمني للدفعة ؛
أباتشي سبارك SQL - يجمع بين المعالجة العلائقية والبرمجة الوظيفية Spark. تشير البيانات المهيكلة إلى البيانات التي تحتوي على مخطط ، أي مجموعة واحدة من الحقول لجميع السجلات. يدعم Spark SQL الإدخال من مجموعة متنوعة من مصادر البيانات المنظمة ، وبسبب وجود معلومات المخطط ، يمكنه استرداد حقول السجلات المطلوبة بكفاءة فقط ، كما يوفر واجهات برمجة تطبيقات DataFrame ؛
أوس رديز هي قاعدة بيانات علائقية قائمة على السحابة وغير مكلفة نسبيًا ، وهي خدمة ويب تبسط الإعداد والتشغيل والتوسيع ، وتديرها أمازون مباشرة.
تثبيت وتشغيل خادم كافكا
قبل استخدام كافكا مباشرة ، عليك التأكد من أن لديك جافا ، لأن يستخدم JVM للعمل:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
الخطوة التالية اختيارية. الحقيقة هي أن الإعدادات الافتراضية لا تسمح لك باستخدام جميع ميزات Apache Kafka بشكل كامل. على سبيل المثال ، احذف موضوعًا أو فئة أو مجموعة يمكن نشر الرسائل عليها. لتغيير هذا ، دعنا نعدل ملف التكوين:
vim ~/kafka/config/server.properties
يضاف ما يلي إلى نهاية الملف:
delete.topic.enable = true
قبل بدء تشغيل خادم كافكا ، تحتاج إلى بدء تشغيل خادم ZooKeeper ، وسنستخدم النص المساعد الذي يأتي مع توزيعة كافكا:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
بعد أن بدأ ZooKeeper بنجاح ، بدأنا تشغيل خادم كافكا في محطة طرفية منفصلة:
دعنا نفوت لحظات اختبار المنتج والمستهلك للموضوع الذي تم إنشاؤه حديثًا. مزيد من التفاصيل حول كيفية اختبار إرسال الرسائل واستلامها مكتوبة في الوثائق الرسمية - أرسل بعض الرسائل. حسنًا ، نحن ننتقل إلى كتابة منتج في Python باستخدام KafkaProducer API.
كتابة المنتج
سينشئ المنتج بيانات عشوائية - 100 رسالة كل ثانية. نعني بالبيانات العشوائية قاموس يتكون من ثلاثة حقول:
الفرع - اسم نقطة البيع للمؤسسة الائتمانية ؛
العملة - عملة تداولية؛
المقدار - قيمة التحويل. سيكون المبلغ موجبًا إذا كانت عملية شراء عملة من قبل البنك ، وسالبة إذا كانت عملية بيع.
بعد ذلك ، باستخدام طريقة الإرسال ، نرسل رسالة إلى الخادم ، إلى الموضوع الذي نحتاجه ، بتنسيق JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
عند تشغيل البرنامج النصي ، نحصل على الرسائل التالية في المحطة:
هذا يعني أن كل شيء يعمل كما أردنا - يقوم المنتج بإنشاء وإرسال الرسائل إلى الموضوع الذي نحتاجه.
الخطوة التالية هي تثبيت Spark ومعالجة تدفق هذه الرسالة.
تثبيت اباتشي سبارك
أباتشي سبارك هي عبارة عن منصة حوسبة عنقودية متعددة الاستخدامات وعالية الأداء.
يتفوق Spark على عمليات التنفيذ الشائعة لنموذج MapReduce من حيث الأداء ، مع توفير الدعم لمجموعة واسعة من أنواع الحسابات ، بما في ذلك الاستعلامات التفاعلية والتدفق. تلعب السرعة دورًا مهمًا عند معالجة كميات كبيرة من البيانات ، حيث إنها السرعة التي تتيح لك العمل بشكل تفاعلي دون قضاء دقائق أو ساعات في الانتظار. واحدة من أكبر نقاط القوة في Spark لتقديم هذه السرعة هي قدرتها على إجراء العمليات الحسابية في الذاكرة.
هذا الإطار مكتوب بلغة Scala ، لذلك تحتاج إلى تثبيته أولاً:
قم بتشغيل الأمر أدناه بعد إجراء التغييرات على bashrc:
source ~/.bashrc
نشر AWS PostgreSQL
يبقى توسيع قاعدة البيانات ، حيث سنقوم بملء المعلومات المعالجة من التدفقات. للقيام بذلك ، سوف نستخدم خدمة AWS RDS.
انتقل إلى وحدة تحكم AWS -> AWS RDS -> قواعد البيانات -> إنشاء قاعدة بيانات:
حدد PostgreSQL وانقر على زر التالي:
لأن تم تحليل هذا المثال للأغراض التعليمية فقط ، وسوف نستخدم خادمًا مجانيًا "على الأقل" (المستوى المجاني):
بعد ذلك ، حدد المربع Free Tier ، وبعد ذلك سنعرض تلقائيًا مثيلًا لفئة t2.micro - على الرغم من ضعفها ، إلا أنها مجانية ومناسبة تمامًا لمهمتنا:
تتبع أشياء مهمة جدًا: اسم مثيل قاعدة البيانات ، واسم المستخدم الرئيسي وكلمة المرور الخاصة به. دعنا نسمي المثال: myHabrTest ، مستخدم رئيسي: هابر، كلمة المرور: habr 12345 وانقر على زر التالي:
تحتوي الصفحة التالية على المعلمات المسؤولة عن إمكانية الوصول إلى خادم قاعدة البيانات الخاص بنا من الخارج (الوصول العام) وتوافر المنافذ:
لنقم بإنشاء إعداد جديد لمجموعة أمان VPC ، والذي سيسمح بالوصول الخارجي إلى خادم قاعدة البيانات الخاص بنا من خلال المنفذ 5432 (PostgreSQL).
دعنا نذهب في نافذة متصفح منفصلة إلى وحدة تحكم AWS في لوحة معلومات VPC -> مجموعات الأمان -> قسم إنشاء مجموعة أمان:
قمنا بتعيين اسم مجموعة الأمان - PostgreSQL ، وصفًا ، وتحديد VPC الذي يجب أن ترتبط به هذه المجموعة ، ثم انقر فوق الزر إنشاء:
نقوم بملء قواعد Inbound للمجموعة المنشأة حديثًا للمنفذ 5432 ، كما هو موضح في الصورة أدناه. لا يمكنك تحديد المنفذ يدويًا ، ولكن حدد PostgreSQL من القائمة المنسدلة "النوع".
بالمعنى الدقيق للكلمة ، القيمة :: / 0 تعني توفر حركة المرور الواردة للخادم من جميع أنحاء العالم ، وهذا ليس صحيحًا تمامًا من الناحية القانونية ، ولكن لتحليل المثال ، دعنا نستخدم هذا النهج:
نعود إلى صفحة المتصفح ، حيث يتم فتح "تكوين الإعدادات المتقدمة" وتحديد مجموعات أمان VPC -> اختر مجموعات أمان VPC الحالية -> PostgreSQL في القسم:
بعد ذلك ، في قسم خيارات قاعدة البيانات -> اسم قاعدة البيانات -> عيِّن الاسم - habrDB.
يمكننا ترك باقي المعلمات ، باستثناء تعطيل النسخ الاحتياطي (فترة الاحتفاظ بالنسخ الاحتياطي - 0 أيام) والمراقبة وإحصاءات الأداء افتراضيًا. انقر فوق الزر إنشاء قاعدة البيانات:
معالج الدفق
ستكون المرحلة الأخيرة هي تطوير وظيفة Spark ، والتي ستعالج البيانات الجديدة من كافكا كل ثانيتين وتدخل النتيجة في قاعدة البيانات.
كما هو مذكور أعلاه ، فإن نقاط التفتيش هي الآلية الرئيسية في SparkStreaming التي يجب تكوينها لتوفير التسامح مع الخطأ. سنستخدم نقاط التفتيش ، وفي حالة فشل الإجراء ، ستحتاج وحدة Spark Streaming فقط إلى العودة إلى آخر نقطة تفتيش واستئناف الحسابات منها لاستعادة البيانات المفقودة.
يمكن تمكين نقطة تفتيش عن طريق تعيين دليل على نظام ملفات موثوق به ومتسامح مع الأخطاء (مثل HDFS و S3 وما إلى ذلك) حيث سيتم تخزين معلومات نقطة التفتيش. يتم ذلك ، على سبيل المثال:
streamingContext.checkpoint(checkpointDirectory)
في مثالنا ، سنستخدم النهج التالي ، أي إذا كان دليل نقطة التفتيش موجودًا ، فسيتم إعادة إنشاء السياق من بيانات نقطة التفتيش. إذا لم يكن الدليل موجودًا (أي أنه يتم تنفيذه لأول مرة) ، فسيتم استدعاء وظيفة functionToCreateContext لإنشاء سياق جديد وإعداد DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
نقوم بإنشاء كائن DirectStream للاتصال بموضوع "المعاملة" باستخدام طريقة createDirectStream لمكتبة KafkaUtils:
باستخدام Spark SQL ، نقوم بتجميع بسيط وإخراج النتيجة إلى وحدة التحكم:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
الحصول على نص الاستعلام وتشغيله من خلال Spark SQL:
بضع كلمات حول إعداد اتصال بـ AWS RDS. أنشأنا المستخدم وكلمة المرور له في خطوة "نشر AWS PostgreSQL". بصفتك عنوان url لخادم قاعدة البيانات ، يجب عليك استخدام نقطة النهاية ، والتي يتم عرضها في قسم الاتصال والأمان:
من أجل توصيل Spark و Kafka بشكل صحيح ، يجب عليك تشغيل الوظيفة من خلال smark-submit باستخدام الأداة شرارة تدفق كافكا 0-8_2.11. بالإضافة إلى ذلك ، سنستخدم أيضًا أداة للتفاعل مع قاعدة بيانات PostgreSQL ، وسنمررها عبر الحزم.
لمرونة البرنامج النصي ، سنقوم أيضًا بإخراج اسم خادم الرسائل والموضوع الذي نريد تلقي البيانات منه كمعلمات إدخال.
كل شيء على ما يرام! كما ترى في الصورة أدناه ، أثناء تشغيل التطبيق ، يتم عرض نتائج تجميع جديدة كل ثانيتين ، لأننا قمنا بتعيين فاصل التجميع على ثانيتين عندما أنشأنا كائن StreamingContext:
بعد ذلك ، نقوم بإجراء استعلام بسيط لقاعدة البيانات للتحقق من السجلات في الجدول تدفق_المعاملة:
اختتام
في هذه المقالة ، تم النظر في مثال على دفق معالجة المعلومات باستخدام Spark Streaming بالتزامن مع Apache Kafka و PostgreSQL. مع النمو في أحجام البيانات من مصادر مختلفة ، من الصعب المبالغة في تقدير القيمة العملية لـ Spark Streaming لإنشاء تطبيقات في الوقت الفعلي وبث مباشر.
يمكنك العثور على كود المصدر الكامل في مستودعي على GitHub جيثب:.
يسعدني مناقشة هذا المقال ، وأتطلع إلى تعليقاتكم ، وأيضًا ، آمل أن أتلقى النقد البناء من جميع القراء المعنيين.
وأتمنى لكم النجاح!
فرع فلسطين. كان من المخطط في الأصل استخدام قاعدة بيانات PostgreSQL محلية ، ولكن نظرًا لحبي لـ AWS ، قررت نقل قاعدة البيانات إلى السحابة. في المقالة التالية حول هذا الموضوع ، سأوضح لك كيفية تنفيذ النظام بالكامل الموضح أعلاه في AWS باستخدام AWS Kinesis و AWS EMR. تابع الأخبار!