Apache Kafka lan Streaming Data Processing karo Spark Streaming
Sugeng rawuh, Habr! Dina iki kita bakal mbangun sistem sing bakal ngolah stream pesen Apache Kafka nggunakake Spark Streaming lan nulis asil pangolahan menyang database awan AWS RDS.
Coba bayangake yen institusi kredit tartamtu nemtokake tugas kanggo ngolah transaksi sing mlebu "ing fly" ing kabeh cabang. Iki bisa ditindakake kanthi cepet kanggo ngitung posisi mata uang sing mbukak kanggo treasury, watesan utawa asil finansial kanggo transaksi, lsp.
Cara ngleksanakake kasus iki tanpa nggunakake sihir lan mantra sihir - maca ing ngisor potong! Tindak!
Mesthine, ngolah data sing akeh ing wektu nyata nyedhiyakake akeh kesempatan kanggo digunakake ing sistem modern. Salah sawijining kombinasi sing paling populer kanggo iki yaiku tandem Apache Kafka lan Spark Streaming, ing ngendi Kafka nggawe stream paket pesen sing mlebu, lan Spark Streaming ngolah paket kasebut ing interval wektu tartamtu.
Kanggo nambah toleransi fault saka aplikasi, kita bakal nggunakake checkpoints. Kanthi mekanisme iki, nalika mesin Spark Streaming kudu mbalekake data sing ilang, mung kudu bali menyang checkpoint pungkasan lan nerusake petungan saka kono.
Arsitektur sistem sing dikembangake
Komponen sing digunakake:
Apache Kafka yaiku sistem olahpesen publish-subscribe sing disebarake. Cocog kanggo konsumsi pesen offline lan online. Kanggo nyegah mundhut data, pesen Kafka disimpen ing disk lan ditiru ing kluster. Sistem Kafka dibangun ing ndhuwur layanan sinkronisasi ZooKeeper;
Apache Spark Streaming - Komponen Spark kanggo ngolah data streaming. Modul Spark Streaming dibangun nggunakake arsitektur kumpulan mikro, ing ngendi aliran data diinterpretasikake minangka urutan terus-terusan saka paket data cilik. Spark Streaming njupuk data saka macem-macem sumber lan nggabungake menyang paket cilik. Paket anyar digawe kanthi interval biasa. Ing wiwitan saben interval wektu, paket anyar digawe, lan data apa wae sing ditampa sajrone interval kasebut kalebu ing paket kasebut. Ing pungkasan interval, wutah paket mandheg. Ukuran interval ditemtokake dening parameter sing disebut interval kumpulan;
Apache Spark SQL - nggabungake pangolahan hubungan karo pemrograman fungsional Spark. Data terstruktur tegese data sing nduweni skema, yaiku, siji set kolom kanggo kabeh rekaman. Spark SQL ndhukung input saka macem-macem sumber data kabentuk lan, thanks kanggo kasedhiyan informasi skema, iku bisa irit njupuk mung kothak dibutuhake cathetan, lan uga nyedhiyani API DataFrame;
AWS RDS minangka basis data relasional basis maya sing relatif murah, layanan web sing nyederhanakake persiyapan, operasi lan skala, lan dikelola langsung dening Amazon.
Nginstal lan mbukak server Kafka
Sadurunge nggunakake Kafka langsung, sampeyan kudu nggawe manawa sampeyan duwe Jawa, amarga ... JVM digunakake kanggo karya:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Langkah sabanjure opsional. Kasunyatane yaiku setelan gawan ora ngidini sampeyan nggunakake kabeh kemampuan Apache Kafka. Contone, mbusak topik, kategori, grup sing pesen bisa diterbitake. Kanggo ngganti iki, ayo ngowahi file konfigurasi:
vim ~/kafka/config/server.properties
Tambah ing ngisor iki menyang mburi file:
delete.topic.enable = true
Sadurunge miwiti server Kafka, sampeyan kudu miwiti server ZooKeeper; kita bakal nggunakake skrip tambahan sing kasedhiya karo distribusi Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Sawise ZooKeeper wis sukses, bukak server Kafka ing terminal sing kapisah:
Ayo kantun wektu nyoba produser lan konsumen kanggo topik sing mentas digawe. Rincian liyane babagan carane sampeyan bisa nyoba ngirim lan nampa pesen ditulis ing dokumentasi resmi - Kirim sawetara pesen. Inggih, kita nerusake kanggo nulis produser ing Python nggunakake KafkaProducer API.
Tulisan produser
Produser bakal ngasilake data acak - 100 pesen saben detik. Miturut data acak, tegese kamus sing dumadi saka telung lapangan:
Branch - jeneng titik jual institusi kredit;
Currency - mata uang transaksi;
jumlah - jumlah transaksi. Jumlah kasebut bakal dadi nomer positif yen tuku mata uang dening Bank, lan nomer negatif yen adol.
Sabanjure, nggunakake metode kirim, kita ngirim pesen menyang server, menyang topik sing dibutuhake, ing format JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Nalika mbukak skrip, kita nampa pesen ing ngisor iki ing terminal:
Iki tegese kabeh bisa kaya sing dikarepake - produser ngasilake lan ngirim pesen menyang topik sing dibutuhake.
Langkah sabanjure kanggo nginstal Spark lan proses stream pesen iki.
Nginstal Apache Spark
Apache Spark minangka platform komputasi kluster universal lan kinerja dhuwur.
Spark nindakake luwih apik tinimbang implementasi populer saka model MapReduce nalika ndhukung macem-macem jinis komputasi, kalebu pitakon interaktif lan pangolahan stream. Kacepetan nduweni peran penting nalika ngolah data sing akeh, amarga kacepetan sing ngidini sampeyan bisa kerja kanthi interaktif tanpa ngenteni menit utawa jam. Salah sawijining kekiyatan paling gedhe Spark sing ndadekake cepet banget yaiku kemampuan kanggo nindakake petungan ing memori.
Kerangka iki ditulis ing Scala, dadi sampeyan kudu nginstal dhisik:
Jalanake perintah ing ngisor iki sawise nggawe pangowahan menyang bashrc:
source ~/.bashrc
Nggunakake AWS PostgreSQL
Kabeh sing isih ana yaiku masang basis data sing bakal diunggahake informasi sing wis diproses saka aliran kasebut. Kanggo iki, kita bakal nggunakake layanan AWS RDS.
Amarga Conto iki mung kanggo tujuan pendidikan; kita bakal nggunakake server gratis "minimal" (Tier Gratis):
Sabanjure, kita menehi tandha ing blok Free Tier, lan sawise iku kita bakal diwenehi conto kelas t2.micro kanthi otomatis - sanajan ora kuwat, iku gratis lan cocok kanggo tugas kita:
Sabanjure kedadeyan sing penting banget: jeneng conto database, jeneng pangguna master lan sandhi. Ayo jenenge conto: myHabrTest, pangguna master: habr, sandi: habr12345 lan klik tombol Sabanjure:
Ing kaca sabanjure ana paramèter sing tanggung jawab kanggo aksesibilitas server database saka njaba (aksesibilitas umum) lan kasedhiyan port:
Ayo nggawe setelan anyar kanggo grup keamanan VPC, sing bakal ngidini akses eksternal menyang server database liwat port 5432 (PostgreSQL).
Ayo menyang konsol AWS ing jendela browser sing kapisah menyang Dashboard VPC -> Grup Keamanan -> Gawe bagean grup keamanan:
Kita nyetel jeneng kanggo grup Keamanan - PostgreSQL, deskripsi, nuduhake VPC sing kudu digandhengake karo grup iki lan klik tombol Gawe:
Isi aturan Inbound kanggo port 5432 kanggo grup sing mentas digawe, minangka ditampilake ing gambar ing ngisor iki. Sampeyan ora bisa nemtokake port kanthi manual, nanging pilih PostgreSQL saka dhaptar gulung mudhun Tipe.
Tegese, nilai ::/0 tegese kasedhiyan lalu lintas mlebu menyang server saka sak ndonya, sing sacara kanonik ora bener, nanging kanggo nganalisa contone, ayo nggunakake pendekatan iki:
Kita bali menyang kaca browser, ing ngendi kita mbukak "Konfigurasi setelan lanjut" lan pilih ing bagean grup keamanan VPC -> Pilih grup keamanan VPC sing ana -> PostgreSQL:
Sabanjure, ing opsi Database -> Jeneng database -> setel jeneng - habrDB.
Kita bisa ninggalake paramèter sing isih ana, kajaba mateni serep (periode retensi serep - 0 dina), ngawasi lan Performance Insights, kanthi standar. Klik ing tombol Nggawe database:
Penangan benang
Tahap pungkasan bakal dadi pangembangan proyek Spark, sing bakal ngolah data anyar saka Kafka saben rong detik lan nglebokake asil menyang database.
Kaya kasebut ing ndhuwur, checkpoints minangka mekanisme inti ing SparkStreaming sing kudu dikonfigurasi kanggo njamin toleransi kesalahan. Kita bakal nggunakake checkpoints lan, yen prosedur gagal, modul Spark Streaming mung kudu bali menyang checkpoint pungkasan lan nerusake petungan saka iku kanggo mbalekake data ilang.
Checkpointing bisa diaktifake kanthi nyetel direktori ing fault-tolerant, file sistem dipercaya (kayata HDFS, S3, etc.) kang informasi checkpoint bakal disimpen. Iki rampung nggunakake, contone:
streamingContext.checkpoint(checkpointDirectory)
Ing conto kita, kita bakal nggunakake pendekatan ing ngisor iki, yaiku, yen checkpointDirectory ana, banjur konteks bakal digawe maneh saka data checkpoint. Yen direktori ora ana (yaiku dieksekusi kanggo pisanan), banjur functionToCreateContext diarani kanggo nggawe konteks anyar lan ngatur DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Kita nggawe obyek DirectStream kanggo nyambung menyang topik "transaksi" nggunakake metode createDirectStream saka perpustakaan KafkaUtils:
Nggunakake Spark SQL, kita nggawe klompok prasaja lan nampilake asil ing console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Banjur kita nyimpen data sing dikumpulake menyang tabel ing AWS RDS. Kanggo nyimpen asil agregasi menyang tabel database, kita bakal nggunakake metode nulis obyek DataFrame:
Sawetara tembung babagan nyetel sambungan menyang AWS RDS. Kita nggawe pangguna lan sandhi kasebut ing langkah "Panyebaran AWS PostgreSQL". Sampeyan kudu nggunakake Endpoint minangka url server database, sing ditampilake ing bagean Konektivitas & keamanan:
Kanggo nyambungake Spark lan Kafka kanthi bener, sampeyan kudu mbukak proyek kasebut liwat smark-submit nggunakake artefak spark-streaming-kafka-0-8_2.11. Kajaba iku, kita uga bakal nggunakake artefak kanggo sesambungan karo database PostgreSQL; kita bakal nransfer liwat --packages.
Kanggo keluwesan script, kita uga bakal kalebu minangka paramèter input jeneng server pesen lan topik saka ngendi kita arep kanggo nampa data.
Dadi, wektune kanggo miwiti lan mriksa fungsi sistem:
Kabeh wis rampung! Nalika sampeyan bisa ndeleng ing gambar ing ngisor iki, nalika aplikasi lagi mlaku, asil agregasi anyar metu saben 2 detik, amarga kita nyetel interval batching kanggo 2 detik nalika kita nggawe obyek StreamingContext:
Sabanjure, kita nggawe pitakon prasaja menyang database kanggo mriksa anané cathetan ing tabel transaction_flow:
kesimpulan
Artikel iki ndeleng conto pamroses informasi stream nggunakake Spark Streaming bebarengan karo Apache Kafka lan PostgreSQL. Kanthi tuwuhing data saka macem-macem sumber, angel banget kanggo ngira-ngira nilai praktis Spark Streaming kanggo nggawe aplikasi streaming lan wektu nyata.
Sampeyan bisa nemokake kode sumber lengkap ing repositori ing GitHub.
Aku seneng ngrembug artikel iki, aku ngarep-arep komentar sampeyan, lan aku uga ngarep-arep kritik sing mbangun saka kabeh pembaca sing peduli.
Muga-muga sampeyan sukses!
PS. Kaping pisanan direncanakake nggunakake database PostgreSQL lokal, nanging amarga tresnaku marang AWS, aku mutusake mindhah database kasebut menyang awan. Ing artikel sabanjure babagan topik iki, aku bakal nuduhake carane ngetrapake kabeh sistem sing diterangake ing ndhuwur ing AWS nggunakake AWS Kinesis lan AWS EMR. Tindakake warta!