Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Halo, Habr! Hari ini kita akan membangun sistem yang akan memproses aliran pesan Apache Kafka menggunakan Spark Streaming dan menulis hasil pemrosesan ke database cloud AWS RDS.

Bayangkan sebuah lembaga kredit tertentu memberi kita tugas untuk memproses transaksi masuk “on the fly” di seluruh cabangnya. Hal ini dapat dilakukan dengan tujuan untuk segera menghitung posisi mata uang terbuka untuk perbendaharaan, batasan atau hasil keuangan untuk transaksi, dll.

Bagaimana menerapkan kasus ini tanpa menggunakan sihir dan mantra sihir - baca di bawah ini! Pergi!

Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming
(Sumber gambar)

pengenalan

Tentu saja, pemrosesan data dalam jumlah besar secara real time memberikan banyak peluang untuk digunakan dalam sistem modern. Salah satu kombinasi paling populer untuk ini adalah tandem Apache Kafka dan Spark Streaming, di mana Kafka membuat aliran paket pesan masuk, dan Spark Streaming memproses paket-paket ini pada interval waktu tertentu.

Untuk meningkatkan toleransi kesalahan aplikasi, kami akan menggunakan pos pemeriksaan. Dengan mekanisme ini, ketika mesin Spark Streaming perlu memulihkan data yang hilang, ia hanya perlu kembali ke pos pemeriksaan terakhir dan melanjutkan penghitungan dari sana.

Arsitektur sistem yang dikembangkan

Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Komponen yang digunakan:

  • Apache Kafka adalah sistem pesan terbitkan-langganan terdistribusi. Cocok untuk konsumsi pesan offline dan online. Untuk mencegah kehilangan data, pesan Kafka disimpan di disk dan direplikasi di dalam cluster. Sistem Kafka dibangun di atas layanan sinkronisasi ZooKeeper;
  • Streaming Apache Spark - Komponen Spark untuk memproses data streaming. Modul Spark Streaming dibangun menggunakan arsitektur micro-batch, di mana aliran data diinterpretasikan sebagai rangkaian paket data kecil yang berkelanjutan. Spark Streaming mengambil data dari berbagai sumber dan menggabungkannya ke dalam paket kecil. Paket baru dibuat secara berkala. Pada awal setiap interval waktu, paket baru dibuat, dan data apa pun yang diterima selama interval tersebut disertakan dalam paket. Di akhir interval, pertumbuhan paket berhenti. Besar kecilnya interval ditentukan oleh parameter yang disebut interval batch;
  • Apache Spark SQL - menggabungkan pemrosesan relasional dengan pemrograman fungsional Spark. Data terstruktur berarti data yang memiliki skema, yaitu sekumpulan bidang untuk semua rekaman. Spark SQL mendukung masukan dari berbagai sumber data terstruktur dan, berkat ketersediaan informasi skema, Spark SQL hanya dapat mengambil bidang rekaman yang diperlukan secara efisien, dan juga menyediakan API DataFrame;
  • AWSRDS adalah database relasional berbasis cloud yang relatif murah, layanan web yang menyederhanakan pengaturan, pengoperasian, dan penskalaan, dan dikelola langsung oleh Amazon.

Menginstal dan menjalankan server Kafka

Sebelum menggunakan Kafka secara langsung, Anda perlu memastikan Anda memiliki Java, karena... JVM digunakan untuk bekerja:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Mari buat pengguna baru untuk bekerja dengan Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Selanjutnya, unduh distribusinya dari situs resmi Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Buka paket arsip yang diunduh:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Langkah selanjutnya adalah opsional. Faktanya adalah pengaturan default tidak memungkinkan Anda untuk sepenuhnya menggunakan semua fitur Apache Kafka. Misalnya, menghapus topik, kategori, grup tempat pesan dapat dipublikasikan. Untuk mengubahnya, mari edit file konfigurasi:

vim ~/kafka/config/server.properties

Tambahkan yang berikut ini ke akhir file:

delete.topic.enable = true

Sebelum memulai server Kafka, Anda perlu memulai server ZooKeeper; kami akan menggunakan skrip tambahan yang disertakan dengan distribusi Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Setelah ZooKeeper berhasil dimulai, luncurkan server Kafka di terminal terpisah:

bin/kafka-server-start.sh config/server.properties

Mari buat topik baru bernama Transaksi:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Mari pastikan bahwa topik dengan jumlah partisi dan replikasi yang diperlukan telah dibuat:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Mari kita lewatkan momen pengujian produsen dan konsumen terhadap topik yang baru dibuat. Detail lebih lanjut tentang bagaimana Anda dapat menguji pengiriman dan penerimaan pesan tertulis di dokumentasi resmi - Kirim beberapa pesan. Baiklah, kita lanjutkan menulis produser dengan Python menggunakan KafkaProducer API.

Tulisan produser

Produser akan menghasilkan data acak - 100 pesan setiap detik. Yang kami maksud dengan data acak adalah kamus yang terdiri dari tiga bidang:

  • Cabang — nama tempat penjualan lembaga kredit;
  • Currency - transaksi mata uang;
  • Jumlah - jumlah transaksi. Jumlah tersebut akan menjadi angka positif jika merupakan pembelian mata uang oleh Bank, dan angka negatif jika merupakan penjualan.

Kode untuk produser terlihat seperti ini:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Selanjutnya dengan menggunakan metode send, kita mengirim pesan ke server, ke topik yang kita butuhkan, dalam format JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Saat menjalankan skrip, kami menerima pesan berikut di terminal:

Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Ini berarti semuanya berjalan sesuai keinginan kita - produser membuat dan mengirimkan pesan ke topik yang kita butuhkan.
Langkah selanjutnya adalah menginstal Spark dan memproses aliran pesan ini.

Menginstal Apache Spark

Apache Spark adalah platform komputasi cluster yang universal dan berkinerja tinggi.

Spark berkinerja lebih baik daripada implementasi populer model MapReduce sekaligus mendukung jenis komputasi yang lebih luas, termasuk kueri interaktif dan pemrosesan aliran. Kecepatan memainkan peran penting saat memproses data dalam jumlah besar, karena kecepatanlah yang memungkinkan Anda bekerja secara interaktif tanpa menghabiskan waktu menunggu beberapa menit atau jam. Salah satu kekuatan terbesar Spark yang membuatnya begitu cepat adalah kemampuannya melakukan penghitungan dalam memori.

Framework ini ditulis dalam Scala, jadi Anda perlu menginstalnya terlebih dahulu:

sudo apt-get install scala

Unduh distribusi Spark dari situs resmi:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Buka kemasan arsip:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Tambahkan jalur ke Spark ke file bash:

vim ~/.bashrc

Tambahkan baris berikut melalui editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Jalankan perintah di bawah ini setelah melakukan perubahan pada bashrc:

source ~/.bashrc

Menerapkan AWS PostgreSQL

Yang tersisa hanyalah menyebarkan database tempat kami akan mengunggah informasi yang diproses dari aliran. Untuk ini kami akan menggunakan layanan AWS RDS.

Buka konsol AWS -> AWS RDS -> Database -> Buat database:
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Pilih PostgreSQL dan klik Berikutnya:
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Karena Contoh ini hanya untuk tujuan pendidikan; kami akan menggunakan server gratis “minimal” (Tingkat Gratis):
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Selanjutnya, kita beri tanda centang pada blok Tingkat Gratis, dan setelah itu kita akan secara otomatis ditawari sebuah instance dari kelas t2.micro - meskipun lemah, ini gratis dan cukup cocok untuk tugas kita:
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Berikutnya adalah hal yang sangat penting: nama instance database, nama pengguna master, dan kata sandinya. Beri nama contohnya: myHabrTest, pengguna utama: habr, kata sandi: habr12345 dan klik tombol Berikutnya:
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Pada halaman berikutnya terdapat parameter yang bertanggung jawab atas aksesibilitas server database kami dari luar (Aksesibilitas publik) dan ketersediaan port:

Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Mari buat pengaturan baru untuk grup keamanan VPC, yang akan mengizinkan akses eksternal ke server database kita melalui port 5432 (PostgreSQL).
Mari kita pergi ke konsol AWS di jendela browser terpisah ke bagian Dasbor VPC -> Grup Keamanan -> Buat grup keamanan:
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Kami menetapkan nama untuk grup Keamanan - PostgreSQL, deskripsinya, tunjukkan VPC mana yang harus dikaitkan dengan grup ini dan klik tombol Buat:
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Isikan Inbound rule for port 5432 untuk grup yang baru dibuat, seperti terlihat pada gambar di bawah ini. Anda tidak dapat menentukan port secara manual, tetapi pilih PostgreSQL dari daftar drop-down Type.

Sebenarnya, nilai ::/0 berarti ketersediaan lalu lintas masuk ke server dari seluruh dunia, yang secara kanonik tidak sepenuhnya benar, tetapi untuk menganalisis contohnya, mari kita gunakan pendekatan ini:
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Kami kembali ke halaman browser, di mana kami membuka "Konfigurasi pengaturan lanjutan" dan pilih di bagian Grup keamanan VPC -> Pilih grup keamanan VPC yang ada -> PostgreSQL:
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Selanjutnya pada opsi Database -> Nama database -> atur nama - habrDB.

Kami dapat membiarkan parameter lainnya, kecuali menonaktifkan pencadangan (periode penyimpanan cadangan - 0 hari), pemantauan, dan Wawasan Kinerja, secara default. Klik pada tombol Buat database:
Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Penangan benang

Tahap terakhir adalah pengembangan pekerjaan Spark, yang akan memproses data baru yang berasal dari Kafka setiap dua detik dan memasukkan hasilnya ke dalam database.

Seperti disebutkan di atas, pos pemeriksaan adalah mekanisme inti di SparkStreaming yang harus dikonfigurasi untuk memastikan toleransi kesalahan. Kami akan menggunakan pos pemeriksaan dan, jika prosedur gagal, modul Spark Streaming hanya perlu kembali ke pos pemeriksaan terakhir dan melanjutkan penghitungan darinya untuk memulihkan data yang hilang.

Checkpointing dapat diaktifkan dengan mengatur direktori pada sistem file yang toleran terhadap kesalahan dan dapat diandalkan (seperti HDFS, S3, dll.) di mana informasi checkpoint akan disimpan. Ini dilakukan dengan menggunakan, misalnya:

streamingContext.checkpoint(checkpointDirectory)

Dalam contoh kita, kita akan menggunakan pendekatan berikut, yaitu jika checkpointDirectory ada, maka konteksnya akan dibuat ulang dari data checkpoint. Jika direktori tidak ada (yaitu dieksekusi untuk pertama kalinya), maka functionToCreateContext dipanggil untuk membuat konteks baru dan mengonfigurasi DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Kami membuat objek DirectStream untuk terhubung ke topik “transaksi” menggunakan metode createDirectStream dari perpustakaan KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Mengurai data masuk dalam format JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Menggunakan Spark SQL, kami melakukan pengelompokan sederhana dan menampilkan hasilnya di konsol:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Mendapatkan teks kueri dan menjalankannya melalui Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Dan kemudian kami menyimpan data agregat yang dihasilkan ke dalam tabel di AWS RDS. Untuk menyimpan hasil agregasi ke tabel database, kita akan menggunakan metode tulis objek DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Beberapa kata tentang menyiapkan koneksi ke AWS RDS. Kami membuat pengguna dan kata sandi untuk itu pada langkah “Menerapkan AWS PostgreSQL”. Anda harus menggunakan Endpoint sebagai url server database, yang ditampilkan di bagian Konektivitas & keamanan:

Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Untuk menghubungkan Spark dan Kafka dengan benar, Anda harus menjalankan pekerjaan melalui smark-submit menggunakan artefak percikan-streaming-kafka-0-8_2.11. Selain itu, kami juga akan menggunakan artefak untuk berinteraksi dengan database PostgreSQL; kami akan mentransfernya melalui --packages.

Untuk fleksibilitas skrip, kami juga akan menyertakan parameter input nama server pesan dan topik dari mana kami ingin menerima data.

Jadi, inilah waktunya untuk meluncurkan dan memeriksa fungsionalitas sistem:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Semuanya berhasil! Seperti yang Anda lihat pada gambar di bawah, saat aplikasi berjalan, hasil agregasi baru dikeluarkan setiap 2 detik, karena kita menyetel interval batching menjadi 2 detik saat membuat objek StreamingContext:

Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Selanjutnya, kita membuat query sederhana ke database untuk memeriksa keberadaan record di tabel aliran_transaksi:

Apache Kafka dan Pemrosesan Data Streaming dengan Spark Streaming

Kesimpulan

Artikel ini membahas contoh pemrosesan aliran informasi menggunakan Spark Streaming bersama dengan Apache Kafka dan PostgreSQL. Dengan pertumbuhan data dari berbagai sumber, sulit untuk melebih-lebihkan nilai praktis Spark Streaming untuk membuat aplikasi streaming dan real-time.

Anda dapat menemukan kode sumber lengkap di repositori saya di GitHub.

Saya senang membahas artikel ini, saya menantikan komentar Anda, dan saya juga mengharapkan kritik yang membangun dari semua pembaca yang peduli.

Saya berharap Anda sukses!

Ps. Awalnya direncanakan menggunakan database lokal PostgreSQL, namun karena kecintaan saya pada AWS, saya memutuskan untuk memindahkan database tersebut ke cloud. Pada artikel berikutnya tentang topik ini, saya akan menunjukkan cara mengimplementasikan seluruh sistem yang dijelaskan di atas di AWS menggunakan AWS Kinesis dan AWS EMR. Ikuti beritanya!

Sumber: www.habr.com

Tambah komentar