Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Hello, Habr! Ngayon, bubuo kami ng system na magpoproseso ng mga stream ng mensahe ng Apache Kafka gamit ang Spark Streaming at isusulat ang mga resulta ng pagproseso sa cloud database ng AWS RDS.

Isipin natin na ang isang partikular na institusyon ng kredito ay nagtatakda sa atin ng gawain ng pagproseso ng mga papasok na transaksyon "on the fly" sa lahat ng sangay nito. Magagawa ito para sa layunin ng agarang pagkalkula ng isang bukas na posisyon ng pera para sa treasury, mga limitasyon o mga resulta sa pananalapi para sa mga transaksyon, atbp.

Paano ipatupad ang kasong ito nang hindi gumagamit ng magic at magic spells - basahin sa ilalim ng hiwa! Go!

Apache Kafka at Streaming Data Processing gamit ang Spark Streaming
(Pinagmulan ng larawan)

Pagpapakilala

Siyempre, ang pagproseso ng malaking halaga ng data sa real time ay nagbibigay ng sapat na pagkakataon para magamit sa mga modernong system. Isa sa mga pinakasikat na kumbinasyon para dito ay ang tandem ng Apache Kafka at Spark Streaming, kung saan gumagawa ang Kafka ng stream ng mga papasok na packet ng mensahe, at pinoproseso ng Spark Streaming ang mga packet na ito sa isang partikular na agwat ng oras.

Para mapataas ang fault tolerance ng application, gagamit kami ng mga checkpoint. Sa mekanismong ito, kapag kailangan ng Spark Streaming engine na mabawi ang nawalang data, kailangan lang nitong bumalik sa huling checkpoint at ipagpatuloy ang mga kalkulasyon mula doon.

Arkitektura ng binuong sistema

Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Mga bahaging ginamit:

  • Apache Kafka ay isang distributed publish-subscribe messaging system. Angkop para sa parehong offline at online na pagkonsumo ng mensahe. Upang maiwasan ang pagkawala ng data, ang mga mensahe ng Kafka ay iniimbak sa disk at ginagaya sa loob ng cluster. Ang sistema ng Kafka ay binuo sa ibabaw ng serbisyo ng pag-synchronize ng ZooKeeper;
  • Apache Spark Streaming - Spark component para sa pagproseso ng streaming data. Ang module ng Spark Streaming ay binuo gamit ang isang micro-batch architecture, kung saan ang stream ng data ay binibigyang-kahulugan bilang isang tuluy-tuloy na pagkakasunud-sunod ng maliliit na data packet. Ang Spark Streaming ay kumukuha ng data mula sa iba't ibang pinagmulan at pinagsasama ito sa maliliit na pakete. Ang mga bagong pakete ay nilikha sa mga regular na pagitan. Sa simula ng bawat agwat ng oras, isang bagong packet ang nilikha, at anumang data na natanggap sa pagitan ng agwat na iyon ay kasama sa packet. Sa pagtatapos ng agwat, hihinto ang paglaki ng packet. Ang laki ng pagitan ay tinutukoy ng isang parameter na tinatawag na batch interval;
  • Apache Spark SQL - pinagsasama ang relational processing sa Spark functional programming. Ang structured data ay nangangahulugan ng data na may schema, iyon ay, isang set ng field para sa lahat ng record. Sinusuportahan ng Spark SQL ang pag-input mula sa iba't ibang structured data source at, salamat sa pagkakaroon ng impormasyon ng schema, maaari lamang nitong makuha ang mga kinakailangang field ng record nang mahusay, at nagbibigay din ng mga DataFrame API;
  • AWS RDS ay isang medyo murang cloud-based na relational database, serbisyo sa web na pinapasimple ang setup, operasyon at scaling, at direktang pinangangasiwaan ng Amazon.

Pag-install at pagpapatakbo ng Kafka server

Bago gamitin ang Kafka nang direkta, kailangan mong tiyakin na mayroon kang Java, dahil... Ginagamit ang JVM para sa trabaho:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Gumawa tayo ng bagong user para magtrabaho kasama si Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Susunod, i-download ang pamamahagi mula sa opisyal na website ng Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

I-unpack ang na-download na archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Ang susunod na hakbang ay opsyonal. Ang katotohanan ay ang mga default na setting ay hindi nagpapahintulot sa iyo na ganap na gamitin ang lahat ng mga tampok ng Apache Kafka. Halimbawa, tanggalin ang isang paksa, kategorya, pangkat kung saan maaaring mai-publish ang mga mensahe. Para baguhin ito, i-edit natin ang configuration file:

vim ~/kafka/config/server.properties

Idagdag ang sumusunod sa dulo ng file:

delete.topic.enable = true

Bago simulan ang Kafka server, kailangan mong simulan ang ZooKeeper server; gagamitin namin ang auxiliary script na kasama ng Kafka distribution:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Matapos matagumpay na magsimula ang ZooKeeper, ilunsad ang Kafka server sa isang hiwalay na terminal:

bin/kafka-server-start.sh config/server.properties

Gumawa tayo ng bagong paksa na tinatawag na Transaksyon:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Siguraduhin natin na ang isang paksa na may kinakailangang bilang ng mga partisyon at pagtitiklop ay nalikha:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Palampasin natin ang mga sandali ng pagsubok sa producer at consumer para sa bagong likhang paksa. Higit pang mga detalye tungkol sa kung paano mo masusubok ang pagpapadala at pagtanggap ng mga mensahe ay nakasulat sa opisyal na dokumentasyon - Magpadala ng ilang mensahe. Buweno, nagpapatuloy kami sa pagsulat ng isang producer sa Python gamit ang KafkaProducer API.

Pagsusulat ng producer

Ang producer ay bubuo ng random na data - 100 mga mensahe bawat segundo. Sa pamamagitan ng random na data, ang ibig naming sabihin ay isang diksyunaryo na binubuo ng tatlong field:

  • Sangay β€” pangalan ng punto ng pagbebenta ng institusyon ng kredito;
  • Pera - pera ng transaksyon;
  • dami β€” halaga ng transaksyon. Ang halaga ay magiging isang positibong numero kung ito ay isang pagbili ng pera ng Bangko, at isang negatibong numero kung ito ay isang pagbebenta.

Ang code para sa producer ay ganito:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Susunod, gamit ang paraan ng pagpapadala, nagpapadala kami ng mensahe sa server, sa paksang kailangan namin, sa format na JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Kapag pinapatakbo ang script, natatanggap namin ang mga sumusunod na mensahe sa terminal:

Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Nangangahulugan ito na gumagana ang lahat ayon sa gusto namin - ang producer ay bumubuo at nagpapadala ng mga mensahe sa paksang kailangan namin.
Ang susunod na hakbang ay i-install ang Spark at iproseso ang stream ng mensaheng ito.

Pag-install ng Apache Spark

Apache Spark ay isang unibersal at high-performance cluster computing platform.

Mas mahusay ang performance ng Spark kaysa sa mga sikat na pagpapatupad ng modelong MapReduce habang sinusuportahan ang mas malawak na hanay ng mga uri ng pagkalkula, kabilang ang mga interactive na query at pagpoproseso ng stream. Ang bilis ay gumaganap ng isang mahalagang papel kapag nagpoproseso ng malaking halaga ng data, dahil ito ay ang bilis na nagbibigay-daan sa iyo upang gumana nang interactive nang hindi gumugugol ng ilang minuto o oras sa paghihintay. Isa sa pinakamalaking lakas ng Spark na nagpapabilis dito ay ang kakayahang magsagawa ng mga kalkulasyon sa memorya.

Ang balangkas na ito ay nakasulat sa Scala, kaya kailangan mo muna itong i-install:

sudo apt-get install scala

I-download ang pamamahagi ng Spark mula sa opisyal na website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

I-unpack ang archive:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Idagdag ang path sa Spark sa bash file:

vim ~/.bashrc

Idagdag ang mga sumusunod na linya sa pamamagitan ng editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Patakbuhin ang utos sa ibaba pagkatapos gumawa ng mga pagbabago sa bashrc:

source ~/.bashrc

Pag-deploy ng AWS PostgreSQL

Ang natitira na lang ay i-deploy ang database kung saan namin ia-upload ang naprosesong impormasyon mula sa mga stream. Para dito gagamitin namin ang serbisyo ng AWS RDS.

Pumunta sa AWS console -> AWS RDS -> Mga Database -> Lumikha ng database:
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Piliin ang PostgreSQL at i-click ang Susunod:
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

kasi Ang halimbawang ito ay para sa mga layuning pang-edukasyon lamang; gagamit kami ng isang libreng server "sa pinakamababa" (Libreng Tier):
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Susunod, naglalagay kami ng tsek sa bloke ng Libreng Tier, at pagkatapos nito ay awtomatiko kaming bibigyan ng isang halimbawa ng klase ng t2.micro - kahit na mahina, ito ay libre at medyo angkop para sa aming gawain:
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Susunod na darating ang mga napakahalagang bagay: ang pangalan ng halimbawa ng database, ang pangalan ng master user at ang kanyang password. Pangalanan natin ang instance: myHabrTest, master user: habr, password: habr12345 at i-click ang Next button:
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Sa susunod na pahina mayroong mga parameter na responsable para sa accessibility ng aming database server mula sa labas (Public accessibility) at port availability:

Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Gumawa tayo ng bagong setting para sa pangkat ng seguridad ng VPC, na magbibigay-daan sa external na access sa aming database server sa pamamagitan ng port 5432 (PostgreSQL).
Pumunta tayo sa AWS console sa isang hiwalay na browser window sa VPC Dashboard -> Security Groups -> Lumikha ng seksyon ng security group:
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Itinakda namin ang pangalan para sa pangkat ng Seguridad - PostgreSQL, isang paglalarawan, ipahiwatig kung aling VPC ang pangkat na ito ay dapat iugnay at i-click ang pindutang Lumikha:
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Punan ang Inbound na mga panuntunan para sa port 5432 para sa bagong likhang grupo, tulad ng ipinapakita sa larawan sa ibaba. Hindi mo maaaring tukuyin ang port nang manu-mano, ngunit piliin ang PostgreSQL mula sa Uri ng drop-down na listahan.

Sa mahigpit na pagsasalita, ang halaga ::/0 ay nangangahulugang ang pagkakaroon ng papasok na trapiko sa server mula sa buong mundo, na hindi ganap na totoo, ngunit upang suriin ang halimbawa, hayaan natin ang ating sarili na gamitin ang diskarteng ito:
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Bumalik kami sa page ng browser, kung saan nakabukas ang "I-configure ang mga advanced na setting" at pumili sa seksyon ng mga pangkat ng seguridad ng VPC -> Piliin ang mga umiiral nang pangkat ng seguridad ng VPC -> PostgreSQL:
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Susunod, sa mga pagpipilian sa Database -> Pangalan ng database -> itakda ang pangalan - habrDB.

Maaari naming iwanan ang natitirang mga parameter, maliban sa hindi pagpapagana ng backup (panahon ng pagpapanatili ng backup - 0 araw), pagsubaybay at Performance Insights, bilang default. Mag-click sa pindutan Lumikha ng database:
Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Tagapangasiwa ng thread

Ang huling yugto ay ang pagbuo ng isang trabaho sa Spark, na magpoproseso ng bagong data na nagmumula sa Kafka bawat dalawang segundo at ipasok ang resulta sa database.

Gaya ng nabanggit sa itaas, ang mga checkpoint ay isang pangunahing mekanismo sa SparkStreaming na dapat i-configure upang matiyak ang pagpapahintulot sa pagkakamali. Gagamit kami ng mga checkpoint at, kung nabigo ang pamamaraan, kakailanganin lamang ng Spark Streaming na module na bumalik sa huling checkpoint at ipagpatuloy ang mga kalkulasyon mula dito upang mabawi ang nawalang data.

Maaaring paganahin ang checkpointing sa pamamagitan ng pagtatakda ng direktoryo sa isang fault-tolerant, maaasahang file system (tulad ng HDFS, S3, atbp.) kung saan iimbak ang impormasyon ng checkpoint. Ginagawa ito gamit ang, halimbawa:

streamingContext.checkpoint(checkpointDirectory)

Sa aming halimbawa, gagamitin namin ang sumusunod na diskarte, ibig sabihin, kung umiiral ang checkpointDirectory, muling gagawa ang konteksto mula sa data ng checkpoint. Kung ang direktoryo ay hindi umiiral (ibig sabihin, naisakatuparan sa unang pagkakataon), ang functionToCreateContext ay tinatawag upang lumikha ng bagong konteksto at i-configure ang DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Lumilikha kami ng DirectStream object upang kumonekta sa paksang "transaksyon" gamit ang paraan ng createDirectStream ng KafkaUtils library:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Pag-parse ng papasok na data sa JSON format:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Gamit ang Spark SQL, gumagawa kami ng isang simpleng pagpapangkat at ipinapakita ang resulta sa console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Pagkuha ng query text at pagpapatakbo nito sa pamamagitan ng Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

At pagkatapos ay ise-save namin ang nagresultang pinagsama-samang data sa isang talahanayan sa AWS RDS. Upang i-save ang mga resulta ng pagsasama-sama sa isang talahanayan ng database, gagamitin namin ang paraan ng pagsulat ng object ng DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Ilang salita tungkol sa pag-set up ng koneksyon sa AWS RDS. Ginawa namin ang user at password para dito sa hakbang na "Pag-deploy ng AWS PostgreSQL". Dapat mong gamitin ang Endpoint bilang url ng server ng database, na ipinapakita sa seksyong Pagkakakonekta at seguridad:

Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Upang maikonekta nang tama ang Spark at Kafka, dapat mong patakbuhin ang trabaho sa pamamagitan ng smark-submit gamit ang artifact spark-streaming-kafka-0-8_2.11. Bukod pa rito, gagamit din kami ng artifact para sa pakikipag-ugnayan sa database ng PostgreSQL; ililipat namin ang mga ito sa pamamagitan ng --packages.

Para sa flexibility ng script, isasama rin namin bilang mga parameter ng input ang pangalan ng server ng mensahe at ang paksa kung saan gusto naming makatanggap ng data.

Kaya, oras na para ilunsad at suriin ang functionality ng system:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Lahat ay nagtagumpay! Tulad ng makikita mo sa larawan sa ibaba, habang tumatakbo ang application, ang mga bagong resulta ng pagsasama-sama ay output tuwing 2 segundo, dahil itinakda namin ang pagitan ng batching sa 2 segundo noong nilikha namin ang object ng StreamingContext:

Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Susunod, gumawa kami ng isang simpleng query sa database upang suriin ang pagkakaroon ng mga tala sa talahanayan transaction_flow:

Apache Kafka at Streaming Data Processing gamit ang Spark Streaming

Konklusyon

Ang artikulong ito ay tumingin sa isang halimbawa ng pagpoproseso ng stream ng impormasyon gamit ang Spark Streaming kasabay ng Apache Kafka at PostgreSQL. Sa paglaki ng data mula sa iba't ibang pinagmumulan, mahirap i-overestimate ang praktikal na halaga ng Spark Streaming para sa paglikha ng streaming at real-time na mga application.

Mahahanap mo ang buong source code sa aking repository sa GitHub.

Natutuwa akong talakayin ang artikulong ito, inaasahan ko ang iyong mga komento, at umaasa rin ako para sa nakabubuo na pagpuna mula sa lahat ng nagmamalasakit na mambabasa.

Nais kong tagumpay ka!

Ps. Noong una ay binalak na gumamit ng lokal na database ng PostgreSQL, ngunit dahil sa pagmamahal ko sa AWS, nagpasya akong ilipat ang database sa cloud. Sa susunod na artikulo sa paksang ito, ipapakita ko kung paano ipatupad ang buong sistemang inilarawan sa itaas sa AWS gamit ang AWS Kinesis at AWS EMR. Sundan ang balita!

Pinagmulan: www.habr.com

Magdagdag ng komento