Салом, Хабр! Имрӯз мо системаеро месозем, ки ҷараёни паёмҳои Apache Kafka-ро бо истифода аз Spark Streaming коркард мекунад ва натиҷаҳои коркардро ба махзани абрии AWS RDS нависед.
Биёед тасаввур кунем, ки як муассисаи кредитии муайян ба мо вазифаи коркарди амалиёти воридотӣ дар тамоми филиалҳои худро "дар парвоз" мегузорад. Ин метавонад бо мақсади ба таври фаврӣ ҳисоб кардани мавқеи кушодаи асъор барои хазинадорӣ, лимити ё натиҷаҳои молиявӣ барои муомилот ва ғайра анҷом дода шавад.
Чӣ тавр ин ҳолатро бе истифодаи ҷодугарӣ ва ҷодугарӣ амалӣ кардан мумкин аст - дар зери набуред! Бирав!
Муқаддима
Албатта, коркарди миқдори зиёди маълумот дар вақти воқеӣ барои истифода дар системаҳои муосир имкониятҳои васеъ фароҳам меорад. Яке аз маъмултарин комбинатсияҳо барои ин тандеми Apache Kafka ва Spark Streaming мебошад, ки дар он Кафка ҷараёни бастаҳои паёмҳои воридотӣ эҷод мекунад ва Spark Streaming ин бастаҳоро дар фосилаи муайяни вақт коркард мекунад.
Барои баланд бардоштани таҳаммулпазирии хатогиҳои барнома, мо гузаргоҳҳоро истифода мебарем. Бо ин механизм, вақте ки муҳаррики Spark Streaming бояд маълумоти гумшударо барқарор кунад, он танҳо бояд ба нуқтаи охирин баргардад ва ҳисобҳоро аз он ҷо идома диҳад.
Архитектураи системаи таҳияшуда
Компонентҳои истифодашуда:
Апач Кафка системаи паёмнависии паҳншуда-обуна аст. Муносиб барои истеъмоли паёмҳои офлайн ва онлайн. Барои пешгирии талафи маълумот, паёмҳои Кафка дар диск нигоҳ дошта мешаванд ва дар дохили кластер такрор карда мешаванд. Системаи Кафка дар болои хидмати ҳамоҳангсозии ZooKeeper сохта шудааст;Ҷараёни Apache Spark - Қисмати Spark барои коркарди маълумоти ҷараён. Модули Spark Streaming бо истифода аз меъмории микро-партия сохта шудааст, ки дар он ҷараёни додаҳо ҳамчун пайдарпайии пайвастаи бастаҳои хурди додаҳо тафсир карда мешавад. Spark Streaming маълумотро аз сарчашмаҳои гуногун мегирад ва онҳоро ба бастаҳои хурд муттаҳид мекунад. Бастаҳои нав бо фосилаҳои муқаррарӣ сохта мешаванд. Дар оғози ҳар як фосилаи вақт бастаи нав эҷод карда мешавад ва ҳама маълумоти дар ин фосила гирифташуда ба баста дохил карда мешаванд. Дар охири фосила, афзоиши бастаҳо қатъ мегардад. Андозаи фосила бо параметре муайян карда мешавад, ки фосилаи партия номида мешавад;Apache Spark SQL - коркарди реляциониро бо барномасозии функсионалии Spark муттаҳид мекунад. Маълумоти сохторӣ маънои маълумотеро дорад, ки схема дорад, яъне маҷмӯи ягонаи майдонҳо барои ҳама сабтҳо. Spark SQL воридотро аз сарчашмаҳои сохтории маълумот дастгирӣ мекунад ва ба шарофати мавҷудияти иттилооти схемавӣ, он метавонад танҳо майдонҳои зарурии сабтҳоро ба таври муассир дарёфт кунад ва инчунин API-ҳои DataFrame-ро таъмин намояд;AWS RDS як пойгоҳи нисбатан арзони релятсионӣ дар асоси абр аст, хидмати веб, ки танзим, амалиёт ва миқёсро содда мекунад ва мустақиман аз ҷониби Amazon идора карда мешавад.
Насб ва иҷро кардани сервери Кафка
Пеш аз истифодаи бевоситаи Кафка, шумо бояд боварӣ ҳосил кунед, ки шумо Java доред, зеро... JVM барои кор истифода мешавад:
sudo apt-get update
sudo apt-get install default-jre
java -version
Биёед корбари нав эҷод кунем, то бо Кафка кор кунад:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
Баъдан, тақсимотро аз вебсайти расмии Apache Kafka зеркашӣ кунед:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
Архиви зеркашидашударо кушоед:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Қадами навбатӣ ихтиёрӣ аст. Гап дар он аст, ки танзимоти пешфарз ба шумо имкон намедиҳад, ки тамоми имкониятҳои Apache Kafka-ро пурра истифода баред. Масалан, мавзӯъ, категория, гурӯҳеро, ки паёмҳо ба онҳо интишор мешаванд, нест кунед. Барои тағир додани ин, биёед файли конфигуратсияро таҳрир кунем:
vim ~/kafka/config/server.properties
Ба охири файл инҳоро илова кунед:
delete.topic.enable = true
Пеш аз оғози сервери Кафка, шумо бояд сервери ZooKeeper-ро оғоз кунед; мо скрипти ёрирасонеро, ки бо тақсимоти Кафка меояд, истифода хоҳем кард:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Пас аз он ки ZooKeeper бомуваффақият оғоз шуд, сервери Кафкаро дар терминали алоҳида оғоз кунед:
bin/kafka-server-start.sh config/server.properties
Биёед мавзӯи наверо бо номи транзаксия эҷод кунем:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
Биёед боварӣ ҳосил кунем, ки мавзӯъ бо миқдори зарурии қисмҳо ва такрорӣ сохта шудааст:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Биёед, лаҳзаҳои санҷиши истеҳсолкунанда ва истеъмолкунандаро барои мавзӯи навтаъсис аз даст диҳем. Тафсилоти бештар дар бораи чӣ гуна шумо метавонед фиристодан ва қабули паёмҳоро санҷед, дар ҳуҷҷатҳои расмӣ навишта шудааст -
Навиштани истеҳсолкунанда
Истеҳсолкунанда маълумоти тасодуфӣ тавлид мекунад - ҳар сония 100 паём. Бо маълумоти тасодуфӣ мо луғатро дар назар дорем, ки аз се соҳа иборат аст:
- Филиали — номи нуқтаи фурӯши ташкилоти қарзӣ;
- асъор - асъори муомилот;
- Љамъ - маблағи муомилот. Маблағ рақами мусбат хоҳад буд, агар ин хариди асъор аз ҷониби Бонк бошад ва агар ин фурӯш бошад, рақами манфӣ бошад.
Рамзи истеҳсолкунанда чунин менамояд:
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
Баъдан, бо истифода аз усули ирсол, мо ба сервер ба мавзӯъе, ки ба мо лозим аст, дар формати JSON паём мефиристем:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Ҳангоми иҷро кардани скрипт, мо дар терминал паёмҳои зеринро мегирем:
Ин маънои онро дорад, ки ҳама чиз тавре кор мекунад, ки мо мехостем - истеҳсолкунанда паёмҳоро ба мавзӯъе, ки ба мо лозим аст, тавлид ва мефиристад.
Қадами навбатӣ насб кардани Spark ва коркарди ин ҷараёни паём аст.
Насб кардани Apache Spark
Apache Spark платформаи ҳисоббарории кластери универсалӣ ва сермахсул аст.
Spark нисбат ба татбиқи маъмули модели MapReduce беҳтар кор мекунад ва дар ҳоле ки доираи васеи намудҳои ҳисоббарорӣ, аз ҷумла дархостҳои интерактивӣ ва коркарди ҷараёнро дастгирӣ мекунад. Суръат ҳангоми коркарди миқдори зиёди маълумот нақши муҳим мебозад, зеро он суръатест, ки ба шумо имкон медиҳад интерактивӣ бидуни сарф кардани дақиқаҳо ё соатҳои интизорӣ кор кунед. Яке аз ҷиҳатҳои бузургтарини Spark, ки онро ин қадар зуд мекунад, қобилияти иҷрои ҳисобҳои дохили хотира мебошад.
Ин чаҳорчӯба дар Scala навишта шудааст, бинобар ин шумо бояд аввал онро насб кунед:
sudo apt-get install scala
Тақсимоти Spark-ро аз вебсайти расмӣ зеркашӣ кунед:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
Архивро кушоед:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
Роҳро ба Spark ба файли bash илова кунед:
vim ~/.bashrc
Ба воситаи муҳаррир сатрҳои зеринро илова кунед:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
Пас аз ворид кардани тағйирот ба bashrc фармони зерро иҷро кунед:
source ~/.bashrc
Ҷойгиркунии AWS PostgreSQL
Ҳама чизи боқӣ гузоштани пойгоҳи додаҳост, ки мо маълумоти коркардшударо аз ҷараёнҳо бор мекунем. Барои ин мо хидмати AWS RDS-ро истифода хоҳем бурд.
Ба консоли AWS -> AWS RDS -> Пойгоҳҳо -> Эҷоди пойгоҳи додаҳо гузаред:
PostgreSQL-ро интихоб кунед ва Баъдан клик кунед:
Зеро Ин мисол танҳо барои мақсадҳои таълимӣ аст; мо сервери ройгонро "ҳадди ақалл" (сатҳи ройгон) истифода хоҳем кард:
Баъдан, мо дар блоки Free Tier аломат мегузорем ва баъд аз он ба мо ба таври худкор намунаи синфи t2.micro пешниҳод карда мешавад - гарчанде заиф бошад ҳам, он ройгон аст ва барои вазифаи мо комилан мувофиқ аст:
Баъдан чизҳои хеле муҳим меоянд: номи мисоли пойгоҳи додаҳо, номи корбари асосӣ ва пароли ӯ. Биёед мисолро номбар кунем: myHabrTest, корбари асосӣ: хабр, парол: habr12345 ва тугмаи Next -ро пахш кунед:
Дар саҳифаи навбатӣ параметрҳое мавҷуданд, ки барои дастрасии сервери пойгоҳи додаҳои мо аз берун (дастрасии ҷамъиятӣ) ва дастрасии порт масъуланд:
Биёед як танзимоти навро барои гурӯҳи амнияти VPC созем, ки дастрасии беруна ба сервери базаи мо тавассути порти 5432 (PostgreSQL) имкон медиҳад.
Биёед ба консоли AWS дар равзанаи браузери алоҳида ба панели VPC -> Гурӯҳҳои амниятӣ -> Эҷоди бахши гурӯҳи амниятӣ равем:
Мо номи гурӯҳи Амниятро муқаррар кардем - PostgreSQL, тавсиф, нишон медиҳад, ки ин гурӯҳ бояд бо кадом VPC алоқаманд бошад ва тугмаи Эҷод -ро пахш кунед:
Қоидаҳои воридотӣ барои бандари 5432-ро барои гурӯҳи навтаъсис, тавре ки дар расми зер нишон дода шудааст, пур кунед. Шумо портро дастӣ муайян карда наметавонед, аммо аз рӯйхати афтанда Намуди PostgreSQL -ро интихоб кунед.
Ба таври қатъӣ, арзиши ::/0 маънои мавҷудияти трафики воридотӣ ба сервер аз тамоми ҷаҳонро дорад, ки аз ҷиҳати қонунӣ комилан дуруст нест, аммо барои таҳлили мисол, биёед ба худамон иҷозат диҳем, ки ин равишро истифода барем:
Мо ба саҳифаи браузер бармегардем, ки дар он ҷо мо "Танзимоти пешрафта" -ро кушоем ва дар қисмати гурӯҳҳои амнияти VPC -ро интихоб кунед -> Гурӯҳҳои мавҷудаи амнияти VPC -ро интихоб кунед -> PostgreSQL:
Баъдан, дар имконоти Пойгоҳи додаҳо -> Номи пойгоҳи додаҳо -> номро таъин кунед - habrDB.
Мо метавонем параметрҳои боқимондаро ба истиснои ғайрифаъол кардани нусхаи эҳтиётӣ (мӯҳлати нигоҳдории нусха - 0 рӯз), мониторинг ва Insights Performance, ба таври нобаёнӣ тарк кунем. Тугмаро пахш кунед Сохтани пойгоҳи додаҳо:
Коркарди ришта
Марҳилаи ниҳоӣ таҳияи кори Spark хоҳад буд, ки дар ҳар ду сония маълумоти нави аз Кафка омадаро коркард мекунад ва натиҷаро ба пойгоҳи додаҳо ворид мекунад.
Тавре ки дар боло қайд карда шуд, гузаргоҳҳо як механизми асосии SparkStreaming мебошанд, ки бояд барои таъмини таҳаммулпазирии хатоҳо танзим карда шаванд. Мо нуқтаҳои гузаргоҳро истифода хоҳем кард ва агар ин тартиб ноком шавад, модули Spark Streaming танҳо бояд ба нуқтаи охирин баргардад ва ҳисобҳоро аз он барқарор кунад, то маълумоти гумшударо барқарор кунад.
Нуктаи назоратиро тавассути гузоштани феҳрист дар системаи файлии ба хато тобовар ва боэътимод (ба монанди HDFS, S3 ва ғ.), ки дар он маълумоти нуқтаи назорат нигоҳ дошта мешавад, фаъол кардан мумкин аст. Ин амал бо истифода аз, масалан:
streamingContext.checkpoint(checkpointDirectory)
Дар мисоли мо, мо равиши зеринро истифода мебарем, яъне агар checkpointDirectory вуҷуд дошта бошад, пас контекст аз маълумоти нуқтаи назорат дубора эҷод карда мешавад. Агар директория мавҷуд набошад (яъне бори аввал иҷро карда шавад), он гоҳ functionToCreateContext барои сохтани контексти нав ва танзим кардани DStreams даъват карда мешавад:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Мо объекти DirectStreamро барои пайваст шудан ба мавзӯи "амалиёт" бо истифода аз усули createDirectStream китобхонаи KafkaUtils эҷод мекунем:
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
Таҳлили маълумоти воридотӣ дар формати JSON:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
Бо истифода аз Spark SQL, мо гурӯҳбандии оддиро анҷом медиҳем ва натиҷаро дар консол намоиш медиҳем:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Гирифтани матни дархост ва иҷро кардани он тавассути Spark SQL:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
Ва он гоҳ мо маълумоти ҷамъшудаи натиҷаро дар ҷадвал дар AWS RDS захира мекунем. Барои захира кардани натиҷаҳои ҷамъбаст дар ҷадвали пойгоҳи додаҳо, мо усули навиштани объекти DataFrame -ро истифода мебарем:
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
Чанд сухан дар бораи насб кардани пайвастшавӣ ба AWS RDS. Мо корбар ва паролро барои он дар қадами "Тасдиқи AWS PostgreSQL" офаридаем. Шумо бояд Endpoint-ро ҳамчун URL-и сервери пойгоҳи додаҳо истифода баред, ки дар бахши Пайвастшавӣ ва амният нишон дода мешавад:
Барои дуруст пайваст кардани Spark ва Кафка, шумо бояд корро тавассути smark-submit бо истифода аз артефакт иҷро кунед spark-streaming-kafka-0-8_2.11. Илова бар ин, мо инчунин як артефактро барои ҳамкорӣ бо пойгоҳи додаҳои PostgreSQL истифода мебарем; мо онҳоро тавассути --packages интиқол медиҳем.
Барои чандирии скрипт, мо инчунин ҳамчун параметрҳои вуруд номи сервери паём ва мавзӯъеро, ки мо аз он маълумот гирифтан мехоҳем, дохил хоҳем кард.
Ҳамин тавр, вақти он расидааст, ки кори системаро оғоз кунед ва тафтиш кунед:
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
Ҳама чиз кор кард! Тавре ки шумо дар расми зер мебинед, ҳангоми кор кардани барнома, ҳар 2 сония натиҷаҳои ҷамъоварии нав бароварда мешаванд, зеро ҳангоми сохтани объекти StreamingContext мо фосилаи бастабандиро ба 2 сония муқаррар кардем:
Минбаъд, мо ба базаи маълумот дархости оддӣ мекунем, то мавҷудияти сабтҳоро дар ҷадвал тафтиш кунем транзаксия_ҷараён:
хулоса
Ин мақола намунаи коркарди ҷараёни иттилоотро бо истифода аз Spark Streaming дар якҷоягӣ бо Apache Kafka ва PostgreSQL дида баромад. Бо афзоиши маълумот аз сарчашмаҳои гуногун, аз ҳад зиёд баҳо додан ба арзиши амалии Spark Streaming барои эҷоди ҷараён ва барномаҳои воқеӣ душвор аст.
Шумо метавонед рамзи пурраи сарчашмаро дар анбори ман дар ин ҷо пайдо кунед
Ман хурсандам, ки ин мақоларо баррасӣ мекунам, интизори шарҳҳои шумо ҳастам ва инчунин умедворам, ки интиқоди созанда аз ҳама хонандагони ғамхор.
Ба шумо муваффақият орзумандам!
Заб. Дар ибтидо истифодаи пойгоҳи маҳаллии PostgreSQL ба нақша гирифта шуда буд, аммо бо назардошти муҳаббати ман ба AWS, ман қарор додам, ки пойгоҳи додаҳоро ба абр интиқол диҳам. Дар мақолаи навбатӣ оид ба ин мавзӯъ, ман нишон медиҳам, ки чӣ гуна тамоми системаи дар боло тавсифшуда дар AWS бо истифода аз AWS Kinesis ва AWS EMR татбиқ карда шавад. Хабарҳоро пайгирӣ кунед!
Манбаъ: will.com