Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Halo, Habr! Dinten ieu kami bakal ngawangun sistem anu bakal ngolah aliran pesen Apache Kafka nganggo Spark Streaming sareng nyerat hasil pamrosésan kana database awan AWS RDS.

Hayu urang ngabayangkeun yén lembaga kiridit tangtu nangtukeun kami tugas ngolah transaksi asup "dina laleur" sakuliah sakabéh cabang na. Ieu tiasa dilakukeun pikeun tujuan gancang ngitung posisi mata uang kabuka pikeun perbendaharaan, wates atanapi hasil kauangan pikeun transaksi, jsb.

Kumaha nerapkeun hal ieu tanpa pamakéan magic jeung mantra magic - baca handapeun cut! indit!

Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming
(Sumber gambar)

perkenalan

Tangtosna, ngolah sajumlah ageung data sacara real waktos nyayogikeun kasempetan anu ageung pikeun dianggo dina sistem modern. Salah sahiji kombinasi anu paling populér pikeun ieu nyaéta tandem Apache Kafka sareng Spark Streaming, dimana Kafka nyiptakeun aliran pakét pesen anu asup, sareng Spark Streaming ngolah pakét ieu dina interval waktu anu ditangtukeun.

Pikeun ningkatkeun kasabaran kasalahan tina aplikasi, kami bakal nganggo checkpoints. Kalayan mékanisme ieu, nalika mesin Spark Streaming kedah pulih data anu leungit, éta ngan ukur kedah uih deui ka tempat pamariksaan anu terakhir sareng neraskeun itungan ti dinya.

Arsitéktur sistem dimekarkeun

Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Komponén dipaké:

  • Apache Kafka nyaéta sistem olahtalatah nyebarkeun-langganan anu disebarkeun. Cocog pikeun konsumsi pesen offline sareng online. Pikeun nyegah leungitna data, pesen Kafka disimpen dina disk sareng diulang dina kluster. Sistem Kafka diwangun dina luhureun layanan sinkronisasi ZooKeeper;
  • Apache Spark Streaming - Komponén Spark pikeun ngolah data streaming. Modul Spark Streaming diwangun ngagunakeun arsitéktur micro-batch, dimana aliran data diinterpretasi salaku runtuyan kontinyu tina pakét data leutik. Spark Streaming nyandak data tina sumber anu béda-béda sareng ngagabungkeun kana bungkusan leutik. bungkusan anyar dijieun dina interval nu sarua. Dina awal unggal interval waktu, hiji pakét anyar dijieun, sarta sagala data narima salila interval ieu kaasup kana pakét. Dina ahir interval, tumuwuhna pakét eureun. Ukuran interval ditangtukeun ku parameter disebut interval bets;
  • Apache Spark SQL - ngagabungkeun processing relational kalawan Spark programming hanca. Data terstruktur hartina data anu boga skéma, nyaéta, hiji set widang pikeun sakabéh rékaman. Spark SQL ngarojong input ti rupa-rupa sumber data terstruktur na, hatur nuhun kana kasadiaan informasi skéma, éta éfisién bisa meunangkeun ukur widang diperlukeun rékaman, sarta nyadiakeun API DataFrame;
  • AWS RDS nyaéta database relational dumasar-awan rélatif murah, jasa web nu simplifies setelan, operasi sarta skala, sarta diadministrasi langsung ku Amazon.

Masang sareng ngajalankeun server Kafka

Sateuacan nganggo Kafka langsung, anjeun kedah mastikeun yén anjeun gaduh Java, sabab ... JVM dianggo pikeun damel:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Hayu urang ngadamel pangguna énggal pikeun damel sareng Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Salajengna, unduh distribusi tina situs wéb Apache Kafka resmi:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Buka bungkus arsip anu diunduh:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Lengkah saterusna nyaeta pilihan. Kanyataan yén setélan standar teu ngidinan Anjeun pikeun pinuh ngagunakeun sagala kamampuhan tina Apache Kafka. Contona, pupus topik, kategori, grup nu pesen bisa diterbitkeun. Pikeun ngarobah ieu, hayu urang ngédit file konfigurasi:

vim ~/kafka/config/server.properties

Tambahkeun di handap ieu kana tungtung file:

delete.topic.enable = true

Sateuacan ngamimitian server Kafka, anjeun kedah ngamimitian server ZooKeeper; kami bakal nganggo skrip bantu anu aya dina distribusi Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Saatos ZooKeeper parantos suksés, jalankeun server Kafka dina terminal anu misah:

bin/kafka-server-start.sh config/server.properties

Hayu urang nyieun topik anyar disebut Transaksi:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Pastikeun yén topik kalayan jumlah partisi sareng réplikasi anu diperyogikeun parantos didamel:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Hayu urang sono momen nguji produsén sareng konsumen pikeun topik anu nembé diciptakeun. Rincian langkung seueur ngeunaan kumaha anjeun tiasa nguji ngirim sareng nampi pesen ditulis dina dokuméntasi resmi - Kirim sababaraha pesen. Nya, urang teraskeun nyerat produser dina Python nganggo API KafkaProducer.

Tulisan produser

Produser bakal ngahasilkeun data acak - 100 pesen unggal detik. Ku data acak kami hartosna kamus anu diwangun ku tilu widang:

  • cabang - ngaran titik diobral lembaga kiridit;
  • duit nu dipake di sahiji nagara - mata uang transaksi;
  • jumlah - jumlah transaksi. Jumlah bakal jadi angka positif lamun eta mangrupakeun beuli mata uang ku Bank, sarta angka négatip lamun diobral a.

Kodeu pikeun produsén sapertos kieu:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Salajengna, nganggo metode kirim, urang ngirim pesen ka server, kana topik anu urang peryogikeun, dina format JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Nalika ngajalankeun skrip, kami nampi pesen di handap ieu dina terminal:

Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Ieu ngandung harti yén sagalana jalan sakumaha urang hayang - produser ngahasilkeun sarta ngirimkeun pesen ka topik urang butuh.
Lengkah saterusna nyaéta masang Spark sareng ngolah aliran pesen ieu.

Masang Apache Spark

Apache bitu nyaéta platform komputasi klaster universal sareng berkinerja tinggi.

Spark ngalaksanakeun langkung saé tibatan palaksanaan populér tina modél MapReduce bari ngadukung rupa-rupa jinis komputasi, kalebet patarosan interaktif sareng pamrosésan aliran. Kagancangan maénkeun peran anu penting nalika ngolah data anu ageung, sabab éta kacepetan anu ngamungkinkeun anjeun damel sacara interaktif tanpa nyéépkeun menit atanapi jam ngantosan. Salah sahiji kaunggulan pangbadagna Spark anu ngajadikeun éta gancang nyaéta kamampuhna pikeun ngalakukeun itungan dina mémori.

Kerangka ieu ditulis dina Scala, janten anjeun kedah pasang heula:

sudo apt-get install scala

Unduh distribusi Spark tina situs wéb resmi:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Buka bungkusan arsip:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Tambahkeun jalur ka Spark kana file bash:

vim ~/.bashrc

Tambahkeun garis handap ngaliwatan éditor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Jalankeun paréntah di handap saatos parobihan kana bashrc:

source ~/.bashrc

Nerapkeun AWS PostgreSQL

Sadaya anu tetep nyaéta nyebarkeun pangkalan data dimana kami bakal unggah inpormasi anu diolah tina aliran. Pikeun ieu kami bakal nganggo jasa AWS RDS.

Buka konsol AWS -> AWS RDS -> Database -> Jieun database:
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Pilih PostgreSQL teras klik Salajengna:
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Sabab Conto ieu kanggo tujuan pendidikan wungkul; kami bakal nganggo server gratis "minimal" (Tier Gratis):
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Salajengna, urang nempatkeun keletik dina blok Free Tier, sarta sanggeus éta urang bakal otomatis ditawarkeun hiji conto kelas t2.micro - sanajan lemah, éta bébas tur cukup cocog pikeun tugas urang:
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Salajengna sumping hal anu penting pisan: nami conto database, nami pangguna master sareng kecap konci na. Hayu urang ngaranan conto: myHabrTest, master pamaké: warta, kecap akses: habr12345 sareng klik tombol Next:
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Dina kaca hareup aya parameter jawab aksés ka server database urang ti luar (aksesibilitas publik) jeung kasadiaan port:

Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Hayu urang nyieun setelan anyar pikeun grup kaamanan VPC, nu bakal ngidinan aksés éksternal ka server database urang via port 5432 (PostgreSQL).
Hayu urang ka konsol AWS dina jandela browser anu misah ka VPC Dashboard -> Grup Kaamanan -> Jieun bagian grup kaamanan:
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Kami nyetél nami pikeun grup Kaamanan - PostgreSQL, pedaran, nunjukkeun VPC mana grup ieu kedah dikaitkeun sareng klik tombol Jieun:
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Eusian aturan Inbound pikeun port 5432 pikeun grup karek dijieun, ditémbongkeun saperti dina gambar di handap ieu. Anjeun teu tiasa netepkeun port sacara manual, tapi pilih PostgreSQL tina daptar turun-handap Type.

Tegesna, nilai ::/0 hartina kasadiaan lalulintas asup ka server ti sakuliah dunya, nu canonically teu sagemblengna bener, tapi pikeun nganalisis conto, hayu urang ngidinan diri ngagunakeun pendekatan ieu:
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Kami uih deui ka halaman browser, dimana kami gaduh "Konpigurasikeun setélan canggih" kabuka sareng pilih dina bagian grup kaamanan VPC -> Pilih grup kaamanan VPC anu tos aya -> PostgreSQL:
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Salajengna, dina pilihan Database -> Ngaran database -> setel nami - habrDB.

Urang bisa ninggalkeun parameter sésana, iwal nganonaktipkeun cadangan (periode ingetan cadangan - 0 poé), monitoring sarta Performance Insights, sacara standar. Pencét kana tombol Jieun database:
Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Panangan benang

Tahap ahir bakal ngembangkeun pakasaban Spark, anu bakal ngolah data anyar datang ti Kafka unggal dua detik sarta ngasupkeun hasilna kana database.

Sakumaha anu kacatet di luhur, checkpoints mangrupikeun mékanisme inti dina SparkStreaming anu kedah dikonpigurasi pikeun mastikeun kasabaran kasalahan. Kami bakal nganggo checkpoints sareng, upami prosedurna gagal, modul Spark Streaming ngan ukur kedah uih deui ka tempat pamariksaan anu terakhir sareng neraskeun itungan ti dinya pikeun pulih data anu leungit.

Checkpointing tiasa diaktipkeun ku netepkeun diréktori dina sistem file anu toleran, dipercaya (sapertos HDFS, S3, jsb) dimana inpormasi titik pamariksaan bakal disimpen. Hal ieu dilakukeun ngagunakeun, contona:

streamingContext.checkpoint(checkpointDirectory)

Dina conto urang, urang bakal ngagunakeun pendekatan di handap ieu, nyaéta, lamun checkpointDirectory aya, lajeng konteks bakal recreated tina data checkpoint. Upami diréktorina teu aya (nyaéta dieksekusi pikeun kahiji kalina), maka functionToCreateContext disebut pikeun nyiptakeun kontéks énggal sareng ngonpigurasikeun DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Kami nyiptakeun objek DirectStream pikeun nyambung ka topik "transaksi" nganggo metode createDirectStream tina perpustakaan KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsing data asup dina format JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Ngagunakeun Spark SQL, urang ngalakukeun grup basajan tur mintonkeun hasil dina konsol nu:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Meunangkeun téks query tur ngajalankeun eta ngaliwatan Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Teras we simpen data aggregated hasilna kana tabel di AWS RDS. Pikeun nyimpen hasil aggregation kana tabel database, urang bakal ngagunakeun métode nulis objék DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Sababaraha kecap ngeunaan nyetel sambungan ka AWS RDS. Kami nyiptakeun pangguna sareng kecap konci pikeun éta dina lengkah "Deploying AWS PostgreSQL". Anjeun kedah nganggo Endpoint salaku url pangladén database, anu dipidangkeun dina bagian Konektipitas & kaamanan:

Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Pikeun leres nyambungkeun Spark sareng Kafka, anjeun kedah ngajalankeun padamelan ngalangkungan smark-submit nganggo artefak. spark-streaming-kafka-0-8_2.11. Salaku tambahan, urang ogé bakal ngagunakeun artefak pikeun berinteraksi sareng database PostgreSQL; urang bakal mindahkeun aranjeunna via --packages.

Pikeun kalenturan naskah, kami ogé bakal ngalebetkeun salaku parameter input nami server pesen sareng topik anu kami hoyong nampi data.

Janten, waktosna pikeun ngaluncurkeun sareng pariksa pungsionalitas sistem:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Sagalana digawé kaluar! Sakumaha anjeun tiasa tingali dina gambar di handap ieu, nalika aplikasi dijalankeun, hasil agrégasi anyar kaluaran unggal 2 detik, sabab kami nyetél interval batching ka 2 detik nalika kami nyiptakeun objék StreamingContext:

Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

Salajengna, urang ngadamel query basajan ka database pikeun pariksa ayana rékaman dina tabél transaction_flow:

Apache Kafka sareng Ngolah Data Streaming sareng Spark Streaming

kacindekan

Tulisan ieu ningali conto pamrosésan aliran inpormasi nganggo Spark Streaming sareng Apache Kafka sareng PostgreSQL. Kalayan tumuwuhna data ti sagala rupa sumber, hese overestimate nilai praktis Spark Streaming pikeun nyieun streaming sarta aplikasi real-time.

Anjeun tiasa mendakan kode sumber lengkep dina gudang kuring di GitHub.

Kami senang ngabahas artikel ieu, Kuring ngarepkeun komentar anjeun, sarta kuring miharep ogé kritik konstruktif ti sakabeh pamiarsa paduli.

Kuring miharep anjeun sukses!

PS. Mimitina ieu rencanana ngagunakeun database PostgreSQL lokal, tapi tinangtu cinta kuring pikeun AWS, Kuring mutuskeun pikeun mindahkeun database ka awan. Dina artikel salajengna ngeunaan topik ieu, kuring bakal nunjukkeun kumaha nerapkeun sadayana sistem anu dijelaskeun di luhur dina AWS nganggo AWS Kinesis sareng AWS EMR. Turutan warta!

sumber: www.habr.com

Tambahkeun komentar