Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming
Halo, Habr! Hari ini kita akan membangun sistem yang akan memproses aliran pesan Apache Kafka menggunakan Spark Streaming dan menulis hasil pemrosesan ke database cloud AWS RDS.
Bayangkan sebuah lembaga kredit tertentu memberi kita tugas untuk memproses transaksi masuk “on the fly” di seluruh cabangnya. Hal ini dapat dilakukan dengan tujuan untuk segera menghitung posisi mata uang terbuka untuk perbendaharaan, batasan atau hasil keuangan untuk transaksi, dll.
Bagaimana menerapkan kasus ini tanpa menggunakan sihir dan mantra sihir - baca di bawah ini! Pergi!
Tentu saja, pemrosesan data dalam jumlah besar secara real time memberikan banyak peluang untuk digunakan dalam sistem modern. Salah satu kombinasi paling populer untuk ini adalah tandem Apache Kafka dan Spark Streaming, di mana Kafka membuat aliran paket pesan masuk, dan Spark Streaming memproses paket-paket ini pada interval waktu tertentu.
Untuk meningkatkan toleransi kesalahan aplikasi, kami akan menggunakan pos pemeriksaan. Dengan mekanisme ini, ketika mesin Spark Streaming perlu memulihkan data yang hilang, ia hanya perlu kembali ke pos pemeriksaan terakhir dan melanjutkan penghitungan dari sana.
Arsitektur sistem yang dikembangkan
Komponen yang digunakan:
Apache Kafka adalah sistem pesan terbitkan-langganan terdistribusi. Cocok untuk konsumsi pesan offline dan online. Untuk mencegah kehilangan data, pesan Kafka disimpan di disk dan direplikasi di dalam cluster. Sistem Kafka dibangun di atas layanan sinkronisasi ZooKeeper;
Streaming Apache Spark - Komponen Spark untuk memproses data streaming. Modul Spark Streaming dibangun menggunakan arsitektur micro-batch, di mana aliran data diinterpretasikan sebagai rangkaian paket data kecil yang berkelanjutan. Spark Streaming mengambil data dari berbagai sumber dan menggabungkannya ke dalam paket kecil. Paket baru dibuat secara berkala. Pada awal setiap interval waktu, paket baru dibuat, dan data apa pun yang diterima selama interval tersebut disertakan dalam paket. Di akhir interval, pertumbuhan paket berhenti. Besar kecilnya interval ditentukan oleh parameter yang disebut interval batch;
Apache Spark SQL - menggabungkan pemrosesan relasional dengan pemrograman fungsional Spark. Data terstruktur berarti data yang memiliki skema, yaitu sekumpulan bidang untuk semua rekaman. Spark SQL mendukung masukan dari berbagai sumber data terstruktur dan, berkat ketersediaan informasi skema, Spark SQL hanya dapat mengambil bidang rekaman yang diperlukan secara efisien, dan juga menyediakan API DataFrame;
AWSRDS adalah database relasional berbasis cloud yang relatif murah, layanan web yang menyederhanakan pengaturan, pengoperasian, dan penskalaan, dan dikelola langsung oleh Amazon.
Menginstal dan menjalankan server Kafka
Sebelum menggunakan Kafka secara langsung, Anda perlu memastikan Anda memiliki Java, karena... JVM digunakan untuk bekerja:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Langkah selanjutnya adalah opsional. Faktanya adalah pengaturan default tidak memungkinkan Anda untuk sepenuhnya menggunakan semua fitur Apache Kafka. Misalnya, menghapus topik, kategori, grup tempat pesan dapat dipublikasikan. Untuk mengubahnya, mari edit file konfigurasi:
vim ~/kafka/config/server.properties
Tambahkan yang berikut ini ke akhir file:
delete.topic.enable = true
Sebelum memulai server Kafka, Anda perlu memulai server ZooKeeper; kami akan menggunakan skrip tambahan yang disertakan dengan distribusi Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Setelah ZooKeeper berhasil dimulai, luncurkan server Kafka di terminal terpisah:
Mari kita lewatkan momen pengujian produsen dan konsumen terhadap topik yang baru dibuat. Detail lebih lanjut tentang bagaimana Anda dapat menguji pengiriman dan penerimaan pesan tertulis di dokumentasi resmi - Kirim beberapa pesan. Baiklah, kita lanjutkan menulis produser dengan Python menggunakan KafkaProducer API.
Tulisan produser
Produser akan menghasilkan data acak - 100 pesan setiap detik. Yang kami maksud dengan data acak adalah kamus yang terdiri dari tiga bidang:
Cabang — nama tempat penjualan lembaga kredit;
Currency - transaksi mata uang;
Jumlah - jumlah transaksi. Jumlah tersebut akan menjadi angka positif jika merupakan pembelian mata uang oleh Bank, dan angka negatif jika merupakan penjualan.
Selanjutnya dengan menggunakan metode send, kita mengirim pesan ke server, ke topik yang kita butuhkan, dalam format JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Saat menjalankan skrip, kami menerima pesan berikut di terminal:
Ini berarti semuanya berjalan sesuai keinginan kita - produser membuat dan mengirimkan pesan ke topik yang kita butuhkan.
Langkah selanjutnya adalah menginstal Spark dan memproses aliran pesan ini.
Menginstal Apache Spark
Apache Spark adalah platform komputasi cluster yang universal dan berkinerja tinggi.
Spark berkinerja lebih baik daripada implementasi populer model MapReduce sekaligus mendukung jenis komputasi yang lebih luas, termasuk kueri interaktif dan pemrosesan aliran. Kecepatan memainkan peran penting saat memproses data dalam jumlah besar, karena kecepatanlah yang memungkinkan Anda bekerja secara interaktif tanpa menghabiskan waktu menunggu beberapa menit atau jam. Salah satu kekuatan terbesar Spark yang membuatnya begitu cepat adalah kemampuannya melakukan penghitungan dalam memori.
Framework ini ditulis dalam Scala, jadi Anda perlu menginstalnya terlebih dahulu:
Jalankan perintah di bawah ini setelah melakukan perubahan pada bashrc:
source ~/.bashrc
Menerapkan AWS PostgreSQL
Yang tersisa hanyalah menyebarkan database tempat kami akan mengunggah informasi yang diproses dari aliran. Untuk ini kami akan menggunakan layanan AWS RDS.
Karena Contoh ini hanya untuk tujuan pendidikan; kami akan menggunakan server gratis “minimal” (Tingkat Gratis):
Selanjutnya, kita beri tanda centang pada blok Tingkat Gratis, dan setelah itu kita akan secara otomatis ditawari sebuah instance dari kelas t2.micro - meskipun lemah, ini gratis dan cukup cocok untuk tugas kita:
Berikutnya adalah hal yang sangat penting: nama instance database, nama pengguna master, dan kata sandinya. Beri nama contohnya: myHabrTest, pengguna utama: habr, kata sandi: habr12345 dan klik tombol Berikutnya:
Pada halaman berikutnya terdapat parameter yang bertanggung jawab atas aksesibilitas server database kami dari luar (Aksesibilitas publik) dan ketersediaan port:
Mari buat pengaturan baru untuk grup keamanan VPC, yang akan mengizinkan akses eksternal ke server database kita melalui port 5432 (PostgreSQL).
Mari kita pergi ke konsol AWS di jendela browser terpisah ke bagian Dasbor VPC -> Grup Keamanan -> Buat grup keamanan:
Kami menetapkan nama untuk grup Keamanan - PostgreSQL, deskripsinya, tunjukkan VPC mana yang harus dikaitkan dengan grup ini dan klik tombol Buat:
Isikan Inbound rule for port 5432 untuk grup yang baru dibuat, seperti terlihat pada gambar di bawah ini. Anda tidak dapat menentukan port secara manual, tetapi pilih PostgreSQL dari daftar drop-down Type.
Sebenarnya, nilai ::/0 berarti ketersediaan lalu lintas masuk ke server dari seluruh dunia, yang secara kanonik tidak sepenuhnya benar, tetapi untuk menganalisis contohnya, mari kita gunakan pendekatan ini:
Kami kembali ke halaman browser, di mana kami membuka "Konfigurasi pengaturan lanjutan" dan pilih di bagian Grup keamanan VPC -> Pilih grup keamanan VPC yang ada -> PostgreSQL:
Selanjutnya pada opsi Database -> Nama database -> atur nama - habrDB.
Kami dapat membiarkan parameter lainnya, kecuali menonaktifkan pencadangan (periode penyimpanan cadangan - 0 hari), pemantauan, dan Wawasan Kinerja, secara default. Klik pada tombol Buat database:
Penangan benang
Tahap terakhir adalah pengembangan pekerjaan Spark, yang akan memproses data baru yang berasal dari Kafka setiap dua detik dan memasukkan hasilnya ke dalam database.
Seperti disebutkan di atas, pos pemeriksaan adalah mekanisme inti di SparkStreaming yang harus dikonfigurasi untuk memastikan toleransi kesalahan. Kami akan menggunakan pos pemeriksaan dan, jika prosedur gagal, modul Spark Streaming hanya perlu kembali ke pos pemeriksaan terakhir dan melanjutkan penghitungan darinya untuk memulihkan data yang hilang.
Checkpointing dapat diaktifkan dengan mengatur direktori pada sistem file yang toleran terhadap kesalahan dan dapat diandalkan (seperti HDFS, S3, dll.) di mana informasi checkpoint akan disimpan. Ini dilakukan dengan menggunakan, misalnya:
streamingContext.checkpoint(checkpointDirectory)
Dalam contoh kita, kita akan menggunakan pendekatan berikut, yaitu jika checkpointDirectory ada, maka konteksnya akan dibuat ulang dari data checkpoint. Jika direktori tidak ada (yaitu dieksekusi untuk pertama kalinya), maka functionToCreateContext dipanggil untuk membuat konteks baru dan mengonfigurasi DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Kami membuat objek DirectStream untuk terhubung ke topik “transaksi” menggunakan metode createDirectStream dari perpustakaan KafkaUtils:
Menggunakan Spark SQL, kami melakukan pengelompokan sederhana dan menampilkan hasilnya di konsol:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Mendapatkan teks kueri dan menjalankannya melalui Spark SQL:
Dan kemudian kami menyimpan data agregat yang dihasilkan ke dalam tabel di AWS RDS. Untuk menyimpan hasil agregasi ke tabel database, kita akan menggunakan metode tulis objek DataFrame:
Beberapa kata tentang menyiapkan koneksi ke AWS RDS. Kami membuat pengguna dan kata sandi untuk itu pada langkah “Menerapkan AWS PostgreSQL”. Anda harus menggunakan Endpoint sebagai url server database, yang ditampilkan di bagian Konektivitas & keamanan:
Untuk menghubungkan Spark dan Kafka dengan benar, Anda harus menjalankan pekerjaan melalui smark-submit menggunakan artefak percikan-streaming-kafka-0-8_2.11. Selain itu, kami juga akan menggunakan artefak untuk berinteraksi dengan database PostgreSQL; kami akan mentransfernya melalui --packages.
Untuk fleksibilitas skrip, kami juga akan menyertakan parameter input nama server pesan dan topik dari mana kami ingin menerima data.
Jadi, inilah waktunya untuk meluncurkan dan memeriksa fungsionalitas sistem:
Semuanya berhasil! Seperti yang Anda lihat pada gambar di bawah, saat aplikasi berjalan, hasil agregasi baru dikeluarkan setiap 2 detik, karena kita menyetel interval batching menjadi 2 detik saat membuat objek StreamingContext:
Selanjutnya, kita membuat query sederhana ke database untuk memeriksa keberadaan record di tabel aliran_transaksi:
Kesimpulan
Artikel ini membahas contoh pemrosesan aliran informasi menggunakan Spark Streaming bersama dengan Apache Kafka dan PostgreSQL. Dengan pertumbuhan data dari berbagai sumber, sulit untuk melebih-lebihkan nilai praktis Spark Streaming untuk membuat aplikasi streaming dan real-time.
Anda dapat menemukan kode sumber lengkap di repositori saya di GitHub.
Saya senang membahas artikel ini, saya menantikan komentar Anda, dan saya juga mengharapkan kritik yang membangun dari semua pembaca yang peduli.
Saya berharap Anda sukses!
Ps. Awalnya direncanakan menggunakan database lokal PostgreSQL, namun karena kecintaan saya pada AWS, saya memutuskan untuk memindahkan database tersebut ke cloud. Pada artikel berikutnya tentang topik ini, saya akan menunjukkan cara mengimplementasikan seluruh sistem yang dijelaskan di atas di AWS menggunakan AWS Kinesis dan AWS EMR. Ikuti beritanya!