آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

سلام، هابر! امروز سیستمی خواهیم ساخت که جریان های پیام آپاچی کافکا را با استفاده از Spark Streaming پردازش کرده و نتایج پردازش را در پایگاه داده ابری AWS RDS می نویسد.

بیایید تصور کنیم که یک مؤسسه اعتباری خاص وظیفه پردازش تراکنش‌های دریافتی را «در حال پرواز» در همه شعبه‌هایش برای ما تعیین می‌کند. این می تواند به منظور محاسبه سریع موقعیت ارز باز برای خزانه، محدودیت ها یا نتایج مالی معاملات و غیره انجام شود.

نحوه اجرای این مورد بدون استفاده از جادو و طلسم جادویی - زیر برش بخوانید! برو!

آپاچی کافکا و پردازش داده های جریانی با Spark Streaming
(منبع تصویر)

معرفی

البته پردازش حجم زیادی از داده ها در زمان واقعی فرصت های فراوانی را برای استفاده در سیستم های مدرن فراهم می کند. یکی از محبوب‌ترین ترکیب‌ها برای این کار، پشت سر هم آپاچی کافکا و اسپارک استریمینگ است، جایی که کافکا جریانی از بسته‌های پیام ورودی را ایجاد می‌کند و Spark Streaming این بسته‌ها را در یک بازه زمانی معین پردازش می‌کند.

برای افزایش تحمل خطای اپلیکیشن، از چک پوینت ها استفاده می کنیم. با این مکانیسم، زمانی که موتور Spark Streaming نیاز به بازیابی اطلاعات از دست رفته دارد، فقط باید به آخرین بازرسی برگردد و محاسبات را از آنجا از سر بگیرد.

معماری سیستم توسعه یافته

آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

اجزای مورد استفاده:

  • آپاچی کافکا یک سیستم پیام رسانی انتشار-اشتراک توزیع شده است. مناسب برای مصرف پیام های آفلاین و آنلاین. برای جلوگیری از از دست دادن داده ها، پیام های کافکا روی دیسک ذخیره می شوند و در کلاستر تکرار می شوند. سیستم کافکا در بالای سرویس همگام سازی ZooKeeper ساخته شده است.
  • Apache Spark Streaming - جزء جرقه برای پردازش داده های جریان. ماژول Spark Streaming با استفاده از معماری میکرو دسته ای ساخته شده است، جایی که جریان داده به عنوان یک دنباله پیوسته از بسته های داده کوچک تفسیر می شود. Spark Streaming داده ها را از منابع مختلف می گیرد و آن ها را در بسته های کوچک ترکیب می کند. بسته های جدید در فواصل زمانی معین ایجاد می شوند. در ابتدای هر بازه زمانی، یک بسته جدید ایجاد می شود و هر داده ای که در آن بازه زمانی دریافت می شود در بسته گنجانده می شود. در پایان بازه، رشد بسته متوقف می شود. اندازه فاصله توسط پارامتری به نام فاصله دسته ای تعیین می شود.
  • Apache Spark SQL - ترکیبی از پردازش رابطه ای با برنامه نویسی تابعی Spark. داده‌های ساختاریافته به معنای داده‌هایی است که دارای یک طرحواره هستند، یعنی مجموعه‌ای از فیلدها برای همه رکوردها. Spark SQL از ورودی‌های مختلف منابع داده ساختاریافته پشتیبانی می‌کند و به لطف در دسترس بودن اطلاعات طرحواره، می‌تواند به طور موثر فقط فیلدهای مورد نیاز رکوردها را بازیابی کند و همچنین APIهای DataFrame را ارائه می‌کند.
  • AWS RDS یک پایگاه داده رابطه‌ای مبتنی بر ابر نسبتاً ارزان، سرویس وب است که راه‌اندازی، عملیات و مقیاس‌بندی را ساده می‌کند و مستقیماً توسط آمازون مدیریت می‌شود.

نصب و راه اندازی سرور کافکا

قبل از استفاده مستقیم از کافکا، باید مطمئن شوید که جاوا دارید، زیرا... JVM برای کارهای زیر استفاده می شود:

sudo apt-get update 
sudo apt-get install default-jre
java -version

بیایید یک کاربر جدید برای کار با کافکا ایجاد کنیم:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

در ادامه، توزیع را از وب سایت رسمی آپاچی کافکا دانلود کنید:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

آرشیو دانلود شده را باز کنید:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

مرحله بعدی اختیاری است. واقعیت این است که تنظیمات پیش فرض به شما امکان استفاده کامل از تمام ویژگی های آپاچی کافکا را نمی دهد. به عنوان مثال، یک موضوع، دسته، گروهی را که می توان پیام ها را برای آنها منتشر کرد حذف کنید. برای تغییر این، اجازه دهید فایل پیکربندی را ویرایش کنیم:

vim ~/kafka/config/server.properties

موارد زیر را به انتهای فایل اضافه کنید:

delete.topic.enable = true

قبل از راه اندازی سرور کافکا، باید سرور ZooKeeper را راه اندازی کنید؛ ما از اسکریپت کمکی همراه با توزیع کافکا استفاده خواهیم کرد:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

پس از اینکه ZooKeeper با موفقیت راه اندازی شد، سرور کافکا را در یک ترمینال جداگانه راه اندازی کنید:

bin/kafka-server-start.sh config/server.properties

بیایید یک موضوع جدید به نام تراکنش ایجاد کنیم:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

بیایید مطمئن شویم که موضوعی با تعداد پارتیشن و تکرار مورد نیاز ایجاد شده است:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

بیایید لحظه های آزمایش تولید کننده و مصرف کننده برای موضوع جدید ایجاد شده را از دست بدهیم. جزئیات بیشتر در مورد نحوه آزمایش ارسال و دریافت پیام در اسناد رسمی نوشته شده است - چند پیام بفرست. خوب، ما به نوشتن یک تولید کننده در پایتون با استفاده از KafkaProducer API می رویم.

نگارش تهیه کننده

تولید کننده داده های تصادفی تولید می کند - 100 پیام در هر ثانیه. منظور ما از داده های تصادفی یک فرهنگ لغت متشکل از سه قسمت است:

  • شاخه - نام محل فروش موسسه اعتباری؛
  • واحد پول - ارز معامله؛
  • میزان - مقدار تراکنش. مبلغ در صورت خرید ارز توسط بانک عدد مثبت و در صورت فروش عدد منفی خواهد بود.

کد تولید کننده به صورت زیر است:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

در مرحله بعد، با استفاده از روش ارسال، پیامی را به سرور، به موضوع مورد نیاز خود، با فرمت JSON ارسال می کنیم:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

هنگام اجرای اسکریپت، پیام های زیر را در ترمینال دریافت می کنیم:

آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

این بدان معنی است که همه چیز همانطور که ما می خواستیم کار می کند - تولید کننده پیام هایی را به موضوع مورد نیاز ما تولید می کند و ارسال می کند.
مرحله بعدی نصب Spark و پردازش این جریان پیام است.

نصب آپاچی اسپارک

جرقه آپاچی یک پلت فرم محاسباتی خوشه ای جهانی و با کارایی بالا است.

Spark بهتر از پیاده‌سازی‌های محبوب مدل MapReduce عمل می‌کند در حالی که از طیف وسیع‌تری از انواع محاسبات، از جمله پرس و جوهای تعاملی و پردازش جریانی پشتیبانی می‌کند. سرعت در پردازش مقادیر زیادی داده نقش مهمی ایفا می کند، زیرا سرعت است که به شما امکان می دهد بدون صرف دقیقه یا ساعت انتظار به صورت تعاملی کار کنید. یکی از بزرگترین نقاط قوت Spark که آن را بسیار سریع می کند، توانایی آن در انجام محاسبات درون حافظه است.

این فریم ورک در اسکالا نوشته شده است، بنابراین ابتدا باید آن را نصب کنید:

sudo apt-get install scala

توزیع Spark را از وب سایت رسمی دانلود کنید:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

آرشیو را باز کنید:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

مسیر Spark را به فایل bash اضافه کنید:

vim ~/.bashrc

خطوط زیر را از طریق ویرایشگر اضافه کنید:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

پس از ایجاد تغییرات در bashrc دستور زیر را اجرا کنید:

source ~/.bashrc

استقرار AWS PostgreSQL

تنها چیزی که باقی می ماند این است که پایگاه داده را مستقر کنیم که در آن اطلاعات پردازش شده را از جریان ها بارگذاری کنیم. برای این کار از سرویس AWS RDS استفاده خواهیم کرد.

به کنسول AWS -> AWS RDS -> پایگاه‌های داده -> ایجاد پایگاه داده بروید:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

PostgreSQL را انتخاب کرده و روی Next کلیک کنید:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

زیرا این مثال فقط برای اهداف آموزشی است؛ ما از یک سرور رایگان "حداقل" (سطح رایگان) استفاده خواهیم کرد:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

در مرحله بعد، یک تیک در بلوک Free Tier قرار می دهیم و پس از آن به طور خودکار نمونه ای از کلاس t2.micro به ما پیشنهاد می شود - اگرچه ضعیف است، اما رایگان است و برای کار ما کاملاً مناسب است:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

موارد بسیار مهمی هستند: نام نمونه پایگاه داده، نام کاربر اصلی و رمز عبور او. بیایید نمونه را نامگذاری کنیم: myHabrTest، کاربر اصلی: هبر، کلمه عبور: habr12345 و روی دکمه Next کلیک کنید:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

در صفحه بعد پارامترهایی وجود دارد که مسئول دسترسی به سرور پایگاه داده ما از خارج (دسترسی عمومی) و در دسترس بودن پورت هستند:

آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

بیایید یک تنظیم جدید برای گروه امنیتی VPC ایجاد کنیم، که اجازه دسترسی خارجی به سرور پایگاه داده ما از طریق پورت 5432 (PostgreSQL) را می دهد.
بیایید در یک پنجره مرورگر جداگانه به کنسول AWS برویم به داشبورد VPC -> گروه‌های امنیتی -> بخش ایجاد گروه امنیتی:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

ما نام گروه Security - PostgreSQL را تعیین می کنیم، یک توضیح، نشان می دهد که این گروه باید با کدام VPC مرتبط باشد و روی دکمه ایجاد کلیک کنید:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

همانطور که در تصویر زیر نشان داده شده است، قوانین ورودی پورت 5432 را برای گروه تازه ایجاد شده پر کنید. شما نمی توانید پورت را به صورت دستی تعیین کنید، اما PostgreSQL را از لیست کشویی Type انتخاب کنید.

به بیان دقیق، مقدار ::/0 به معنای در دسترس بودن ترافیک ورودی به سرور از سرتاسر جهان است، که بطور متعارف کاملاً درست نیست، اما برای تجزیه و تحلیل مثال، اجازه دهید از این رویکرد استفاده کنیم:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

ما به صفحه مرورگر باز می گردیم، جایی که "پیکربندی تنظیمات پیشرفته" را باز کرده ایم و در بخش گروه های امنیتی VPC -> انتخاب گروه های امنیتی VPC موجود -> PostgreSQL را انتخاب می کنیم:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

بعد، در گزینه های پایگاه داده -> نام پایگاه داده -> نام را تنظیم کنید - habrDB.

می‌توانیم پارامترهای باقی‌مانده را به استثنای غیرفعال کردن پشتیبان‌گیری (دوره نگهداری پشتیبان - 0 روز)، نظارت و بینش عملکرد، به‌طور پیش‌فرض رها کنیم. روی دکمه کلیک کنید پایگاه داده ایجاد کنید:
آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

کنترل کننده نخ

مرحله آخر توسعه یک کار اسپارک است که داده های جدیدی را که از کافکا هر دو ثانیه یکبار پردازش می کند و نتیجه را در پایگاه داده وارد می کند.

همانطور که در بالا ذکر شد، نقاط بازرسی مکانیزم اصلی در SparkStreaming هستند که باید برای اطمینان از تحمل خطا پیکربندی شوند. ما از چک پوینت‌ها استفاده می‌کنیم و در صورت عدم موفقیت رویه، ماژول Spark Streaming فقط باید به آخرین چکپوینت برگردد و محاسبات را از سر بگیرد تا اطلاعات از دست رفته را بازیابی کند.

چک پوینت را می توان با تنظیم دایرکتوری روی یک سیستم فایل قابل تحمل و قابل اعتماد (مانند HDFS، S3 و غیره) که در آن اطلاعات ایست بازرسی ذخیره می شود، فعال کرد. این کار با استفاده از موارد زیر انجام می شود:

streamingContext.checkpoint(checkpointDirectory)

در مثال ما از روش زیر استفاده خواهیم کرد، یعنی اگر checkpointDirectory وجود داشته باشد، پس زمینه از داده های چک پوینت دوباره ایجاد می شود. اگر دایرکتوری وجود نداشته باشد (به عنوان مثال برای اولین بار اجرا می شود)، سپس functionToCreateContext برای ایجاد یک زمینه جدید و پیکربندی DStreams فراخوانی می شود:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

ما یک شی DirectStream برای اتصال به مبحث تراکنش با استفاده از متد createDirectStream کتابخانه KafkaUtils ایجاد می کنیم:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

تجزیه داده های ورودی در قالب JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

با استفاده از Spark SQL، یک گروه بندی ساده انجام می دهیم و نتیجه را در کنسول نمایش می دهیم:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

دریافت متن پرس و جو و اجرای آن از طریق Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

و سپس داده های جمع آوری شده را در جدولی در AWS RDS ذخیره می کنیم. برای ذخیره نتایج تجمیع در جدول پایگاه داده، از روش نوشتن شی DataFrame استفاده می کنیم:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

چند کلمه در مورد راه اندازی اتصال به AWS RDS. ما کاربر و رمز عبور آن را در مرحله «استقرار AWS PostgreSQL» ایجاد کردیم. شما باید از Endpoint به عنوان url سرور پایگاه داده استفاده کنید که در بخش Connectivity & Security نمایش داده می شود:

آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

برای اتصال صحیح اسپارک و کافکا، باید کار را از طریق smark-submit با استفاده از آرتیفکت اجرا کنید. جرقه-استریم-کافکا-0-8_2.11. علاوه بر این، ما همچنین از یک مصنوع برای تعامل با پایگاه داده PostgreSQL استفاده خواهیم کرد؛ ما آنها را از طریق بسته های -- منتقل خواهیم کرد.

برای انعطاف‌پذیری اسکریپت، نام سرور پیام و موضوعی را که می‌خواهیم از آن داده دریافت کنیم، به عنوان پارامترهای ورودی نیز درج می‌کنیم.

بنابراین، زمان راه اندازی و بررسی عملکرد سیستم است:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

همه چیز درست شد! همانطور که در تصویر زیر می بینید، در حالی که برنامه در حال اجرا است، نتایج جمع آوری جدید هر 2 ثانیه یکبار خروجی می شود، زیرا زمانی که شی StreamingContext را ایجاد کردیم، فاصله دسته بندی را روی 2 ثانیه تنظیم کردیم:

آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

در مرحله بعد، یک پرس و جو ساده به پایگاه داده برای بررسی وجود رکوردها در جدول انجام می دهیم تراکنش_جریان:

آپاچی کافکا و پردازش داده های جریانی با Spark Streaming

نتیجه

این مقاله به نمونه‌ای از پردازش جریانی اطلاعات با استفاده از Spark Streaming در ارتباط با Apache Kafka و PostgreSQL پرداخته است. با رشد داده‌ها از منابع مختلف، برآورد ارزش عملی Spark Streaming برای ایجاد برنامه‌های پخش جریانی و بلادرنگ دشوار است.

شما می توانید کد منبع کامل را در مخزن من پیدا کنید GitHub.

خوشحالم که در مورد این مقاله بحث می کنم، منتظر نظرات شما هستم و همچنین امیدوارم انتقاد سازنده همه خوانندگان دلسوز را دریافت کنم.

برای شما آرزوی موفقیت دارم!

ص. در ابتدا قرار بود از یک پایگاه داده محلی PostgreSQL استفاده شود، اما با توجه به عشق من به AWS، تصمیم گرفتم پایگاه داده را به ابر منتقل کنم. در مقاله بعدی در مورد این موضوع، نحوه پیاده سازی کل سیستم توضیح داده شده در بالا در AWS با استفاده از AWS Kinesis و AWS EMR را نشان خواهم داد. اخبار را دنبال کنید!

منبع: www.habr.com

اضافه کردن نظر