اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

سلام، حبر! نن ورځ به موږ یو سیسټم جوړ کړو چې د سپارک سټریمینګ په کارولو سره به د اپاچي کافکا پیغام جریانونه پروسس کړي او د پروسس پایلې د AWS RDS کلاوډ ډیټابیس ته ولیکئ.

راځئ تصور وکړو چې یو ځانګړی کریډیټ اداره موږ ته د خپلو ټولو څانګو په اوږدو کې "په الوتنه کې" د راتلونکو لیږدونو پروسس کولو دنده ټاکي. دا د دې هدف لپاره ترسره کیدی شي چې په سمدستي توګه د خزانې لپاره د خلاص اسعارو موقعیت محاسبه شي ، د لیږد لپاره محدودیتونه یا مالي پایلې ، او داسې نور.

دا قضیه څنګه د جادو او جادو جادو کارولو پرته پلي کول - د کټ لاندې ولولئ! لاړ شه!

اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره
(د انځور سرچینه)

پېژندنه

البته، په ریښتیني وخت کې د ډیټا لوی مقدار پروسس کول په عصري سیسټمونو کې د کارولو لپاره کافي فرصتونه چمتو کوي. د دې لپاره یو له خورا مشهور ترکیبونو څخه د اپاچي کافکا او سپارک سټریمینګ ټنډیم دی ، چیرې چې کافکا د راتلونکو پیغامونو کڅوړو جریان رامینځته کوي ، او سپارک سټریمینګ دا پاکټونه په یو ټاکلي وخت وقفه کې پروسس کوي.

د غوښتنلیک د خطا زغم زیاتولو لپاره، موږ به د پوستې څخه کار واخلو. د دې میکانیزم سره، کله چې د سپارک سټریمینګ انجن د ورک شوي ډاټا بیرته ترلاسه کولو ته اړتیا لري، دا یوازې اړتیا لري چې بیرته وروستي پوستې ته لاړ شي او له هغه ځایه حسابونه بیا پیل کړي.

د پرمختللي سیسټم جوړښت

اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

کارول شوي اجزا:

  • اپاپي کافيکا د توزیع شوي خپرولو - ګډون کولو پیغام رسولو سیسټم دی. د دواړو آفلاین او آنلاین پیغام مصرف لپاره مناسب. د معلوماتو د ضایع کیدو مخنیوي لپاره، د کافکا پیغامونه په ډیسک کې زیرمه شوي او په کلستر کې نقل شوي. د کافکا سیسټم د ZooKeeper synchronization خدمت په سر کې جوړ شوی دی؛
  • د اپاچي سپارک جریان - د سټیمینګ ډیټا پروسس کولو لپاره سپارک برخه. د سپارک سټریمینګ ماډل د مایکرو بیچ جوړښت په کارولو سره جوړ شوی ، چیرې چې د ډیټا جریان د وړو ډیټا پاکټونو دوامداره ترتیب په توګه تشریح کیږي. سپارک سټریمینګ د مختلفو سرچینو څخه ډاټا اخلي او په کوچنیو کڅوړو کې یې یوځای کوي. نوي کڅوړې په منظم وقفونو کې رامینځته کیږي. د هر وخت وقفې په پیل کې، یو نوی پاکټ رامینځته کیږي، او د دې وقفې په جریان کې ترلاسه شوي معلومات په کڅوړه کې شامل دي. د وقفې په پای کې، د کڅوړې وده ودریږي. د وقفې اندازه د پیرامیټر لخوا ټاکل کیږي چې د بیچ وقفه په نوم یادیږي؛
  • اپاچی سپارک SQL - د سپارک فنکشنل برنامه کولو سره اړونده پروسس کول ترکیب کوي. جوړښت شوي ډاټا معنی لري هغه معلومات چې سکیما لري، دا د ټولو ریکارډونو لپاره د ساحو یو واحد سیټ دی. سپارک ایس کیو ایل د مختلف جوړښت شوي ډیټا سرچینو څخه د معلوماتو ملاتړ کوي او د سکیما معلوماتو شتون څخه مننه ، دا کولی شي په مؤثره توګه یوازې د ریکارډونو اړین ساحې بیرته ترلاسه کړي ، او د ډیټا فریم APIs هم چمتو کوي؛
  • AWS RDS یو نسبتا ارزانه کلاوډ میشته اړونده ډیټابیس دی، ویب خدمت چې تنظیم، عملیات او اندازه کول ساده کوي، او په مستقیم ډول د ایمیزون لخوا اداره کیږي.

د کافکا سرور نصب او چلول

د کافکا مستقیم کارولو دمخه، تاسو اړتیا لرئ ډاډ ترلاسه کړئ چې تاسو جاوا لرئ، ځکه چې ... JVM د کار لپاره کارول کیږي:

sudo apt-get update 
sudo apt-get install default-jre
java -version

راځئ چې د کافکا سره کار کولو لپاره یو نوی کارن جوړ کړو:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

بل ، د اپاچي کافکا رسمي ویب پا fromې څخه توزیع ډاونلوډ کړئ:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

ډاونلوډ شوی آرشیف خلاص کړئ:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

بل ګام اختیاري دی. حقیقت دا دی چې ډیفالټ ترتیبات تاسو ته اجازه نه ورکوي چې د اپاچی کافکا ټولې وړتیاوې په بشپړه توګه وکاروي. د مثال په توګه، یوه موضوع، کټګورۍ، ګروپ حذف کړئ چې پیغامونه یې خپاره شي. د دې بدلولو لپاره، راځئ چې د ترتیب کولو فایل ترمیم کړو:

vim ~/kafka/config/server.properties

د فایل په پای کې لاندې اضافه کړئ:

delete.topic.enable = true

د کافکا سرور پیل کولو دمخه، تاسو اړتیا لرئ د زوکیپر سرور پیل کړئ؛ موږ به هغه معاون سکریپټ وکاروو چې د کافکا توزیع سره راځي:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

وروسته له دې چې ZooKeeper په بریالیتوب سره پیل شو، د کافکا سرور په جلا ترمینل کې پیل کړئ:

bin/kafka-server-start.sh config/server.properties

راځئ چې د راکړې ورکړې په نوم یوه نوې موضوع جوړه کړو:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

راځئ ډاډ ترلاسه کړو چې د اړین شمیر برخو او نقل سره یوه موضوع رامینځته شوې:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

راځئ چې د نوي رامینځته شوي موضوع لپاره د تولید کونکي او مصرف کونکي ازموینې شیبې له لاسه ورکړو. د دې په اړه نور توضیحات چې تاسو څنګه کولی شئ د پیغامونو لیږلو او ترلاسه کولو ازموینه وکړئ په رسمي اسنادو کې لیکل شوي - ځینې ​​​​پیغامونه واستوئ. ښه، موږ د KafkaProducer API په کارولو سره په پایتون کې د تولید کونکي لیکلو ته ځو.

د تولیدونکي لیکنه

تولید کونکی به تصادفي ډاټا تولید کړي - په هره ثانیه کې 100 پیغامونه. د تصادفي معلوماتو له مخې موږ یو قاموس معنی لرو چې درې برخې لري:

  • څانګه - د کریډیټ موسسې د پلور ځای نوم؛
  • د اسعارو - د راکړې ورکړې اسعارو؛
  • اندازه - د راکړې ورکړې اندازه. رقم به مثبته شمیره وي که چیرې دا د بانک لخوا د اسعارو پیرود وي ، او منفي شمیره که چیرې پلور وي.

د تولید کونکي لپاره کوډ داسې ښکاري:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

بیا، د لیږلو میتود په کارولو سره، موږ سرور ته یو پیغام لیږو، هغه موضوع ته چې موږ ورته اړتیا لرو، د JSON بڼه کې:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

کله چې سکریپټ چلوي، موږ په ترمینل کې لاندې پیغامونه ترلاسه کوو:

اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

دا پدې مانا ده چې هر څه کار کوي لکه څنګه چې موږ غوښتل - تولید کونکی هغه موضوع ته پیغامونه لیږي چې موږ ورته اړتیا لرو.
بل ګام د سپارک نصب کول او د دې پیغام جریان پروسس کول دي.

د اپاچی سپارک نصب کول

اپاپي سپارک یو نړیوال او د لوړ فعالیت کلستر کمپیوټري پلیټ فارم دی.

سپارک د MapReduce ماډل مشهور پلي کولو څخه غوره فعالیت کوي پداسې حال کې چې د کمپیوټري ډولونو پراخه لړۍ ملاتړ کوي ، پشمول د متقابل پوښتنو او جریان پروسس کول. سرعت مهم رول لوبوي کله چې د ډیټا لوی مقدار پروسس کوي ، ځکه چې دا سرعت دی چې تاسو ته اجازه درکوي د دقیقو یا ساعتونو انتظار کولو پرته په متقابل عمل کار وکړئ. د سپارک یو له لوی ځواک څخه چې دا خورا ګړندی کوي د حافظې دننه محاسبې ترسره کولو وړتیا ده.

دا چوکاټ په سکالا کې لیکل شوی، نو تاسو باید لومړی دا نصب کړئ:

sudo apt-get install scala

د رسمي ویب پاڼې څخه د سپارک توزیع ډاونلوډ کړئ:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

آرشیف خلاص کړئ:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

د بش فایل ته سپارک ته لاره اضافه کړئ:

vim ~/.bashrc

د مدیر له لارې لاندې کرښې اضافه کړئ:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

په bashrc کې د بدلونونو وروسته لاندې کمانډ چل کړئ:

source ~/.bashrc

د AWS PostgreSQL ځای په ځای کول

ټول هغه څه چې پاتې دي د ډیټابیس ځای په ځای کول دي چیرې چې موږ به د جریانونو څخه پروسس شوي معلومات اپلوډ کړو. د دې لپاره به موږ د AWS RDS خدمت وکاروو.

د AWS کنسول ته لاړ شئ -> AWS RDS -> ډیټابیس -> ډیټابیس جوړ کړئ:
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

PostgreSQL غوره کړئ او بل کلیک وکړئ:
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

ځکه دا مثال یوازې د تعلیمي موخو لپاره دی؛ موږ به یو وړیا سرور وکاروو "لږترلږه" (وړیا درجه):
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

بیا، موږ په وړیا ټیر بلاک کې یو ټیک واچوو، او له هغې وروسته به موږ ته په اتوماتيک ډول د t2.micro ټولګي یوه بیلګه وړاندې کړو - که څه هم کمزوری، دا وړیا او زموږ د دندې لپاره مناسب دی:
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

بیا ډیر مهم شیان راځي: د ډیټابیس مثال نوم، د ماسټر کارونکي نوم او د هغه پټنوم. راځئ چې د مثال نوم ورکړو: myHabrTest، ماسټر کارن: habr، رمز: habr12345 او په راتلونکی تڼۍ کلیک وکړئ:
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

په بله پاڼه کې داسې پیرامیټونه شتون لري چې زموږ د ډیټابیس سرور ته د بهر څخه د لاسرسي لپاره مسؤل دي (عامه لاسرسي) ​​او د بندر شتون:

اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

راځئ چې د VPC امنیت ګروپ لپاره یو نوی ترتیب جوړ کړو، کوم چې به د پورټ 5432 (PostgreSQL) له لارې زموږ ډیټابیس سرور ته بهرني لاسرسي ته اجازه ورکړي.
راځئ چې د AWS کنسول په جلا براوزر کړکۍ کې د VPC ډشبورډ ته لاړ شو -> امنیتي ګروپونه -> د امنیت ګروپ برخه جوړه کړئ:
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

موږ د امنیت ګروپ لپاره نوم ترتیب کړ - PostgreSQL، یو توضیح، په ګوته کوي چې دا ګروپ باید د کوم VPC سره تړاو ولري او د جوړونې تڼۍ کلیک وکړئ:
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

د نوي جوړ شوي ګروپ لپاره د پورټ 5432 لپاره د داخلي قواعدو ډک کړئ، لکه څنګه چې په لاندې انځور کې ښودل شوي. تاسو نشئ کولی په لاسي ډول پورټ مشخص کړئ ، مګر د ډول ډراپ ډاون لیست څخه PostgreSQL غوره کړئ.

په کلکه خبرې کول ، ارزښت ::/0 د ټولې نړۍ څخه سرور ته د راتلوونکي ترافیک شتون معنی لري ، کوم چې په بشپړ ډول ریښتیني ندي ، مګر د مثال تحلیل کولو لپاره ، راځئ ځان ته اجازه ورکړو چې دا طریقه وکاروو:
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

موږ د براوزر پا pageې ته راستون شو ، چیرې چې موږ د "پرمختللي ترتیباتو تنظیم کول" خلاص لرو او د VPC امنیت ګروپونو برخه کې غوره کړئ -> موجوده VPC امنیتي ګروپونه غوره کړئ -> PostgreSQL:
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

بیا، د ډیټابیس په اختیارونو کې -> د ډیټابیس نوم -> نوم ترتیب کړئ - habrDB.

موږ کولی شو پاتې پیرامیټونه پریږدو ، د بیک اپ غیر فعال کولو استثنا سره (د بیک اپ ساتلو موده - 0 ورځې) ، نظارت او د فعالیت بصیرت ، په ډیفالټ. په تڼۍ کلیک وکړئ ډیټابیس جوړ کړئ:
اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

تار سمبالونکی

وروستنۍ مرحله به د سپارک دندې پراختیا وي، کوم چې به په هرو دوو ثانیو کې د کافکا څخه نوي معلومات پروسس کړي او پایلې به ډیټابیس ته داخل کړي.

لکه څنګه چې پورته یادونه وشوه، پوستې په سپارک سټریمینګ کې یو اصلي میکانیزم دی چې باید د غلطی زغم ډاډمن کولو لپاره ترتیب شي. موږ به د پوستې څخه کار واخلو او، که چیرې کړنلاره ناکامه شي، د سپارک سټیمینګ ماډل به یوازې وروستۍ پوستې ته راستانه شي او د ورک شوي ډاټا بیرته ترلاسه کولو لپاره له هغې څخه حسابونه بیا پیل کړي.

د چک پوائنټ کول د غلطۍ زغمونکي، د اعتبار وړ فایل سیسټم (لکه HDFS، S3، او نور) کې د ډایرکټر په ترتیب کولو سره فعال کیدی شي چې د پوستې معلومات به زیرمه شي. دا په کارولو سره ترسره کیږي، د بیلګې په توګه:

streamingContext.checkpoint(checkpointDirectory)

زموږ په مثال کې، موږ به لاندې طریقه وکاروو، د بیلګې په توګه، که چیرې د پوستې ډایرکټر شتون ولري، نو شرایط به د پوستې ډاټا څخه بیا جوړ شي. که ډایرکټر شتون ونلري (د بیلګې په توګه د لومړي ځل لپاره اعدام شوی)، نو د فنکشن ټو کریټ کانټیکټ ته ویل کیږي چې نوي شرایط رامینځته کړي او DStreams تنظیم کړي:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

موږ د KafkaUtils کتابتون د CreateDirectStream میتود په کارولو سره د "معاملې" موضوع سره د نښلولو لپاره یو DirectStream څیز رامینځته کوو:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

په JSON بڼه کې د راتلونکو معلوماتو تحلیل کول:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

د سپارک ایس کیو ایل په کارولو سره، موږ یو ساده ګروپ جوړوو او پایله یې په کنسول کې ښکاره کوو:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

د پوښتنې متن ترلاسه کول او د سپارک ایس کیو ایل له لارې یې چلول:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

او بیا موږ پایله شوي راټول شوي معلومات په AWS RDS کې په جدول کې خوندي کوو. د ډیټابیس میز ته د راټولولو پایلې خوندي کولو لپاره، موږ به د ډیټا فریم اعتراض لیکلو طریقه وکاروو:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

د AWS RDS سره د پیوستون تنظیم کولو په اړه یو څو ټکي. موږ د دې لپاره کارن او پټنوم د "AWS PostgreSQL ځای په ځای کول" مرحله کې رامینځته کړی. تاسو باید د پای ټکی د ډیټابیس سرور url په توګه وکاروئ، کوم چې د ارتباط او امنیت برخه کې ښودل کیږي:

اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

د دې لپاره چې سپارک او کافکا په سمه توګه وصل کړئ، تاسو باید د هنري اثارو په کارولو سره د smark-submit له لارې دنده پرمخ وړئ spark-streaming-kafka-0-8_2.11. برسیره پردې، موږ به د PostgreSQL ډیټابیس سره د تعامل لپاره یو هنر هم وکاروو؛ موږ به یې د --packages له لارې انتقال کړو.

د سکریپټ د انعطاف لپاره، موږ به د ان پټ پیرامیټونو په توګه د پیغام سرور نوم او هغه موضوع هم شامل کړو چې موږ یې ډاټا ترلاسه کول غواړو.

نو، دا وخت دی چې د سیسټم فعالیت پیل او وګورئ:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

هرڅه سم شول! لکه څنګه چې تاسو په لاندې عکس کې لیدلی شئ، پداسې حال کې چې اپلیکیشن روان دی، د راټولولو نوې پایلې په هر 2 ثانیو کې تولید کیږي، ځکه چې موږ د بیچ کولو وقفه 2 ثانیو ته ټاکلې کله چې موږ د StreamingContext اعتراض جوړ کړ:

اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

بیا، موږ ډیټابیس ته یوه ساده پوښتنه کوو ترڅو په میز کې د ریکارډونو شتون وڅیړو راکړه ورکړه:

اپاچی کافکا او د سټریمینګ ډیټا پروسس د سپارک سټریمینګ سره

پایلې

دا مقاله د اپاچي کافکا او PostgreSQL سره په ګډه د سپارک سټریمینګ په کارولو سره د معلوماتو جریان پروسس کولو مثال ته وکتل. د مختلفو سرچینو څخه د معلوماتو وده سره، دا ستونزمنه ده چې د سټیمینګ او ریښتیني وخت غوښتنلیکونو رامینځته کولو لپاره د سپارک سټریمینګ عملي ارزښت ډیر کړئ.

تاسو کولی شئ بشپړ سرچینې کوډ زما په ذخیره کې ومومئ GitHub.

زه د دې مقالې په اړه خوشحاله یم، زه ستاسو نظرونو ته سترګې په لار یم، او زه د ټولو پام وړ لوستونکو څخه د رغنده نیوکې هیله لرم.

بریالیتوب مو غواړم!

پی ایس. په پیل کې دا پلان شوی و چې د محلي PostgreSQL ډیټابیس وکاروئ، مګر د AWS سره زما مینه په پام کې نیولو سره، ما پریکړه وکړه چې ډیټابیس بادل ته انتقال کړم. د دې موضوع په راتلونکې مقاله کې، زه به وښیم چې څنګه په AWS کې د AWS Kinesis او AWS EMR په کارولو سره پورته تشریح شوي ټول سیسټم پلي کول. خبرونه تعقیب کړئ!

سرچینه: www.habr.com

Add a comment