Apache Кафка ва коркарди маълумот бо Spark Streaming

Салом, Хабр! Имрӯз мо системаеро месозем, ки ҷараёни паёмҳои Apache Kafka-ро бо истифода аз Spark Streaming коркард мекунад ва натиҷаҳои коркардро ба махзани абрии AWS RDS нависед.

Биёед тасаввур кунем, ки як муассисаи кредитии муайян ба мо вазифаи коркарди амалиёти воридотӣ дар тамоми филиалҳои худро "дар парвоз" мегузорад. Ин метавонад бо мақсади ба таври фаврӣ ҳисоб кардани мавқеи кушодаи асъор барои хазинадорӣ, лимити ё натиҷаҳои молиявӣ барои муомилот ва ғайра анҷом дода шавад.

Чӣ тавр ин ҳолатро бе истифодаи ҷодугарӣ ва ҷодугарӣ амалӣ кардан мумкин аст - дар зери набуред! Бирав!

Apache Кафка ва коркарди маълумот бо Spark Streaming
(Манбаи тасвир)

Муқаддима

Албатта, коркарди миқдори зиёди маълумот дар вақти воқеӣ барои истифода дар системаҳои муосир имкониятҳои васеъ фароҳам меорад. Яке аз маъмултарин комбинатсияҳо барои ин тандеми Apache Kafka ва Spark Streaming мебошад, ки дар он Кафка ҷараёни бастаҳои паёмҳои воридотӣ эҷод мекунад ва Spark Streaming ин бастаҳоро дар фосилаи муайяни вақт коркард мекунад.

Барои баланд бардоштани таҳаммулпазирии хатогиҳои барнома, мо гузаргоҳҳоро истифода мебарем. Бо ин механизм, вақте ки муҳаррики Spark Streaming бояд маълумоти гумшударо барқарор кунад, он танҳо бояд ба нуқтаи охирин баргардад ва ҳисобҳоро аз он ҷо идома диҳад.

Архитектураи системаи таҳияшуда

Apache Кафка ва коркарди маълумот бо Spark Streaming

Компонентҳои истифодашуда:

  • Апач Кафка системаи паёмнависии паҳншуда-обуна аст. Муносиб барои истеъмоли паёмҳои офлайн ва онлайн. Барои пешгирии талафи маълумот, паёмҳои Кафка дар диск нигоҳ дошта мешаванд ва дар дохили кластер такрор карда мешаванд. Системаи Кафка дар болои хидмати ҳамоҳангсозии ZooKeeper сохта шудааст;
  • Ҷараёни Apache Spark - Қисмати Spark барои коркарди маълумоти ҷараён. Модули Spark Streaming бо истифода аз меъмории микро-партия сохта шудааст, ки дар он ҷараёни додаҳо ҳамчун пайдарпайии пайвастаи бастаҳои хурди додаҳо тафсир карда мешавад. Spark Streaming маълумотро аз сарчашмаҳои гуногун мегирад ва онҳоро ба бастаҳои хурд муттаҳид мекунад. Бастаҳои нав бо фосилаҳои муқаррарӣ сохта мешаванд. Дар оғози ҳар як фосилаи вақт бастаи нав эҷод карда мешавад ва ҳама маълумоти дар ин фосила гирифташуда ба баста дохил карда мешаванд. Дар охири фосила, афзоиши бастаҳо қатъ мегардад. Андозаи фосила бо параметре муайян карда мешавад, ки фосилаи партия номида мешавад;
  • Apache Spark SQL - коркарди реляциониро бо барномасозии функсионалии Spark муттаҳид мекунад. Маълумоти сохторӣ маънои маълумотеро дорад, ки схема дорад, яъне маҷмӯи ягонаи майдонҳо барои ҳама сабтҳо. Spark SQL воридотро аз сарчашмаҳои сохтории маълумот дастгирӣ мекунад ва ба шарофати мавҷудияти иттилооти схемавӣ, он метавонад танҳо майдонҳои зарурии сабтҳоро ба таври муассир дарёфт кунад ва инчунин API-ҳои DataFrame-ро таъмин намояд;
  • AWS RDS як пойгоҳи нисбатан арзони релятсионӣ дар асоси абр аст, хидмати веб, ки танзим, амалиёт ва миқёсро содда мекунад ва мустақиман аз ҷониби Amazon идора карда мешавад.

Насб ва иҷро кардани сервери Кафка

Пеш аз истифодаи бевоситаи Кафка, шумо бояд боварӣ ҳосил кунед, ки шумо Java доред, зеро... JVM барои кор истифода мешавад:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Биёед корбари нав эҷод кунем, то бо Кафка кор кунад:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Баъдан, тақсимотро аз вебсайти расмии Apache Kafka зеркашӣ кунед:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Архиви зеркашидашударо кушоед:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Қадами навбатӣ ихтиёрӣ аст. Гап дар он аст, ки танзимоти пешфарз ба шумо имкон намедиҳад, ки тамоми имкониятҳои Apache Kafka-ро пурра истифода баред. Масалан, мавзӯъ, категория, гурӯҳеро, ки паёмҳо ба онҳо интишор мешаванд, нест кунед. Барои тағир додани ин, биёед файли конфигуратсияро таҳрир кунем:

vim ~/kafka/config/server.properties

Ба охири файл инҳоро илова кунед:

delete.topic.enable = true

Пеш аз оғози сервери Кафка, шумо бояд сервери ZooKeeper-ро оғоз кунед; мо скрипти ёрирасонеро, ки бо тақсимоти Кафка меояд, истифода хоҳем кард:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Пас аз он ки ZooKeeper бомуваффақият оғоз шуд, сервери Кафкаро дар терминали алоҳида оғоз кунед:

bin/kafka-server-start.sh config/server.properties

Биёед мавзӯи наверо бо номи транзаксия эҷод кунем:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Биёед боварӣ ҳосил кунем, ки мавзӯъ бо миқдори зарурии қисмҳо ва такрорӣ сохта шудааст:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Кафка ва коркарди маълумот бо Spark Streaming

Биёед, лаҳзаҳои санҷиши истеҳсолкунанда ва истеъмолкунандаро барои мавзӯи навтаъсис аз даст диҳем. Тафсилоти бештар дар бораи чӣ гуна шумо метавонед фиристодан ва қабули паёмҳоро санҷед, дар ҳуҷҷатҳои расмӣ навишта шудааст - Якчанд паём фиристед. Хуб, мо бо истифода аз API KafkaProducer ба навиштани продюсер дар Python мегузарем.

Навиштани истеҳсолкунанда

Истеҳсолкунанда маълумоти тасодуфӣ тавлид мекунад - ҳар сония 100 паём. Бо маълумоти тасодуфӣ мо луғатро дар назар дорем, ки аз се соҳа иборат аст:

  • Филиали — номи нуқтаи фурӯши ташкилоти қарзӣ;
  • асъор - асъори муомилот;
  • Љамъ - маблағи муомилот. Маблағ рақами мусбат хоҳад буд, агар ин хариди асъор аз ҷониби Бонк бошад ва агар ин фурӯш бошад, рақами манфӣ бошад.

Рамзи истеҳсолкунанда чунин менамояд:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Баъдан, бо истифода аз усули ирсол, мо ба сервер ба мавзӯъе, ки ба мо лозим аст, дар формати JSON паём мефиристем:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Ҳангоми иҷро кардани скрипт, мо дар терминал паёмҳои зеринро мегирем:

Apache Кафка ва коркарди маълумот бо Spark Streaming

Ин маънои онро дорад, ки ҳама чиз тавре кор мекунад, ки мо мехостем - истеҳсолкунанда паёмҳоро ба мавзӯъе, ки ба мо лозим аст, тавлид ва мефиристад.
Қадами навбатӣ насб кардани Spark ва коркарди ин ҷараёни паём аст.

Насб кардани Apache Spark

Apache Spark платформаи ҳисоббарории кластери универсалӣ ва сермахсул аст.

Spark нисбат ба татбиқи маъмули модели MapReduce беҳтар кор мекунад ва дар ҳоле ки доираи васеи намудҳои ҳисоббарорӣ, аз ҷумла дархостҳои интерактивӣ ва коркарди ҷараёнро дастгирӣ мекунад. Суръат ҳангоми коркарди миқдори зиёди маълумот нақши муҳим мебозад, зеро он суръатест, ки ба шумо имкон медиҳад интерактивӣ бидуни сарф кардани дақиқаҳо ё соатҳои интизорӣ кор кунед. Яке аз ҷиҳатҳои бузургтарини Spark, ки онро ин қадар зуд мекунад, қобилияти иҷрои ҳисобҳои дохили хотира мебошад.

Ин чаҳорчӯба дар Scala навишта шудааст, бинобар ин шумо бояд аввал онро насб кунед:

sudo apt-get install scala

Тақсимоти Spark-ро аз вебсайти расмӣ зеркашӣ кунед:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Архивро кушоед:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Роҳро ба Spark ба файли bash илова кунед:

vim ~/.bashrc

Ба воситаи муҳаррир сатрҳои зеринро илова кунед:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Пас аз ворид кардани тағйирот ба bashrc фармони зерро иҷро кунед:

source ~/.bashrc

Ҷойгиркунии AWS PostgreSQL

Ҳама чизи боқӣ гузоштани пойгоҳи додаҳост, ки мо маълумоти коркардшударо аз ҷараёнҳо бор мекунем. Барои ин мо хидмати AWS RDS-ро истифода хоҳем бурд.

Ба консоли AWS -> AWS RDS -> Пойгоҳҳо -> Эҷоди пойгоҳи додаҳо гузаред:
Apache Кафка ва коркарди маълумот бо Spark Streaming

PostgreSQL-ро интихоб кунед ва Баъдан клик кунед:
Apache Кафка ва коркарди маълумот бо Spark Streaming

Зеро Ин мисол танҳо барои мақсадҳои таълимӣ аст; мо сервери ройгонро "ҳадди ақалл" (сатҳи ройгон) истифода хоҳем кард:
Apache Кафка ва коркарди маълумот бо Spark Streaming

Баъдан, мо дар блоки Free Tier аломат мегузорем ва баъд аз он ба мо ба таври худкор намунаи синфи t2.micro пешниҳод карда мешавад - гарчанде заиф бошад ҳам, он ройгон аст ва барои вазифаи мо комилан мувофиқ аст:
Apache Кафка ва коркарди маълумот бо Spark Streaming

Баъдан чизҳои хеле муҳим меоянд: номи мисоли пойгоҳи додаҳо, номи корбари асосӣ ва пароли ӯ. Биёед мисолро номбар кунем: myHabrTest, корбари асосӣ: хабр, парол: habr12345 ва тугмаи Next -ро пахш кунед:
Apache Кафка ва коркарди маълумот бо Spark Streaming

Дар саҳифаи навбатӣ параметрҳое мавҷуданд, ки барои дастрасии сервери пойгоҳи додаҳои мо аз берун (дастрасии ҷамъиятӣ) ва дастрасии порт масъуланд:

Apache Кафка ва коркарди маълумот бо Spark Streaming

Биёед як танзимоти навро барои гурӯҳи амнияти VPC созем, ки дастрасии беруна ба сервери базаи мо тавассути порти 5432 (PostgreSQL) имкон медиҳад.
Биёед ба консоли AWS дар равзанаи браузери алоҳида ба панели VPC -> Гурӯҳҳои амниятӣ -> Эҷоди бахши гурӯҳи амниятӣ равем:
Apache Кафка ва коркарди маълумот бо Spark Streaming

Мо номи гурӯҳи Амниятро муқаррар кардем - PostgreSQL, тавсиф, нишон медиҳад, ки ин гурӯҳ бояд бо кадом VPC алоқаманд бошад ва тугмаи Эҷод -ро пахш кунед:
Apache Кафка ва коркарди маълумот бо Spark Streaming

Қоидаҳои воридотӣ барои бандари 5432-ро барои гурӯҳи навтаъсис, тавре ки дар расми зер нишон дода шудааст, пур кунед. Шумо портро дастӣ муайян карда наметавонед, аммо аз рӯйхати афтанда Намуди PostgreSQL -ро интихоб кунед.

Ба таври қатъӣ, арзиши ::/0 маънои мавҷудияти трафики воридотӣ ба сервер аз тамоми ҷаҳонро дорад, ки аз ҷиҳати қонунӣ комилан дуруст нест, аммо барои таҳлили мисол, биёед ба худамон иҷозат диҳем, ки ин равишро истифода барем:
Apache Кафка ва коркарди маълумот бо Spark Streaming

Мо ба саҳифаи браузер бармегардем, ки дар он ҷо мо "Танзимоти пешрафта" -ро кушоем ва дар қисмати гурӯҳҳои амнияти VPC -ро интихоб кунед -> Гурӯҳҳои мавҷудаи амнияти VPC -ро интихоб кунед -> PostgreSQL:
Apache Кафка ва коркарди маълумот бо Spark Streaming

Баъдан, дар имконоти Пойгоҳи додаҳо -> Номи пойгоҳи додаҳо -> номро таъин кунед - habrDB.

Мо метавонем параметрҳои боқимондаро ба истиснои ғайрифаъол кардани нусхаи эҳтиётӣ (мӯҳлати нигоҳдории нусха - 0 рӯз), мониторинг ва Insights Performance, ба таври нобаёнӣ тарк кунем. Тугмаро пахш кунед Сохтани пойгоҳи додаҳо:
Apache Кафка ва коркарди маълумот бо Spark Streaming

Коркарди ришта

Марҳилаи ниҳоӣ таҳияи кори Spark хоҳад буд, ки дар ҳар ду сония маълумоти нави аз Кафка омадаро коркард мекунад ва натиҷаро ба пойгоҳи додаҳо ворид мекунад.

Тавре ки дар боло қайд карда шуд, гузаргоҳҳо як механизми асосии SparkStreaming мебошанд, ки бояд барои таъмини таҳаммулпазирии хатоҳо танзим карда шаванд. Мо нуқтаҳои гузаргоҳро истифода хоҳем кард ва агар ин тартиб ноком шавад, модули Spark Streaming танҳо бояд ба нуқтаи охирин баргардад ва ҳисобҳоро аз он барқарор кунад, то маълумоти гумшударо барқарор кунад.

Нуктаи назоратиро тавассути гузоштани феҳрист дар системаи файлии ба хато тобовар ва боэътимод (ба монанди HDFS, S3 ва ғ.), ки дар он маълумоти нуқтаи назорат нигоҳ дошта мешавад, фаъол кардан мумкин аст. Ин амал бо истифода аз, масалан:

streamingContext.checkpoint(checkpointDirectory)

Дар мисоли мо, мо равиши зеринро истифода мебарем, яъне агар checkpointDirectory вуҷуд дошта бошад, пас контекст аз маълумоти нуқтаи назорат дубора эҷод карда мешавад. Агар директория мавҷуд набошад (яъне бори аввал иҷро карда шавад), он гоҳ functionToCreateContext барои сохтани контексти нав ва танзим кардани DStreams даъват карда мешавад:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Мо объекти DirectStreamро барои пайваст шудан ба мавзӯи "амалиёт" бо истифода аз усули createDirectStream китобхонаи KafkaUtils эҷод мекунем:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Таҳлили маълумоти воридотӣ дар формати JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Бо истифода аз Spark SQL, мо гурӯҳбандии оддиро анҷом медиҳем ва натиҷаро дар консол намоиш медиҳем:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Гирифтани матни дархост ва иҷро кардани он тавассути Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Ва он гоҳ мо маълумоти ҷамъшудаи натиҷаро дар ҷадвал дар AWS RDS захира мекунем. Барои захира кардани натиҷаҳои ҷамъбаст дар ҷадвали пойгоҳи додаҳо, мо усули навиштани объекти DataFrame -ро истифода мебарем:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Чанд сухан дар бораи насб кардани пайвастшавӣ ба AWS RDS. Мо корбар ва паролро барои он дар қадами "Тасдиқи AWS PostgreSQL" офаридаем. Шумо бояд Endpoint-ро ҳамчун URL-и сервери пойгоҳи додаҳо истифода баред, ки дар бахши Пайвастшавӣ ва амният нишон дода мешавад:

Apache Кафка ва коркарди маълумот бо Spark Streaming

Барои дуруст пайваст кардани Spark ва Кафка, шумо бояд корро тавассути smark-submit бо истифода аз артефакт иҷро кунед spark-streaming-kafka-0-8_2.11. Илова бар ин, мо инчунин як артефактро барои ҳамкорӣ бо пойгоҳи додаҳои PostgreSQL истифода мебарем; мо онҳоро тавассути --packages интиқол медиҳем.

Барои чандирии скрипт, мо инчунин ҳамчун параметрҳои вуруд номи сервери паём ва мавзӯъеро, ки мо аз он маълумот гирифтан мехоҳем, дохил хоҳем кард.

Ҳамин тавр, вақти он расидааст, ки кори системаро оғоз кунед ва тафтиш кунед:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Ҳама чиз кор кард! Тавре ки шумо дар расми зер мебинед, ҳангоми кор кардани барнома, ҳар 2 сония натиҷаҳои ҷамъоварии нав бароварда мешаванд, зеро ҳангоми сохтани объекти StreamingContext мо фосилаи бастабандиро ба 2 сония муқаррар кардем:

Apache Кафка ва коркарди маълумот бо Spark Streaming

Минбаъд, мо ба базаи маълумот дархости оддӣ мекунем, то мавҷудияти сабтҳоро дар ҷадвал тафтиш кунем транзаксия_ҷараён:

Apache Кафка ва коркарди маълумот бо Spark Streaming

хулоса

Ин мақола намунаи коркарди ҷараёни иттилоотро бо истифода аз Spark Streaming дар якҷоягӣ бо Apache Kafka ва PostgreSQL дида баромад. Бо афзоиши маълумот аз сарчашмаҳои гуногун, аз ҳад зиёд баҳо додан ба арзиши амалии Spark Streaming барои эҷоди ҷараён ва барномаҳои воқеӣ душвор аст.

Шумо метавонед рамзи пурраи сарчашмаро дар анбори ман дар ин ҷо пайдо кунед GitHub.

Ман хурсандам, ки ин мақоларо баррасӣ мекунам, интизори шарҳҳои шумо ҳастам ва инчунин умедворам, ки интиқоди созанда аз ҳама хонандагони ғамхор.

Ба шумо муваффақият орзумандам!

Заб. Дар ибтидо истифодаи пойгоҳи маҳаллии PostgreSQL ба нақша гирифта шуда буд, аммо бо назардошти муҳаббати ман ба AWS, ман қарор додам, ки пойгоҳи додаҳоро ба абр интиқол диҳам. Дар мақолаи навбатӣ оид ба ин мавзӯъ, ман нишон медиҳам, ки чӣ гуна тамоми системаи дар боло тавсифшуда дар AWS бо истифода аз AWS Kinesis ва AWS EMR татбиқ карда шавад. Хабарҳоро пайгирӣ кунед!

Манбаъ: will.com

Илова Эзоҳ