Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Hello, Habr! Hari ini kami akan membina sistem yang akan memproses strim mesej Apache Kafka menggunakan Spark Streaming dan menulis hasil pemprosesan ke pangkalan data awan AWS RDS.

Bayangkan bahawa institusi kredit tertentu menetapkan tugas kepada kita untuk memproses transaksi masuk "dengan cepat" di semua cawangannya. Ini boleh dilakukan untuk tujuan pengiraan segera kedudukan mata wang terbuka untuk perbendaharaan, had atau keputusan kewangan untuk urus niaga, dsb.

Bagaimana untuk melaksanakan kes ini tanpa menggunakan sihir dan mantra sihir - baca di bawah potongan! Pergi!

Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark
(Sumber imej)

Pengenalan

Sudah tentu, memproses sejumlah besar data dalam masa nyata memberikan peluang yang luas untuk digunakan dalam sistem moden. Salah satu gabungan yang paling popular untuk ini ialah gabungan Apache Kafka dan Spark Streaming, di mana Kafka mencipta aliran paket mesej masuk dan Spark Streaming memproses paket ini pada selang masa tertentu.

Untuk meningkatkan toleransi kesalahan aplikasi, kami akan menggunakan pusat pemeriksaan. Dengan mekanisme ini, apabila enjin Spark Streaming perlu memulihkan data yang hilang, ia hanya perlu kembali ke pusat pemeriksaan terakhir dan menyambung pengiraan dari sana.

Seni bina sistem yang dibangunkan

Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Komponen yang digunakan:

  • Apache Kafka ialah sistem pemesejan terbitan-langganan yang diedarkan. Sesuai untuk penggunaan mesej luar talian dan dalam talian. Untuk mengelakkan kehilangan data, mesej Kafka disimpan pada cakera dan direplikasi dalam kelompok. Sistem Kafka dibina di atas perkhidmatan penyegerakan ZooKeeper;
  • Penstriman Apache Spark - Komponen Spark untuk memproses data penstriman. Modul Spark Streaming dibina menggunakan seni bina kelompok mikro, di mana aliran data ditafsirkan sebagai urutan berterusan paket data kecil. Spark Streaming mengambil data daripada sumber yang berbeza dan menggabungkannya ke dalam pakej kecil. Pakej baharu dibuat pada selang masa yang tetap. Pada permulaan setiap selang masa, paket baharu dicipta dan sebarang data yang diterima semasa selang itu dimasukkan ke dalam paket. Pada penghujung selang, pertumbuhan paket berhenti. Saiz selang ditentukan oleh parameter yang dipanggil selang kelompok;
  • Apache Spark SQL - menggabungkan pemprosesan hubungan dengan pengaturcaraan berfungsi Spark. Data berstruktur bermaksud data yang mempunyai skema, iaitu satu set medan untuk semua rekod. Spark SQL menyokong input daripada pelbagai sumber data berstruktur dan, terima kasih kepada ketersediaan maklumat skema, ia boleh mendapatkan semula dengan cekap hanya medan rekod yang diperlukan, dan juga menyediakan API DataFrame;
  • AWS RDS ialah pangkalan data hubungan berasaskan awan yang agak murah, perkhidmatan web yang memudahkan persediaan, operasi dan penskalaan, dan ditadbir secara langsung oleh Amazon.

Memasang dan menjalankan pelayan Kafka

Sebelum menggunakan Kafka secara langsung, anda perlu memastikan anda mempunyai Java, kerana... JVM digunakan untuk kerja:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Mari buat pengguna baharu untuk bekerja dengan Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Seterusnya, muat turun pengedaran dari laman web rasmi Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Buka pek arkib yang dimuat turun:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Langkah seterusnya adalah pilihan. Hakikatnya ialah tetapan lalai tidak membenarkan anda menggunakan sepenuhnya semua ciri Apache Kafka. Sebagai contoh, padamkan topik, kategori, kumpulan yang mesejnya boleh diterbitkan. Untuk menukar ini, mari edit fail konfigurasi:

vim ~/kafka/config/server.properties

Tambahkan yang berikut pada penghujung fail:

delete.topic.enable = true

Sebelum memulakan pelayan Kafka, anda perlu memulakan pelayan ZooKeeper; kami akan menggunakan skrip tambahan yang disertakan dengan pengedaran Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Selepas ZooKeeper berjaya dimulakan, lancarkan pelayan Kafka dalam terminal berasingan:

bin/kafka-server-start.sh config/server.properties

Mari buat topik baharu yang dipanggil Transaksi:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Mari pastikan bahawa topik dengan bilangan partition dan replikasi yang diperlukan telah dibuat:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Mari terlepas detik-detik menguji pengeluar dan pengguna untuk topik yang baru dibuat. Butiran lanjut tentang cara anda boleh menguji penghantaran dan penerimaan mesej ditulis dalam dokumentasi rasmi - Hantar beberapa mesej. Baiklah, kami terus menulis pengeluar dalam Python menggunakan API KafkaProducer.

Penulisan penerbit

Pengeluar akan menjana data rawak - 100 mesej setiap saat. Dengan data rawak yang kami maksudkan kamus yang terdiri daripada tiga medan:

  • Cawangan β€” nama tempat jualan institusi kredit;
  • mata wang β€” mata wang urus niaga;
  • jumlah - jumlah transaksi. Jumlah tersebut akan menjadi nombor positif jika ia merupakan pembelian mata wang oleh Bank, dan nombor negatif jika ia adalah jualan.

Kod untuk pengeluar kelihatan seperti ini:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Seterusnya, menggunakan kaedah hantar, kami menghantar mesej ke pelayan, ke topik yang kami perlukan, dalam format JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Semasa menjalankan skrip, kami menerima mesej berikut dalam terminal:

Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Ini bermakna semuanya berfungsi seperti yang kita mahu - pengeluar menjana dan menghantar mesej kepada topik yang kita perlukan.
Langkah seterusnya ialah memasang Spark dan memproses aliran mesej ini.

Memasang Apache Spark

Apache Spark ialah platform pengkomputeran kluster sejagat dan berprestasi tinggi.

Spark berprestasi lebih baik daripada pelaksanaan popular model MapReduce sambil menyokong pelbagai jenis pengiraan yang lebih luas, termasuk pertanyaan interaktif dan pemprosesan strim. Kelajuan memainkan peranan penting apabila memproses sejumlah besar data, kerana ia adalah kelajuan yang membolehkan anda bekerja secara interaktif tanpa menghabiskan minit atau jam menunggu. Salah satu perkara terbesar yang menjadikan Spark begitu pantas ialah keupayaannya untuk melakukan pengiraan dalam memori.

Rangka kerja ini ditulis dalam Scala, jadi anda perlu memasangnya terlebih dahulu:

sudo apt-get install scala

Muat turun pengedaran Spark dari laman web rasmi:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Buka pembungkusan arkib:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Tambahkan laluan ke Spark ke fail bash:

vim ~/.bashrc

Tambah baris berikut melalui editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Jalankan arahan di bawah selepas membuat perubahan pada bashrc:

source ~/.bashrc

Menggunakan AWS PostgreSQL

Apa yang tinggal ialah menggunakan pangkalan data di mana kami akan memuat naik maklumat yang diproses daripada aliran. Untuk ini kami akan menggunakan perkhidmatan AWS RDS.

Pergi ke konsol AWS -> AWS RDS -> Pangkalan Data -> Buat pangkalan data:
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Pilih PostgreSQL dan klik Seterusnya:
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Kerana Contoh ini adalah untuk tujuan pendidikan sahaja; kami akan menggunakan pelayan percuma "sekurang-kurangnya" (Peringkat Percuma):
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Seterusnya, kami meletakkan tanda pada blok Tahap Percuma, dan selepas itu kami akan ditawarkan secara automatik contoh kelas t2.micro - walaupun lemah, ia adalah percuma dan agak sesuai untuk tugas kami:
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Seterusnya datang perkara yang sangat penting: nama contoh pangkalan data, nama pengguna induk dan kata laluannya. Mari namakan contoh: myHabrTest, pengguna induk: habr, kata laluan: habr12345 dan klik pada butang Seterusnya:
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Pada halaman seterusnya terdapat parameter yang bertanggungjawab untuk kebolehcapaian pelayan pangkalan data kami dari luar (Kebolehcapaian awam) dan ketersediaan port:

Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Mari buat tetapan baharu untuk kumpulan keselamatan VPC, yang akan membenarkan akses luaran kepada pelayan pangkalan data kami melalui port 5432 (PostgreSQL).
Mari pergi ke konsol AWS dalam tetingkap penyemak imbas yang berasingan ke Papan Pemuka VPC -> Kumpulan Keselamatan -> Buat bahagian kumpulan keselamatan:
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Kami menetapkan nama untuk kumpulan Keselamatan - PostgreSQL, perihalan, menunjukkan VPC kumpulan ini harus dikaitkan dan klik butang Cipta:
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Isikan peraturan Masuk untuk port 5432 untuk kumpulan yang baru dibuat, seperti yang ditunjukkan dalam gambar di bawah. Anda tidak boleh menentukan port secara manual, tetapi pilih PostgreSQL daripada senarai juntai bawah Jenis.

Tegasnya, nilai ::/0 bermaksud ketersediaan trafik masuk ke pelayan dari seluruh dunia, yang secara kanonik tidak sepenuhnya benar, tetapi untuk menganalisis contoh, mari membenarkan diri kita menggunakan pendekatan ini:
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Kami kembali ke halaman penyemak imbas, di mana kami membuka "Konfigurasikan tetapan lanjutan" dan pilih dalam bahagian kumpulan keselamatan VPC -> Pilih kumpulan keselamatan VPC sedia ada -> PostgreSQL:
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Seterusnya, dalam pilihan Pangkalan Data -> Nama pangkalan data -> tetapkan nama - habrDB.

Kami boleh meninggalkan parameter yang selebihnya, dengan pengecualian melumpuhkan sandaran (tempoh pengekalan sandaran - 0 hari), pemantauan dan Cerapan Prestasi, secara lalai. Klik pada butang Buat pangkalan data:
Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Pengendali benang

Peringkat terakhir ialah pembangunan kerja Spark, yang akan memproses data baharu yang datang daripada Kafka setiap dua saat dan memasukkan hasilnya ke dalam pangkalan data.

Seperti yang dinyatakan di atas, pusat pemeriksaan ialah mekanisme teras dalam SparkStreaming yang mesti dikonfigurasikan untuk memastikan toleransi kesalahan. Kami akan menggunakan pusat pemeriksaan dan, jika prosedur gagal, modul Spark Streaming hanya perlu kembali ke pusat pemeriksaan terakhir dan menyambung semula pengiraan daripadanya untuk memulihkan data yang hilang.

Pemeriksaan boleh didayakan dengan menetapkan direktori pada sistem fail yang tahan terhadap kerosakan dan boleh dipercayai (seperti HDFS, S3, dll.) di mana maklumat pusat pemeriksaan akan disimpan. Ini dilakukan menggunakan, sebagai contoh:

streamingContext.checkpoint(checkpointDirectory)

Dalam contoh kami, kami akan menggunakan pendekatan berikut, iaitu, jika checkpointDirectory wujud, maka konteks akan dicipta semula daripada data checkpoint. Jika direktori tidak wujud (iaitu dilaksanakan buat kali pertama), maka functionToCreateContext dipanggil untuk mencipta konteks baharu dan mengkonfigurasi DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Kami mencipta objek DirectStream untuk menyambung ke topik "transaksi" menggunakan kaedah createDirectStream pustaka KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Menghuraikan data masuk dalam format JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Menggunakan Spark SQL, kami melakukan pengumpulan mudah dan memaparkan hasilnya dalam konsol:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Mendapatkan teks pertanyaan dan menjalankannya melalui Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Dan kemudian kami menyimpan data agregat yang terhasil ke dalam jadual dalam AWS RDS. Untuk menyimpan hasil pengagregatan ke jadual pangkalan data, kami akan menggunakan kaedah tulis objek DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Beberapa perkataan tentang menyediakan sambungan ke AWS RDS. Kami mencipta pengguna dan kata laluan untuknya pada langkah "Memasang AWS PostgreSQL". Titik akhir harus digunakan sebagai url pelayan pangkalan data, yang dipaparkan dalam bahagian Kesambungan & keselamatan:

Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Untuk menyambungkan Spark dan Kafka dengan betul, anda harus menjalankan tugas melalui smark-submit menggunakan artifak spark-streaming-kafka-0-8_2.11. Selain itu, kami juga akan menggunakan artifak untuk berinteraksi dengan pangkalan data PostgreSQL; kami akan memindahkannya melalui --packages.

Untuk fleksibiliti skrip, kami juga akan memasukkan sebagai parameter input nama pelayan mesej dan topik dari mana kami ingin menerima data.

Jadi, sudah tiba masanya untuk melancarkan dan menyemak kefungsian sistem:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Semuanya berjaya! Seperti yang anda lihat dalam gambar di bawah, semasa aplikasi sedang berjalan, hasil pengagregatan baharu dikeluarkan setiap 2 saat, kerana kami menetapkan selang batching kepada 2 saat apabila kami mencipta objek StreamingContext:

Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Seterusnya, kami membuat pertanyaan ringkas kepada pangkalan data untuk menyemak kehadiran rekod dalam jadual transaction_flow:

Apache Kafka dan Pemprosesan Data Penstriman dengan Penstriman Spark

Kesimpulan

Artikel ini melihat contoh pemprosesan strim maklumat menggunakan Spark Streaming bersama dengan Apache Kafka dan PostgreSQL. Dengan pertumbuhan data daripada pelbagai sumber, sukar untuk menilai terlalu tinggi nilai praktikal Spark Streaming untuk mencipta aplikasi penstriman dan masa nyata.

Anda boleh mencari kod sumber penuh dalam repositori saya di GitHub.

Saya gembira untuk membincangkan artikel ini, saya mengharapkan komen anda, dan saya juga mengharapkan kritikan yang membina daripada semua pembaca yang prihatin.

Saya berharap kejayaan anda!

Ps. Pada mulanya ia dirancang untuk menggunakan pangkalan data PostgreSQL tempatan, tetapi memandangkan kecintaan saya terhadap AWS, saya memutuskan untuk memindahkan pangkalan data ke awan. Dalam artikel seterusnya mengenai topik ini, saya akan menunjukkan cara untuk melaksanakan keseluruhan sistem yang diterangkan di atas dalam AWS menggunakan AWS Kinesis dan AWS EMR. Ikuti berita!

Sumber: www.habr.com

Tambah komen