Apache Kafka and Streaming Data Processing with Spark Streaming

Hello, Habr! Today we will build a system that processes Apache Kafka message streams with Spark Streaming and writes the processing results to an AWS RDS cloud database.

Let's imagine that a credit institution asks us to process incoming transactions "on the fly" across all of its branches. This could be done in order to promptly calculate the open currency position for the treasury, limits or financial results for transactions, etc.

How to implement this case without magic or magic spells: read on below the cut. Let's go!


Introduction

Processing large volumes of data in real time opens up many possibilities for modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka produces a stream of incoming message packets and Spark Streaming processes these packets at a given time interval.

To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume the calculations from there.

Architecture of the developed system


Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. The Kafka system is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming - the Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, where the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from various sources and combines it into small packets. New packets are created at regular intervals. At the beginning of each interval a new packet is created, and any data received during that interval is included in the packet. At the end of the interval, the packet stops growing. The size of the interval is determined by a parameter called the batch interval;
  • Apache Spark SQL - combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, because schema information is available, it can efficiently retrieve only the required fields of records. It also provides the DataFrame API;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.
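
The micro-batch idea described above can be illustrated without Spark. The following is only a minimal sketch in plain Python: the function name split_into_batches and the sample events are made up for illustration, and unlike Spark Streaming it simply skips intervals in which no data arrived.

```python
from collections import defaultdict

def split_into_batches(events, batch_interval):
    """Group (timestamp, payload) pairs into consecutive fixed-width windows."""
    batches = defaultdict(list)
    for timestamp, payload in events:
        # Window 0 covers [0, interval), window 1 covers [interval, 2 * interval), ...
        batches[int(timestamp // batch_interval)].append(payload)
    # Return the non-empty windows in chronological order.
    return [batches[index] for index in sorted(batches)]

# Hypothetical events: (arrival time in seconds, message).
events = [(0.1, "a"), (0.9, "b"), (2.5, "c"), (3.9, "d"), (4.0, "e")]
batches = split_into_batches(events, batch_interval=2)
print(batches)  # [['a', 'b'], ['c', 'd'], ['e']]
```

With a batch interval of 2 seconds, everything that arrives in the same 2-second window ends up in the same packet, which is then processed as one unit.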

Installing and running the Kafka server

Before using Kafka, you need to make sure that Java is installed, because Kafka runs on the JVM:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz -C /YOUR_PATH
ln -s /YOUR_PATH/kafka_2.12-2.2.0 ~/kafka

The next step is optional. The default settings do not let you use all the features of Apache Kafka. For example, you cannot delete a topic, that is, the category or group to which messages can be published. To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that comes with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Once ZooKeeper has started successfully, start the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that the topic was created with the required number of partitions and the required replication factor:

bin/kafka-topics.sh --describe --zookeeper localhost:2181


Let's skip testing the producer and consumer for the newly created topic. How to test sending and receiving messages is described in the official documentation, in the section "Send some messages". We move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data: 100 messages every second. By random data we mean a dictionary with three fields:

  • branch - the name of the credit institution's point of sale;
  • currency - the transaction currency;
  • amount - the transaction amount. The amount is a positive number if the Bank buys currency, and a negative number if it sells.

The code for the producer looks like this:

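from numpy.random import choice, randint

def get_random_value():
    """Return one random transaction as a dict with branch, currency and amount."""
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    # Cast NumPy scalars to plain Python types so that json.dumps can serialize them.
    new_dict['branch'] = str(choice(branch_list))
    new_dict['currency'] = str(choice(currency_list))
    # randint excludes the upper bound, so amounts fall in the range -100..99.
    new_dict['amount'] = int(randint(-100, 100))

    return new_dict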

Next, using the send method, we send a message to the server, to the topic we need, in JSON format:

from json import dumps

from kafka import KafkaProducer

# Serialize every value to UTF-8 encoded JSON and gzip-compress the batches.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    # Block until the broker acknowledges the record (at most 10 seconds).
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
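
The snippet above sends a single message, while the stated goal is 100 messages every second. One way such a loop might look is sketched below. It is an illustration only: it uses the standard library instead of NumPy, and run_producer, make_transaction and the send callback are hypothetical names, so the sketch can be dry-run without a broker.

```python
import random
import time

BRANCHES = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
CURRENCIES = ['RUB', 'USD', 'EUR', 'GBP']

def make_transaction():
    """Build one random transaction dict using only the standard library."""
    return {
        'branch': random.choice(BRANCHES),
        'currency': random.choice(CURRENCIES),
        # randrange excludes the upper bound, like numpy's randint(-100, 100).
        'amount': random.randrange(-100, 100),
    }

def run_producer(send, seconds, per_second=100, pause=1.0):
    """Call send(message) per_second times, then pause, once per second."""
    sent = 0
    for _ in range(seconds):
        for _ in range(per_second):
            send(make_transaction())
            sent += 1
        time.sleep(pause)
    return sent

# Dry run: collect the messages in a list instead of sending them to Kafka.
outbox = []
total = run_producer(outbox.append, seconds=2, pause=0)
print(total, outbox[0])
```

With a real producer, the callback could be something like lambda m: producer.send('transaction', value=m), with a call to producer.flush() after the loop.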

When we run the script, the terminal prints a confirmation for each message sent, showing its topic, partition and offset.


This means that everything works as we wanted: the producer generates messages and sends them to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a general-purpose, high-performance cluster computing platform.

Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed matters when processing large amounts of data, because it is speed that lets you work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and the reason it is so fast, is its ability to perform computations in memory.

The framework is written in Scala, so you need to install Scala first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1

Add the path to Spark to the bash configuration file:

vim ~/.bashrc

Add the following lines in the editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making the changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed information from the stream. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database.

Select PostgreSQL and click Next.

Since this example is for educational purposes only, we will use the free "minimum" server (Free Tier).

Next, we tick the Free Tier box, after which we are automatically offered an instance of the t2.micro class. It is modest, but it is free and quite suitable for our task.

Next come the important settings: the name of the database instance, the name of the master user, and the password. Let's name the instance myHabrTest, set the master user to habr and the password to habr12345, and click the Next button.

The next page contains the parameters that control whether our database server is reachable from outside (Public accessibility) and which port is available.

Let's create a new VPC security group that allows external access to our database server through port 5432 (PostgreSQL).
In a separate browser window, open the AWS console and go to VPC Dashboard -> Security Groups -> Create security group.

We set a name for the security group (PostgreSQL), add a description, specify which VPC the group should be associated with, and click the Create button.

Fill in the Inbound rules for port 5432 for the newly created group. You do not have to enter the port manually; you can select PostgreSQL from the Type drop-down list instead.

Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from anywhere in the world, which is not good practice, but for the purposes of this example we will allow ourselves this approach.
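
To see what such a rule actually covers, the standard ipaddress module can be used. This is only an illustration: the 203.0.113.0/24 range and the client address are documentation addresses chosen as a hypothetical example of a narrower rule.

```python
import ipaddress

anywhere_v6 = ipaddress.ip_network('::/0')       # every IPv6 address
anywhere_v4 = ipaddress.ip_network('0.0.0.0/0')  # every IPv4 address
office = ipaddress.ip_network('203.0.113.0/24')  # hypothetical narrower range

client = ipaddress.ip_address('198.51.100.7')    # hypothetical client address

print(anywhere_v6.num_addresses == 2 ** 128)  # True
print(client in anywhere_v4)                  # True
print(client in office)                       # False
```

Outside of a test setup, a rule limited to a known address range would be the more cautious choice.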

We return to the browser page where "Configure advanced settings" is open and, in the VPC security groups section, select Choose existing VPC security groups -> PostgreSQL.

Next, in Database options -> Database name, set the name to habrDB.

We can leave the remaining parameters at their defaults, except that we disable backups (backup retention period: 0 days), monitoring and Performance Insights. Click the Create database button.

Stream handler

The final stage is the development of a Spark job that processes new data arriving from Kafka every two seconds and writes the result to the database.

As noted above, checkpoints are a core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints so that, if the job fails, the Spark Streaming module only has to return to the last checkpoint and resume the calculations from it in order to recover the lost data.

Checkpointing can be enabled by setting a directory on a fault-tolerant, reliable file system (e.g. HDFS, S3, etc.) in which the checkpoint information will be stored. This is done with, for example:

streamingContext.checkpoint(checkpointDirectory)

In our example we will use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (i.e. the job is being run for the first time), functionToCreateContext is called to create a new context and configure the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
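
The get-or-create decision can be illustrated without Spark. The sketch below is only an analogy in plain Python, not how Spark stores its checkpoints: the state.json file, the create_state function and the batches_processed field are all made up for illustration.

```python
import json
import os
import tempfile

def get_or_create(checkpoint_dir, create_fn):
    """Restore state from checkpoint_dir if it exists, else build fresh state."""
    path = os.path.join(checkpoint_dir, 'state.json')
    if os.path.exists(path):
        # A checkpoint is present: recover instead of starting over.
        with open(path) as f:
            return json.load(f), 'restored'
    # First run: create the state and write the first checkpoint.
    state = create_fn()
    os.makedirs(checkpoint_dir, exist_ok=True)
    with open(path, 'w') as f:
        json.dump(state, f)
    return state, 'created'

def create_state():
    # Hypothetical initial state; a real job would set up its DStreams here.
    return {'batches_processed': 0}

checkpoint_dir = os.path.join(tempfile.mkdtemp(), 'checkpoint')
first = get_or_create(checkpoint_dir, create_state)
second = get_or_create(checkpoint_dir, create_state)
print(first)   # ({'batches_processed': 0}, 'created')
print(second)  # ({'batches_processed': 0}, 'restored')
```

The first call plays the role of the very first launch, and the second call plays the role of a restart after a failure.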

We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:

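from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# The pyspark.streaming.kafka module exists in Spark 2.x; it was removed in Spark 3.0.
from pyspark.streaming.kafka import KafkaUtils

# conf is assumed to be a SparkConf object created earlier in the job.
sc = SparkContext(conf=conf)
# The second argument is the batch interval in seconds.
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})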

Parsing the incoming data in JSON format:

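from pyspark.sql import Row

# Assumes each element w of rdd is a dict decoded from a JSON message.
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

# spark is the active SparkSession.
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")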
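
The direct stream delivers each record as a (key, value) pair whose value is the JSON text written by the producer, so the text has to be decoded before the fields can be read. A minimal sketch of that step in plain Python is shown below; parse_record and REQUIRED_FIELDS are hypothetical names, and dropping malformed messages is an assumption rather than the author's stated behavior.

```python
import json

REQUIRED_FIELDS = ('branch', 'currency', 'amount')

def parse_record(record):
    """Turn one (key, value) pair into a dict, or None if it is malformed."""
    _key, value = record
    try:
        message = json.loads(value)
    except (TypeError, ValueError):
        # Not valid JSON (json.JSONDecodeError is a ValueError) or not text.
        return None
    if not isinstance(message, dict):
        return None
    if not all(field in message for field in REQUIRED_FIELDS):
        return None
    return {field: message[field] for field in REQUIRED_FIELDS}

good = parse_record((None, '{"branch": "Kazan", "currency": "USD", "amount": -42}'))
bad = parse_record((None, 'not json'))
print(good)  # {'branch': 'Kazan', 'currency': 'USD', 'amount': -42}
print(bad)   # None
```

In the job, a function like this could be applied with directKafkaStream.map(parse_record).filter(lambda m: m is not None) before the rows are built.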

Using Spark SQL, we perform a simple grouping and print the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency
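
For readers less familiar with SQL, here is roughly what the grouping computes for one batch, written as a plain Python sketch. The function name aggregate_batch and the sample rows are made up for illustration, and the curr_time column is left out.

```python
from collections import defaultdict

def aggregate_batch(rows):
    """Sum amount per (branch, currency), like the GROUP BY in the query."""
    totals = defaultdict(int)
    for row in rows:
        totals[(row['branch'], row['currency'])] += row['amount']
    return dict(totals)

# Hypothetical contents of one micro-batch.
batch = [
    {'branch': 'Kazan', 'currency': 'USD', 'amount': 40},
    {'branch': 'Kazan', 'currency': 'USD', 'amount': -15},
    {'branch': 'SPB', 'currency': 'EUR', 'amount': 7},
]
result = aggregate_batch(batch)
print(result)  # {('Kazan', 'USD'): 25, ('SPB', 'EUR'): 7}
```

Purchases and sales within the same branch and currency offset each other, so each batch yields the net amount per pair.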

Getting the query text and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Then we save the aggregated data to a table in AWS RDS. To save the aggregation results to a database table, we will use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()

A few words about setting up the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. As the database server url you should use the Endpoint, which is shown in the Connectivity & security section.
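
The JDBC url is assembled from the Endpoint, the port and the database name. The sketch below shows one possible way to build the connection options in one place and to read the password from the environment instead of hard-coding it. The helper build_jdbc_options and the variable name RDS_PASSWORD are assumptions made for this illustration.

```python
import os

def build_jdbc_options(endpoint, database, user, port=5432,
                       password_env='RDS_PASSWORD'):
    """Assemble the option dict for a PostgreSQL JDBC write."""
    return {
        'driver': 'org.postgresql.Driver',
        'url': 'jdbc:postgresql://{}:{}/{}'.format(endpoint, port, database),
        'user': user,
        # Read the secret from the environment instead of hard-coding it.
        'password': os.environ.get(password_env, ''),
    }

os.environ['RDS_PASSWORD'] = 'example-only'  # set here only for the demo
options = build_jdbc_options(
    'myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com', 'habrDB', 'habr')
print(options['url'])
```

Such a dict could then be passed to the writer with .options(**options) in place of the individual .option(...) calls for driver, url, user and password.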

To connect Spark and Kafka correctly, you should run the job via spark-submit using the artifact spark-streaming-kafka-0-8_2.11. In addition, we will use an artifact for interacting with the PostgreSQL database; we will pass both via --packages.

To keep the script flexible, we will also pass the name of the message server and the topic from which we want to receive data as input parameters.
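
Reading these two parameters could look roughly like the sketch below. The function name parse_args and the usage message are made up for illustration; the full job in the author's repository may handle its arguments differently.

```python
def parse_args(argv):
    """Return (broker_list, topic) from a command line such as sys.argv."""
    if len(argv) != 3:
        raise SystemExit('Usage: spark_job.py <broker_list> <topic>')
    _script, broker_list, topic = argv
    return broker_list, topic

# Simulated command line; inside the real job you would pass sys.argv.
broker_list, topic = parse_args(['spark_job.py', 'localhost:9092', 'transaction'])
print(broker_list, topic)  # localhost:9092 transaction
```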

So, it's time to launch the system and check that it works:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\
org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

Everything worked! While the application is running, new aggregation results are printed every 2 seconds, because we set the batch interval to 2 seconds when we created the StreamingContext object.


Next, we run a simple query against the database to check that records are present in the transaction_flow table.


Conclusion

This article looked at an example of stream processing with Spark Streaming in conjunction with Apache Kafka and PostgreSQL. As data from various sources keeps growing, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my repository on GitHub.

I am happy to discuss this article, I look forward to your comments, and I hope for constructive criticism from all interested readers.

I wish you success!

P.S. Initially I planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Follow the news!

Source: www.habr.com
