سلام، حبر! نن ورځ به موږ یو سیسټم جوړ کړو چې د سپارک سټریمینګ په کارولو سره به د اپاچي کافکا پیغام جریانونه پروسس کړي او د پروسس پایلې د AWS RDS کلاوډ ډیټابیس ته ولیکئ.
راځئ تصور وکړو چې یو ځانګړی کریډیټ اداره موږ ته د خپلو ټولو څانګو په اوږدو کې "په الوتنه کې" د راتلونکو لیږدونو پروسس کولو دنده ټاکي. دا د دې هدف لپاره ترسره کیدی شي چې په سمدستي توګه د خزانې لپاره د خلاص اسعارو موقعیت محاسبه شي ، د لیږد لپاره محدودیتونه یا مالي پایلې ، او داسې نور.
دا قضیه څنګه د جادو او جادو جادو کارولو پرته پلي کول - د کټ لاندې ولولئ! لاړ شه!
پېژندنه
البته، په ریښتیني وخت کې د ډیټا لوی مقدار پروسس کول په عصري سیسټمونو کې د کارولو لپاره کافي فرصتونه چمتو کوي. د دې لپاره یو له خورا مشهور ترکیبونو څخه د اپاچي کافکا او سپارک سټریمینګ ټنډیم دی ، چیرې چې کافکا د راتلونکو پیغامونو کڅوړو جریان رامینځته کوي ، او سپارک سټریمینګ دا پاکټونه په یو ټاکلي وخت وقفه کې پروسس کوي.
د غوښتنلیک د خطا زغم زیاتولو لپاره، موږ به د پوستې څخه کار واخلو. د دې میکانیزم سره، کله چې د سپارک سټریمینګ انجن د ورک شوي ډاټا بیرته ترلاسه کولو ته اړتیا لري، دا یوازې اړتیا لري چې بیرته وروستي پوستې ته لاړ شي او له هغه ځایه حسابونه بیا پیل کړي.
د پرمختللي سیسټم جوړښت
کارول شوي اجزا:
اپاپي کافيکا د توزیع شوي خپرولو - ګډون کولو پیغام رسولو سیسټم دی. د دواړو آفلاین او آنلاین پیغام مصرف لپاره مناسب. د معلوماتو د ضایع کیدو مخنیوي لپاره، د کافکا پیغامونه په ډیسک کې زیرمه شوي او په کلستر کې نقل شوي. د کافکا سیسټم د ZooKeeper synchronization خدمت په سر کې جوړ شوی دی؛د اپاچي سپارک جریان - د سټیمینګ ډیټا پروسس کولو لپاره سپارک برخه. د سپارک سټریمینګ ماډل د مایکرو بیچ جوړښت په کارولو سره جوړ شوی ، چیرې چې د ډیټا جریان د وړو ډیټا پاکټونو دوامداره ترتیب په توګه تشریح کیږي. سپارک سټریمینګ د مختلفو سرچینو څخه ډاټا اخلي او په کوچنیو کڅوړو کې یې یوځای کوي. نوي کڅوړې په منظم وقفونو کې رامینځته کیږي. د هر وخت وقفې په پیل کې، یو نوی پاکټ رامینځته کیږي، او د دې وقفې په جریان کې ترلاسه شوي معلومات په کڅوړه کې شامل دي. د وقفې په پای کې، د کڅوړې وده ودریږي. د وقفې اندازه د پیرامیټر لخوا ټاکل کیږي چې د بیچ وقفه په نوم یادیږي؛اپاچی سپارک SQL - د سپارک فنکشنل برنامه کولو سره اړونده پروسس کول ترکیب کوي. جوړښت شوي ډاټا معنی لري هغه معلومات چې سکیما لري، دا د ټولو ریکارډونو لپاره د ساحو یو واحد سیټ دی. سپارک ایس کیو ایل د مختلف جوړښت شوي ډیټا سرچینو څخه د معلوماتو ملاتړ کوي او د سکیما معلوماتو شتون څخه مننه ، دا کولی شي په مؤثره توګه یوازې د ریکارډونو اړین ساحې بیرته ترلاسه کړي ، او د ډیټا فریم APIs هم چمتو کوي؛AWS RDS یو نسبتا ارزانه کلاوډ میشته اړونده ډیټابیس دی، ویب خدمت چې تنظیم، عملیات او اندازه کول ساده کوي، او په مستقیم ډول د ایمیزون لخوا اداره کیږي.
د کافکا سرور نصب او چلول
د کافکا مستقیم کارولو دمخه، تاسو اړتیا لرئ ډاډ ترلاسه کړئ چې تاسو جاوا لرئ، ځکه چې ... JVM د کار لپاره کارول کیږي:
sudo apt-get update
sudo apt-get install default-jre
java -version
راځئ چې د کافکا سره کار کولو لپاره یو نوی کارن جوړ کړو:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
بل ، د اپاچي کافکا رسمي ویب پا fromې څخه توزیع ډاونلوډ کړئ:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
ډاونلوډ شوی آرشیف خلاص کړئ:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
بل ګام اختیاري دی. حقیقت دا دی چې ډیفالټ ترتیبات تاسو ته اجازه نه ورکوي چې د اپاچی کافکا ټولې وړتیاوې په بشپړه توګه وکاروي. د مثال په توګه، یوه موضوع، کټګورۍ، ګروپ حذف کړئ چې پیغامونه یې خپاره شي. د دې بدلولو لپاره، راځئ چې د ترتیب کولو فایل ترمیم کړو:
vim ~/kafka/config/server.properties
د فایل په پای کې لاندې اضافه کړئ:
delete.topic.enable = true
د کافکا سرور پیل کولو دمخه، تاسو اړتیا لرئ د زوکیپر سرور پیل کړئ؛ موږ به هغه معاون سکریپټ وکاروو چې د کافکا توزیع سره راځي:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
وروسته له دې چې ZooKeeper په بریالیتوب سره پیل شو، د کافکا سرور په جلا ترمینل کې پیل کړئ:
bin/kafka-server-start.sh config/server.properties
راځئ چې د راکړې ورکړې په نوم یوه نوې موضوع جوړه کړو:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
راځئ ډاډ ترلاسه کړو چې د اړین شمیر برخو او نقل سره یوه موضوع رامینځته شوې:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
راځئ چې د نوي رامینځته شوي موضوع لپاره د تولید کونکي او مصرف کونکي ازموینې شیبې له لاسه ورکړو. د دې په اړه نور توضیحات چې تاسو څنګه کولی شئ د پیغامونو لیږلو او ترلاسه کولو ازموینه وکړئ په رسمي اسنادو کې لیکل شوي -
د تولیدونکي لیکنه
تولید کونکی به تصادفي ډاټا تولید کړي - په هره ثانیه کې 100 پیغامونه. د تصادفي معلوماتو له مخې موږ یو قاموس معنی لرو چې درې برخې لري:
- څانګه - د کریډیټ موسسې د پلور ځای نوم؛
- د اسعارو - د راکړې ورکړې اسعارو؛
- اندازه - د راکړې ورکړې اندازه. رقم به مثبته شمیره وي که چیرې دا د بانک لخوا د اسعارو پیرود وي ، او منفي شمیره که چیرې پلور وي.
د تولید کونکي لپاره کوډ داسې ښکاري:
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
بیا، د لیږلو میتود په کارولو سره، موږ سرور ته یو پیغام لیږو، هغه موضوع ته چې موږ ورته اړتیا لرو، د JSON بڼه کې:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
کله چې سکریپټ چلوي، موږ په ترمینل کې لاندې پیغامونه ترلاسه کوو:
دا پدې مانا ده چې هر څه کار کوي لکه څنګه چې موږ غوښتل - تولید کونکی هغه موضوع ته پیغامونه لیږي چې موږ ورته اړتیا لرو.
بل ګام د سپارک نصب کول او د دې پیغام جریان پروسس کول دي.
د اپاچی سپارک نصب کول
اپاپي سپارک یو نړیوال او د لوړ فعالیت کلستر کمپیوټري پلیټ فارم دی.
سپارک د MapReduce ماډل مشهور پلي کولو څخه غوره فعالیت کوي پداسې حال کې چې د کمپیوټري ډولونو پراخه لړۍ ملاتړ کوي ، پشمول د متقابل پوښتنو او جریان پروسس کول. سرعت مهم رول لوبوي کله چې د ډیټا لوی مقدار پروسس کوي ، ځکه چې دا سرعت دی چې تاسو ته اجازه درکوي د دقیقو یا ساعتونو انتظار کولو پرته په متقابل عمل کار وکړئ. د سپارک یو له لوی ځواک څخه چې دا خورا ګړندی کوي د حافظې دننه محاسبې ترسره کولو وړتیا ده.
دا چوکاټ په سکالا کې لیکل شوی، نو تاسو باید لومړی دا نصب کړئ:
sudo apt-get install scala
د رسمي ویب پاڼې څخه د سپارک توزیع ډاونلوډ کړئ:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
آرشیف خلاص کړئ:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
د بش فایل ته سپارک ته لاره اضافه کړئ:
vim ~/.bashrc
د مدیر له لارې لاندې کرښې اضافه کړئ:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
په bashrc کې د بدلونونو وروسته لاندې کمانډ چل کړئ:
source ~/.bashrc
د AWS PostgreSQL ځای په ځای کول
ټول هغه څه چې پاتې دي د ډیټابیس ځای په ځای کول دي چیرې چې موږ به د جریانونو څخه پروسس شوي معلومات اپلوډ کړو. د دې لپاره به موږ د AWS RDS خدمت وکاروو.
د AWS کنسول ته لاړ شئ -> AWS RDS -> ډیټابیس -> ډیټابیس جوړ کړئ:
PostgreSQL غوره کړئ او بل کلیک وکړئ:
ځکه دا مثال یوازې د تعلیمي موخو لپاره دی؛ موږ به یو وړیا سرور وکاروو "لږترلږه" (وړیا درجه):
بیا، موږ په وړیا ټیر بلاک کې یو ټیک واچوو، او له هغې وروسته به موږ ته په اتوماتيک ډول د t2.micro ټولګي یوه بیلګه وړاندې کړو - که څه هم کمزوری، دا وړیا او زموږ د دندې لپاره مناسب دی:
بیا ډیر مهم شیان راځي: د ډیټابیس مثال نوم، د ماسټر کارونکي نوم او د هغه پټنوم. راځئ چې د مثال نوم ورکړو: myHabrTest، ماسټر کارن: habr، رمز: habr12345 او په راتلونکی تڼۍ کلیک وکړئ:
په بله پاڼه کې داسې پیرامیټونه شتون لري چې زموږ د ډیټابیس سرور ته د بهر څخه د لاسرسي لپاره مسؤل دي (عامه لاسرسي) او د بندر شتون:
راځئ چې د VPC امنیت ګروپ لپاره یو نوی ترتیب جوړ کړو، کوم چې به د پورټ 5432 (PostgreSQL) له لارې زموږ ډیټابیس سرور ته بهرني لاسرسي ته اجازه ورکړي.
راځئ چې د AWS کنسول په جلا براوزر کړکۍ کې د VPC ډشبورډ ته لاړ شو -> امنیتي ګروپونه -> د امنیت ګروپ برخه جوړه کړئ:
موږ د امنیت ګروپ لپاره نوم ترتیب کړ - PostgreSQL، یو توضیح، په ګوته کوي چې دا ګروپ باید د کوم VPC سره تړاو ولري او د جوړونې تڼۍ کلیک وکړئ:
د نوي جوړ شوي ګروپ لپاره د پورټ 5432 لپاره د داخلي قواعدو ډک کړئ، لکه څنګه چې په لاندې انځور کې ښودل شوي. تاسو نشئ کولی په لاسي ډول پورټ مشخص کړئ ، مګر د ډول ډراپ ډاون لیست څخه PostgreSQL غوره کړئ.
په کلکه خبرې کول ، ارزښت ::/0 د ټولې نړۍ څخه سرور ته د راتلوونکي ترافیک شتون معنی لري ، کوم چې په بشپړ ډول ریښتیني ندي ، مګر د مثال تحلیل کولو لپاره ، راځئ ځان ته اجازه ورکړو چې دا طریقه وکاروو:
موږ د براوزر پا pageې ته راستون شو ، چیرې چې موږ د "پرمختللي ترتیباتو تنظیم کول" خلاص لرو او د VPC امنیت ګروپونو برخه کې غوره کړئ -> موجوده VPC امنیتي ګروپونه غوره کړئ -> PostgreSQL:
بیا، د ډیټابیس په اختیارونو کې -> د ډیټابیس نوم -> نوم ترتیب کړئ - habrDB.
موږ کولی شو پاتې پیرامیټونه پریږدو ، د بیک اپ غیر فعال کولو استثنا سره (د بیک اپ ساتلو موده - 0 ورځې) ، نظارت او د فعالیت بصیرت ، په ډیفالټ. په تڼۍ کلیک وکړئ ډیټابیس جوړ کړئ:
تار سمبالونکی
وروستنۍ مرحله به د سپارک دندې پراختیا وي، کوم چې به په هرو دوو ثانیو کې د کافکا څخه نوي معلومات پروسس کړي او پایلې به ډیټابیس ته داخل کړي.
لکه څنګه چې پورته یادونه وشوه، پوستې په سپارک سټریمینګ کې یو اصلي میکانیزم دی چې باید د غلطی زغم ډاډمن کولو لپاره ترتیب شي. موږ به د پوستې څخه کار واخلو او، که چیرې کړنلاره ناکامه شي، د سپارک سټیمینګ ماډل به یوازې وروستۍ پوستې ته راستانه شي او د ورک شوي ډاټا بیرته ترلاسه کولو لپاره له هغې څخه حسابونه بیا پیل کړي.
د چک پوائنټ کول د غلطۍ زغمونکي، د اعتبار وړ فایل سیسټم (لکه HDFS، S3، او نور) کې د ډایرکټر په ترتیب کولو سره فعال کیدی شي چې د پوستې معلومات به زیرمه شي. دا په کارولو سره ترسره کیږي، د بیلګې په توګه:
streamingContext.checkpoint(checkpointDirectory)
زموږ په مثال کې، موږ به لاندې طریقه وکاروو، د بیلګې په توګه، که چیرې د پوستې ډایرکټر شتون ولري، نو شرایط به د پوستې ډاټا څخه بیا جوړ شي. که ډایرکټر شتون ونلري (د بیلګې په توګه د لومړي ځل لپاره اعدام شوی)، نو د فنکشن ټو کریټ کانټیکټ ته ویل کیږي چې نوي شرایط رامینځته کړي او DStreams تنظیم کړي:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
موږ د KafkaUtils کتابتون د CreateDirectStream میتود په کارولو سره د "معاملې" موضوع سره د نښلولو لپاره یو DirectStream څیز رامینځته کوو:
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
په JSON بڼه کې د راتلونکو معلوماتو تحلیل کول:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
د سپارک ایس کیو ایل په کارولو سره، موږ یو ساده ګروپ جوړوو او پایله یې په کنسول کې ښکاره کوو:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
د پوښتنې متن ترلاسه کول او د سپارک ایس کیو ایل له لارې یې چلول:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
او بیا موږ پایله شوي راټول شوي معلومات په AWS RDS کې په جدول کې خوندي کوو. د ډیټابیس میز ته د راټولولو پایلې خوندي کولو لپاره، موږ به د ډیټا فریم اعتراض لیکلو طریقه وکاروو:
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
د AWS RDS سره د پیوستون تنظیم کولو په اړه یو څو ټکي. موږ د دې لپاره کارن او پټنوم د "AWS PostgreSQL ځای په ځای کول" مرحله کې رامینځته کړی. تاسو باید د پای ټکی د ډیټابیس سرور url په توګه وکاروئ، کوم چې د ارتباط او امنیت برخه کې ښودل کیږي:
د دې لپاره چې سپارک او کافکا په سمه توګه وصل کړئ، تاسو باید د هنري اثارو په کارولو سره د smark-submit له لارې دنده پرمخ وړئ spark-streaming-kafka-0-8_2.11. برسیره پردې، موږ به د PostgreSQL ډیټابیس سره د تعامل لپاره یو هنر هم وکاروو؛ موږ به یې د --packages له لارې انتقال کړو.
د سکریپټ د انعطاف لپاره، موږ به د ان پټ پیرامیټونو په توګه د پیغام سرور نوم او هغه موضوع هم شامل کړو چې موږ یې ډاټا ترلاسه کول غواړو.
نو، دا وخت دی چې د سیسټم فعالیت پیل او وګورئ:
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
هرڅه سم شول! لکه څنګه چې تاسو په لاندې عکس کې لیدلی شئ، پداسې حال کې چې اپلیکیشن روان دی، د راټولولو نوې پایلې په هر 2 ثانیو کې تولید کیږي، ځکه چې موږ د بیچ کولو وقفه 2 ثانیو ته ټاکلې کله چې موږ د StreamingContext اعتراض جوړ کړ:
بیا، موږ ډیټابیس ته یوه ساده پوښتنه کوو ترڅو په میز کې د ریکارډونو شتون وڅیړو راکړه ورکړه:
پایلې
دا مقاله د اپاچي کافکا او PostgreSQL سره په ګډه د سپارک سټریمینګ په کارولو سره د معلوماتو جریان پروسس کولو مثال ته وکتل. د مختلفو سرچینو څخه د معلوماتو وده سره، دا ستونزمنه ده چې د سټیمینګ او ریښتیني وخت غوښتنلیکونو رامینځته کولو لپاره د سپارک سټریمینګ عملي ارزښت ډیر کړئ.
تاسو کولی شئ بشپړ سرچینې کوډ زما په ذخیره کې ومومئ
زه د دې مقالې په اړه خوشحاله یم، زه ستاسو نظرونو ته سترګې په لار یم، او زه د ټولو پام وړ لوستونکو څخه د رغنده نیوکې هیله لرم.
بریالیتوب مو غواړم!
پی ایس. په پیل کې دا پلان شوی و چې د محلي PostgreSQL ډیټابیس وکاروئ، مګر د AWS سره زما مینه په پام کې نیولو سره، ما پریکړه وکړه چې ډیټابیس بادل ته انتقال کړم. د دې موضوع په راتلونکې مقاله کې، زه به وښیم چې څنګه په AWS کې د AWS Kinesis او AWS EMR په کارولو سره پورته تشریح شوي ټول سیسټم پلي کول. خبرونه تعقیب کړئ!
سرچینه: www.habr.com