أباتشي كافكا وتدفق البيانات مع Spark Streaming

يا هبر! سنقوم اليوم ببناء نظام يعالج تدفقات رسائل Apache Kafka باستخدام Spark Streaming وكتابة نتيجة المعالجة إلى قاعدة بيانات AWS RDS السحابية.

لنتخيل أن مؤسسة ائتمانية معينة تضع أمامنا مهمة معالجة المعاملات الواردة "سريعًا" لجميع فروعها. يمكن القيام بذلك لغرض الحساب السريع لمركز العملة المفتوح للخزانة أو الحدود أو النتيجة المالية للمعاملات ، إلخ.

كيفية تنفيذ هذه الحالة دون استخدام السحر والتعاويذ السحرية - اقرأ تحت القص! يذهب!

أباتشي كافكا وتدفق البيانات مع Spark Streaming
(مصدر الصورة)

مقدمة

بالطبع ، توفر المعالجة في الوقت الفعلي لكمية كبيرة من البيانات فرصًا كبيرة للاستخدام في الأنظمة الحديثة. أحد أكثر التركيبات شيوعًا لهذا هو ترادف Apache Kafka و Spark Streaming ، حيث ينشئ كافكا دفقًا من حزم الرسائل الواردة ، ويقوم Spark Streaming بمعالجة هذه الحزم في فترة زمنية محددة.

لتحسين التسامح مع الخطأ في التطبيق ، سوف نستخدم نقاط التفتيش - نقاط التفتيش. باستخدام هذه الآلية ، عندما تحتاج وحدة Spark Streaming إلى استعادة البيانات المفقودة ، فإنها تحتاج فقط إلى العودة إلى آخر نقطة تفتيش واستئناف الحسابات من هناك.

هندسة النظام المتطور

أباتشي كافكا وتدفق البيانات مع Spark Streaming

المكونات المستخدمة:

  • اباتشي كافكا هو نظام مراسلة موزع للنشر والاشتراك. مناسب لكل من استهلاك الرسائل عبر الإنترنت وغير متصل. لمنع فقدان البيانات ، يتم تخزين رسائل كافكا على القرص وتكرارها داخل الكتلة. تم بناء نظام كافكا على رأس خدمة مزامنة ZooKeeper ؛
  • اباتشي سبارك الجري - مكون شرارة لمعالجة تدفق البيانات. تم تصميم وحدة Spark Streaming باستخدام بنية الدُفعات الصغيرة ، عندما يتم تفسير دفق البيانات على أنه تسلسل مستمر لحزم البيانات الصغيرة. يأخذ Spark Streaming البيانات من مصادر مختلفة ويجمعها في دفعات صغيرة. يتم إنشاء حزم جديدة على فترات منتظمة. في بداية كل فترة زمنية ، يتم إنشاء حزمة جديدة ، ويتم تضمين أي بيانات يتم تلقيها خلال تلك الفترة في الحزمة. في نهاية الفاصل الزمني ، يتوقف نمو الحزمة. يتم تحديد حجم الفاصل الزمني بواسطة معلمة تسمى الفاصل الزمني للدفعة ؛
  • أباتشي سبارك SQL - يجمع بين المعالجة العلائقية والبرمجة الوظيفية Spark. تشير البيانات المهيكلة إلى البيانات التي تحتوي على مخطط ، أي مجموعة واحدة من الحقول لجميع السجلات. يدعم Spark SQL الإدخال من مجموعة متنوعة من مصادر البيانات المنظمة ، وبسبب وجود معلومات المخطط ، يمكنه استرداد حقول السجلات المطلوبة بكفاءة فقط ، كما يوفر واجهات برمجة تطبيقات DataFrame ؛
  • أوس رديز هي قاعدة بيانات علائقية قائمة على السحابة وغير مكلفة نسبيًا ، وهي خدمة ويب تبسط الإعداد والتشغيل والتوسيع ، وتديرها أمازون مباشرة.

تثبيت وتشغيل خادم كافكا

قبل استخدام كافكا مباشرة ، عليك التأكد من أن لديك جافا ، لأن يستخدم JVM للعمل:

sudo apt-get update 
sudo apt-get install default-jre
java -version

لنقم بإنشاء مستخدم جديد للعمل مع كافكا:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

بعد ذلك ، قم بتنزيل مجموعة أدوات التوزيع من موقع Apache Kafka الرسمي:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

فك ضغط الأرشيف الذي تم تنزيله:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

الخطوة التالية اختيارية. الحقيقة هي أن الإعدادات الافتراضية لا تسمح لك باستخدام جميع ميزات Apache Kafka بشكل كامل. على سبيل المثال ، احذف موضوعًا أو فئة أو مجموعة يمكن نشر الرسائل عليها. لتغيير هذا ، دعنا نعدل ملف التكوين:

vim ~/kafka/config/server.properties

يضاف ما يلي إلى نهاية الملف:

delete.topic.enable = true

قبل بدء تشغيل خادم كافكا ، تحتاج إلى بدء تشغيل خادم ZooKeeper ، وسنستخدم النص المساعد الذي يأتي مع توزيعة كافكا:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

بعد أن بدأ ZooKeeper بنجاح ، بدأنا تشغيل خادم كافكا في محطة طرفية منفصلة:

bin/kafka-server-start.sh config/server.properties

لنقم بإنشاء موضوع جديد يسمى المعاملات:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

دعنا نتأكد من إنشاء موضوع بالعدد المطلوب من الأقسام والنسخ المتماثل:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

أباتشي كافكا وتدفق البيانات مع Spark Streaming

دعنا نفوت لحظات اختبار المنتج والمستهلك للموضوع الذي تم إنشاؤه حديثًا. مزيد من التفاصيل حول كيفية اختبار إرسال الرسائل واستلامها مكتوبة في الوثائق الرسمية - أرسل بعض الرسائل. حسنًا ، نحن ننتقل إلى كتابة منتج في Python باستخدام KafkaProducer API.

كتابة المنتج

سينشئ المنتج بيانات عشوائية - 100 رسالة كل ثانية. نعني بالبيانات العشوائية قاموس يتكون من ثلاثة حقول:

  • الفرع - اسم نقطة البيع للمؤسسة الائتمانية ؛
  • العملة - عملة تداولية؛
  • المقدار - قيمة التحويل. سيكون المبلغ موجبًا إذا كانت عملية شراء عملة من قبل البنك ، وسالبة إذا كانت عملية بيع.

يبدو رمز المنتج كما يلي:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

بعد ذلك ، باستخدام طريقة الإرسال ، نرسل رسالة إلى الخادم ، إلى الموضوع الذي نحتاجه ، بتنسيق JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

عند تشغيل البرنامج النصي ، نحصل على الرسائل التالية في المحطة:

أباتشي كافكا وتدفق البيانات مع Spark Streaming

هذا يعني أن كل شيء يعمل كما أردنا - يقوم المنتج بإنشاء وإرسال الرسائل إلى الموضوع الذي نحتاجه.
الخطوة التالية هي تثبيت Spark ومعالجة تدفق هذه الرسالة.

تثبيت اباتشي سبارك

أباتشي سبارك هي عبارة عن منصة حوسبة عنقودية متعددة الاستخدامات وعالية الأداء.

يتفوق Spark على عمليات التنفيذ الشائعة لنموذج MapReduce من حيث الأداء ، مع توفير الدعم لمجموعة واسعة من أنواع الحسابات ، بما في ذلك الاستعلامات التفاعلية والتدفق. تلعب السرعة دورًا مهمًا عند معالجة كميات كبيرة من البيانات ، حيث إنها السرعة التي تتيح لك العمل بشكل تفاعلي دون قضاء دقائق أو ساعات في الانتظار. واحدة من أكبر نقاط القوة في Spark لتقديم هذه السرعة هي قدرتها على إجراء العمليات الحسابية في الذاكرة.

هذا الإطار مكتوب بلغة Scala ، لذلك تحتاج إلى تثبيته أولاً:

sudo apt-get install scala

قم بتنزيل توزيع Spark من الموقع الرسمي:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

فك الأرشيف:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

أضف المسار إلى Spark إلى ملف bash:

vim ~/.bashrc

أضف الأسطر التالية من خلال المحرر:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

قم بتشغيل الأمر أدناه بعد إجراء التغييرات على bashrc:

source ~/.bashrc

نشر AWS PostgreSQL

يبقى توسيع قاعدة البيانات ، حيث سنقوم بملء المعلومات المعالجة من التدفقات. للقيام بذلك ، سوف نستخدم خدمة AWS RDS.

انتقل إلى وحدة تحكم AWS -> AWS RDS -> قواعد البيانات -> إنشاء قاعدة بيانات:
أباتشي كافكا وتدفق البيانات مع Spark Streaming

حدد PostgreSQL وانقر على زر التالي:
أباتشي كافكا وتدفق البيانات مع Spark Streaming

لأن تم تحليل هذا المثال للأغراض التعليمية فقط ، وسوف نستخدم خادمًا مجانيًا "على الأقل" (المستوى المجاني):
أباتشي كافكا وتدفق البيانات مع Spark Streaming

بعد ذلك ، حدد المربع Free Tier ، وبعد ذلك سنعرض تلقائيًا مثيلًا لفئة t2.micro - على الرغم من ضعفها ، إلا أنها مجانية ومناسبة تمامًا لمهمتنا:
أباتشي كافكا وتدفق البيانات مع Spark Streaming

تتبع أشياء مهمة جدًا: اسم مثيل قاعدة البيانات ، واسم المستخدم الرئيسي وكلمة المرور الخاصة به. دعنا نسمي المثال: myHabrTest ، مستخدم رئيسي: هابر، كلمة المرور: habr 12345 وانقر على زر التالي:
أباتشي كافكا وتدفق البيانات مع Spark Streaming

تحتوي الصفحة التالية على المعلمات المسؤولة عن إمكانية الوصول إلى خادم قاعدة البيانات الخاص بنا من الخارج (الوصول العام) وتوافر المنافذ:

أباتشي كافكا وتدفق البيانات مع Spark Streaming

لنقم بإنشاء إعداد جديد لمجموعة أمان VPC ، والذي سيسمح بالوصول الخارجي إلى خادم قاعدة البيانات الخاص بنا من خلال المنفذ 5432 (PostgreSQL).
دعنا نذهب في نافذة متصفح منفصلة إلى وحدة تحكم AWS في لوحة معلومات VPC -> مجموعات الأمان -> قسم إنشاء مجموعة أمان:
أباتشي كافكا وتدفق البيانات مع Spark Streaming

قمنا بتعيين اسم مجموعة الأمان - PostgreSQL ، وصفًا ، وتحديد VPC الذي يجب أن ترتبط به هذه المجموعة ، ثم انقر فوق الزر إنشاء:
أباتشي كافكا وتدفق البيانات مع Spark Streaming

نقوم بملء قواعد Inbound للمجموعة المنشأة حديثًا للمنفذ 5432 ، كما هو موضح في الصورة أدناه. لا يمكنك تحديد المنفذ يدويًا ، ولكن حدد PostgreSQL من القائمة المنسدلة "النوع".

بالمعنى الدقيق للكلمة ، القيمة :: / 0 تعني توفر حركة المرور الواردة للخادم من جميع أنحاء العالم ، وهذا ليس صحيحًا تمامًا من الناحية القانونية ، ولكن لتحليل المثال ، دعنا نستخدم هذا النهج:
أباتشي كافكا وتدفق البيانات مع Spark Streaming

نعود إلى صفحة المتصفح ، حيث يتم فتح "تكوين الإعدادات المتقدمة" وتحديد مجموعات أمان VPC -> اختر مجموعات أمان VPC الحالية -> PostgreSQL في القسم:
أباتشي كافكا وتدفق البيانات مع Spark Streaming

بعد ذلك ، في قسم خيارات قاعدة البيانات -> اسم قاعدة البيانات -> عيِّن الاسم - habrDB.

يمكننا ترك باقي المعلمات ، باستثناء تعطيل النسخ الاحتياطي (فترة الاحتفاظ بالنسخ الاحتياطي - 0 أيام) والمراقبة وإحصاءات الأداء افتراضيًا. انقر فوق الزر إنشاء قاعدة البيانات:
أباتشي كافكا وتدفق البيانات مع Spark Streaming

معالج الدفق

ستكون المرحلة الأخيرة هي تطوير وظيفة Spark ، والتي ستعالج البيانات الجديدة من كافكا كل ثانيتين وتدخل النتيجة في قاعدة البيانات.

كما هو مذكور أعلاه ، فإن نقاط التفتيش هي الآلية الرئيسية في SparkStreaming التي يجب تكوينها لتوفير التسامح مع الخطأ. سنستخدم نقاط التفتيش ، وفي حالة فشل الإجراء ، ستحتاج وحدة Spark Streaming فقط إلى العودة إلى آخر نقطة تفتيش واستئناف الحسابات منها لاستعادة البيانات المفقودة.

يمكن تمكين نقطة تفتيش عن طريق تعيين دليل على نظام ملفات موثوق به ومتسامح مع الأخطاء (مثل HDFS و S3 وما إلى ذلك) حيث سيتم تخزين معلومات نقطة التفتيش. يتم ذلك ، على سبيل المثال:

streamingContext.checkpoint(checkpointDirectory)

في مثالنا ، سنستخدم النهج التالي ، أي إذا كان دليل نقطة التفتيش موجودًا ، فسيتم إعادة إنشاء السياق من بيانات نقطة التفتيش. إذا لم يكن الدليل موجودًا (أي أنه يتم تنفيذه لأول مرة) ، فسيتم استدعاء وظيفة functionToCreateContext لإنشاء سياق جديد وإعداد DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

نقوم بإنشاء كائن DirectStream للاتصال بموضوع "المعاملة" باستخدام طريقة createDirectStream لمكتبة KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

تحليل البيانات الواردة بتنسيق JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

باستخدام Spark SQL ، نقوم بتجميع بسيط وإخراج النتيجة إلى وحدة التحكم:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

الحصول على نص الاستعلام وتشغيله من خلال Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

ثم نقوم بحفظ البيانات المجمعة المستلمة في جدول في AWS RDS. لحفظ نتائج التجميع في جدول قاعدة بيانات ، سنستخدم طريقة الكتابة لكائن DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

بضع كلمات حول إعداد اتصال بـ AWS RDS. أنشأنا المستخدم وكلمة المرور له في خطوة "نشر AWS PostgreSQL". بصفتك عنوان url لخادم قاعدة البيانات ، يجب عليك استخدام نقطة النهاية ، والتي يتم عرضها في قسم الاتصال والأمان:

أباتشي كافكا وتدفق البيانات مع Spark Streaming

من أجل توصيل Spark و Kafka بشكل صحيح ، يجب عليك تشغيل الوظيفة من خلال smark-submit باستخدام الأداة شرارة تدفق كافكا 0-8_2.11. بالإضافة إلى ذلك ، سنستخدم أيضًا أداة للتفاعل مع قاعدة بيانات PostgreSQL ، وسنمررها عبر الحزم.

لمرونة البرنامج النصي ، سنقوم أيضًا بإخراج اسم خادم الرسائل والموضوع الذي نريد تلقي البيانات منه كمعلمات إدخال.

لذا حان وقت تشغيل النظام واختباره:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

كل شيء على ما يرام! كما ترى في الصورة أدناه ، أثناء تشغيل التطبيق ، يتم عرض نتائج تجميع جديدة كل ثانيتين ، لأننا قمنا بتعيين فاصل التجميع على ثانيتين عندما أنشأنا كائن StreamingContext:

أباتشي كافكا وتدفق البيانات مع Spark Streaming

بعد ذلك ، نقوم بإجراء استعلام بسيط لقاعدة البيانات للتحقق من السجلات في الجدول تدفق_المعاملة:

أباتشي كافكا وتدفق البيانات مع Spark Streaming

اختتام

في هذه المقالة ، تم النظر في مثال على دفق معالجة المعلومات باستخدام Spark Streaming بالتزامن مع Apache Kafka و PostgreSQL. مع النمو في أحجام البيانات من مصادر مختلفة ، من الصعب المبالغة في تقدير القيمة العملية لـ Spark Streaming لإنشاء تطبيقات في الوقت الفعلي وبث مباشر.

يمكنك العثور على كود المصدر الكامل في مستودعي على GitHub جيثب:.

يسعدني مناقشة هذا المقال ، وأتطلع إلى تعليقاتكم ، وأيضًا ، آمل أن أتلقى النقد البناء من جميع القراء المعنيين.

وأتمنى لكم النجاح!

فرع فلسطين. كان من المخطط في الأصل استخدام قاعدة بيانات PostgreSQL محلية ، ولكن نظرًا لحبي لـ AWS ، قررت نقل قاعدة البيانات إلى السحابة. في المقالة التالية حول هذا الموضوع ، سأوضح لك كيفية تنفيذ النظام بالكامل الموضح أعلاه في AWS باستخدام AWS Kinesis و AWS EMR. تابع الأخبار!

المصدر: www.habr.com

إضافة تعليق