Apache Kafka lan Streaming Data Processing karo Spark Streaming

Sugeng rawuh, Habr! Dina iki kita bakal mbangun sistem sing bakal ngolah stream pesen Apache Kafka nggunakake Spark Streaming lan nulis asil pangolahan menyang database awan AWS RDS.

Coba bayangake yen institusi kredit tartamtu nemtokake tugas kanggo ngolah transaksi sing mlebu "ing fly" ing kabeh cabang. Iki bisa ditindakake kanthi cepet kanggo ngitung posisi mata uang sing mbukak kanggo treasury, watesan utawa asil finansial kanggo transaksi, lsp.

Cara ngleksanakake kasus iki tanpa nggunakake sihir lan mantra sihir - maca ing ngisor potong! Tindak!

Apache Kafka lan Streaming Data Processing karo Spark Streaming
(Sumber gambar)

Pambuka

Mesthine, ngolah data sing akeh ing wektu nyata nyedhiyakake akeh kesempatan kanggo digunakake ing sistem modern. Salah sawijining kombinasi sing paling populer kanggo iki yaiku tandem Apache Kafka lan Spark Streaming, ing ngendi Kafka nggawe stream paket pesen sing mlebu, lan Spark Streaming ngolah paket kasebut ing interval wektu tartamtu.

Kanggo nambah toleransi fault saka aplikasi, kita bakal nggunakake checkpoints. Kanthi mekanisme iki, nalika mesin Spark Streaming kudu mbalekake data sing ilang, mung kudu bali menyang checkpoint pungkasan lan nerusake petungan saka kono.

Arsitektur sistem sing dikembangake

Apache Kafka lan Streaming Data Processing karo Spark Streaming

Komponen sing digunakake:

  • Apache Kafka yaiku sistem olahpesen publish-subscribe sing disebarake. Cocog kanggo konsumsi pesen offline lan online. Kanggo nyegah mundhut data, pesen Kafka disimpen ing disk lan ditiru ing kluster. Sistem Kafka dibangun ing ndhuwur layanan sinkronisasi ZooKeeper;
  • Apache Spark Streaming - Komponen Spark kanggo ngolah data streaming. Modul Spark Streaming dibangun nggunakake arsitektur kumpulan mikro, ing ngendi aliran data diinterpretasikake minangka urutan terus-terusan saka paket data cilik. Spark Streaming njupuk data saka macem-macem sumber lan nggabungake menyang paket cilik. Paket anyar digawe kanthi interval biasa. Ing wiwitan saben interval wektu, paket anyar digawe, lan data apa wae sing ditampa sajrone interval kasebut kalebu ing paket kasebut. Ing pungkasan interval, wutah paket mandheg. Ukuran interval ditemtokake dening parameter sing disebut interval kumpulan;
  • Apache Spark SQL - nggabungake pangolahan hubungan karo pemrograman fungsional Spark. Data terstruktur tegese data sing nduweni skema, yaiku, siji set kolom kanggo kabeh rekaman. Spark SQL ndhukung input saka macem-macem sumber data kabentuk lan, thanks kanggo kasedhiyan informasi skema, iku bisa irit njupuk mung kothak dibutuhake cathetan, lan uga nyedhiyani API DataFrame;
  • AWS RDS minangka basis data relasional basis maya sing relatif murah, layanan web sing nyederhanakake persiyapan, operasi lan skala, lan dikelola langsung dening Amazon.

Nginstal lan mbukak server Kafka

Sadurunge nggunakake Kafka langsung, sampeyan kudu nggawe manawa sampeyan duwe Jawa, amarga ... JVM digunakake kanggo karya:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Ayo nggawe pangguna anyar kanggo nggarap Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Sabanjure, download distribusi saka situs web Apache Kafka resmi:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Bukak arsip sing diundhuh:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Langkah sabanjure opsional. Kasunyatane yaiku setelan gawan ora ngidini sampeyan nggunakake kabeh kemampuan Apache Kafka. Contone, mbusak topik, kategori, grup sing pesen bisa diterbitake. Kanggo ngganti iki, ayo ngowahi file konfigurasi:

vim ~/kafka/config/server.properties

Tambah ing ngisor iki menyang mburi file:

delete.topic.enable = true

Sadurunge miwiti server Kafka, sampeyan kudu miwiti server ZooKeeper; kita bakal nggunakake skrip tambahan sing kasedhiya karo distribusi Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Sawise ZooKeeper wis sukses, bukak server Kafka ing terminal sing kapisah:

bin/kafka-server-start.sh config/server.properties

Ayo nggawe topik anyar sing diarani Transaksi:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Priksa manawa topik kanthi jumlah partisi lan replikasi sing dibutuhake wis digawe:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka lan Streaming Data Processing karo Spark Streaming

Ayo kantun wektu nyoba produser lan konsumen kanggo topik sing mentas digawe. Rincian liyane babagan carane sampeyan bisa nyoba ngirim lan nampa pesen ditulis ing dokumentasi resmi - Kirim sawetara pesen. Inggih, kita nerusake kanggo nulis produser ing Python nggunakake KafkaProducer API.

Tulisan produser

Produser bakal ngasilake data acak - 100 pesen saben detik. Miturut data acak, tegese kamus sing dumadi saka telung lapangan:

  • Branch - jeneng titik jual institusi kredit;
  • Currency - mata uang transaksi;
  • jumlah - jumlah transaksi. Jumlah kasebut bakal dadi nomer positif yen tuku mata uang dening Bank, lan nomer negatif yen adol.

Kode kanggo produser katon kaya iki:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Sabanjure, nggunakake metode kirim, kita ngirim pesen menyang server, menyang topik sing dibutuhake, ing format JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Nalika mbukak skrip, kita nampa pesen ing ngisor iki ing terminal:

Apache Kafka lan Streaming Data Processing karo Spark Streaming

Iki tegese kabeh bisa kaya sing dikarepake - produser ngasilake lan ngirim pesen menyang topik sing dibutuhake.
Langkah sabanjure kanggo nginstal Spark lan proses stream pesen iki.

Nginstal Apache Spark

Apache Spark minangka platform komputasi kluster universal lan kinerja dhuwur.

Spark nindakake luwih apik tinimbang implementasi populer saka model MapReduce nalika ndhukung macem-macem jinis komputasi, kalebu pitakon interaktif lan pangolahan stream. Kacepetan nduweni peran penting nalika ngolah data sing akeh, amarga kacepetan sing ngidini sampeyan bisa kerja kanthi interaktif tanpa ngenteni menit utawa jam. Salah sawijining kekiyatan paling gedhe Spark sing ndadekake cepet banget yaiku kemampuan kanggo nindakake petungan ing memori.

Kerangka iki ditulis ing Scala, dadi sampeyan kudu nginstal dhisik:

sudo apt-get install scala

Ngundhuh distribusi Spark saka situs web resmi:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Mbukak arsip:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Tambah path menyang Spark menyang file bash:

vim ~/.bashrc

Tambah baris ing ngisor iki liwat editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Jalanake perintah ing ngisor iki sawise nggawe pangowahan menyang bashrc:

source ~/.bashrc

Nggunakake AWS PostgreSQL

Kabeh sing isih ana yaiku masang basis data sing bakal diunggahake informasi sing wis diproses saka aliran kasebut. Kanggo iki, kita bakal nggunakake layanan AWS RDS.

Pindhah menyang konsol AWS -> AWS RDS -> Database -> Gawe database:
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Pilih PostgreSQL banjur klik Sabanjure:
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Amarga Conto iki mung kanggo tujuan pendidikan; kita bakal nggunakake server gratis "minimal" (Tier Gratis):
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Sabanjure, kita menehi tandha ing blok Free Tier, lan sawise iku kita bakal diwenehi conto kelas t2.micro kanthi otomatis - sanajan ora kuwat, iku gratis lan cocok kanggo tugas kita:
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Sabanjure kedadeyan sing penting banget: jeneng conto database, jeneng pangguna master lan sandhi. Ayo jenenge conto: myHabrTest, pangguna master: habr, sandi: habr12345 lan klik tombol Sabanjure:
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Ing kaca sabanjure ana paramèter sing tanggung jawab kanggo aksesibilitas server database saka njaba (aksesibilitas umum) lan kasedhiyan port:

Apache Kafka lan Streaming Data Processing karo Spark Streaming

Ayo nggawe setelan anyar kanggo grup keamanan VPC, sing bakal ngidini akses eksternal menyang server database liwat port 5432 (PostgreSQL).
Ayo menyang konsol AWS ing jendela browser sing kapisah menyang Dashboard VPC -> Grup Keamanan -> Gawe bagean grup keamanan:
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Kita nyetel jeneng kanggo grup Keamanan - PostgreSQL, deskripsi, nuduhake VPC sing kudu digandhengake karo grup iki lan klik tombol Gawe:
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Isi aturan Inbound kanggo port 5432 kanggo grup sing mentas digawe, minangka ditampilake ing gambar ing ngisor iki. Sampeyan ora bisa nemtokake port kanthi manual, nanging pilih PostgreSQL saka dhaptar gulung mudhun Tipe.

Tegese, nilai ::/0 tegese kasedhiyan lalu lintas mlebu menyang server saka sak ndonya, sing sacara kanonik ora bener, nanging kanggo nganalisa contone, ayo nggunakake pendekatan iki:
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Kita bali menyang kaca browser, ing ngendi kita mbukak "Konfigurasi setelan lanjut" lan pilih ing bagean grup keamanan VPC -> Pilih grup keamanan VPC sing ana -> PostgreSQL:
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Sabanjure, ing opsi Database -> Jeneng database -> setel jeneng - habrDB.

Kita bisa ninggalake paramèter sing isih ana, kajaba mateni serep (periode retensi serep - 0 dina), ngawasi lan Performance Insights, kanthi standar. Klik ing tombol Nggawe database:
Apache Kafka lan Streaming Data Processing karo Spark Streaming

Penangan benang

Tahap pungkasan bakal dadi pangembangan proyek Spark, sing bakal ngolah data anyar saka Kafka saben rong detik lan nglebokake asil menyang database.

Kaya kasebut ing ndhuwur, checkpoints minangka mekanisme inti ing SparkStreaming sing kudu dikonfigurasi kanggo njamin toleransi kesalahan. Kita bakal nggunakake checkpoints lan, yen prosedur gagal, modul Spark Streaming mung kudu bali menyang checkpoint pungkasan lan nerusake petungan saka iku kanggo mbalekake data ilang.

Checkpointing bisa diaktifake kanthi nyetel direktori ing fault-tolerant, file sistem dipercaya (kayata HDFS, S3, etc.) kang informasi checkpoint bakal disimpen. Iki rampung nggunakake, contone:

streamingContext.checkpoint(checkpointDirectory)

Ing conto kita, kita bakal nggunakake pendekatan ing ngisor iki, yaiku, yen checkpointDirectory ana, banjur konteks bakal digawe maneh saka data checkpoint. Yen direktori ora ana (yaiku dieksekusi kanggo pisanan), banjur functionToCreateContext diarani kanggo nggawe konteks anyar lan ngatur DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Kita nggawe obyek DirectStream kanggo nyambung menyang topik "transaksi" nggunakake metode createDirectStream saka perpustakaan KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsing data mlebu ing format JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Nggunakake Spark SQL, kita nggawe klompok prasaja lan nampilake asil ing console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Njupuk teks pitakon lan mbukak liwat Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Banjur kita nyimpen data sing dikumpulake menyang tabel ing AWS RDS. Kanggo nyimpen asil agregasi menyang tabel database, kita bakal nggunakake metode nulis obyek DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Sawetara tembung babagan nyetel sambungan menyang AWS RDS. Kita nggawe pangguna lan sandhi kasebut ing langkah "Panyebaran AWS PostgreSQL". Sampeyan kudu nggunakake Endpoint minangka url server database, sing ditampilake ing bagean Konektivitas & keamanan:

Apache Kafka lan Streaming Data Processing karo Spark Streaming

Kanggo nyambungake Spark lan Kafka kanthi bener, sampeyan kudu mbukak proyek kasebut liwat smark-submit nggunakake artefak spark-streaming-kafka-0-8_2.11. Kajaba iku, kita uga bakal nggunakake artefak kanggo sesambungan karo database PostgreSQL; kita bakal nransfer liwat --packages.

Kanggo keluwesan script, kita uga bakal kalebu minangka paramèter input jeneng server pesen lan topik saka ngendi kita arep kanggo nampa data.

Dadi, wektune kanggo miwiti lan mriksa fungsi sistem:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Kabeh wis rampung! Nalika sampeyan bisa ndeleng ing gambar ing ngisor iki, nalika aplikasi lagi mlaku, asil agregasi anyar metu saben 2 detik, amarga kita nyetel interval batching kanggo 2 detik nalika kita nggawe obyek StreamingContext:

Apache Kafka lan Streaming Data Processing karo Spark Streaming

Sabanjure, kita nggawe pitakon prasaja menyang database kanggo mriksa anané cathetan ing tabel transaction_flow:

Apache Kafka lan Streaming Data Processing karo Spark Streaming

kesimpulan

Artikel iki ndeleng conto pamroses informasi stream nggunakake Spark Streaming bebarengan karo Apache Kafka lan PostgreSQL. Kanthi tuwuhing data saka macem-macem sumber, angel banget kanggo ngira-ngira nilai praktis Spark Streaming kanggo nggawe aplikasi streaming lan wektu nyata.

Sampeyan bisa nemokake kode sumber lengkap ing repositori ing GitHub.

Aku seneng ngrembug artikel iki, aku ngarep-arep komentar sampeyan, lan aku uga ngarep-arep kritik sing mbangun saka kabeh pembaca sing peduli.

Muga-muga sampeyan sukses!

PS. Kaping pisanan direncanakake nggunakake database PostgreSQL lokal, nanging amarga tresnaku marang AWS, aku mutusake mindhah database kasebut menyang awan. Ing artikel sabanjure babagan topik iki, aku bakal nuduhake carane ngetrapake kabeh sistem sing diterangake ing ndhuwur ing AWS nggunakake AWS Kinesis lan AWS EMR. Tindakake warta!

Source: www.habr.com

Add a comment