Apache Kafka and Streaming Data Processing with Spark Streaming

Hi, Habr! Today we will build a system that processes Apache Kafka message streams with Spark Streaming and writes the processing results to the AWS RDS cloud database.

Suppose a bank needs to process incoming transactions "on the fly" across all of its branches. This could be used to promptly calculate the treasury's open currency position, limits, financial results on deals, and so on.

How to implement this case without resorting to magic and spells: read under the cut! Let's go!


Introduction

Processing large amounts of data in real time naturally opens up many opportunities for modern systems. One of the most popular combinations for this is the pairing of Apache Kafka and Spark Streaming, in which Kafka produces a stream of incoming message batches and Spark Streaming processes those batches at a set time interval.

To make the application more fault tolerant, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume computation from there.

Architecture of the system


Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module uses a micro-batch architecture, in which the data stream is treated as a continuous sequence of small data batches. Spark Streaming takes data from various sources and combines it into small batches, which are created at regular intervals: at the start of each interval a new batch is created, and any data arriving during that interval is added to it. At the end of the interval, the batch stops growing. The length of the interval is set by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data is data that has a schema, that is, a single set of fields shared by all records. Spark SQL supports input from a variety of structured data sources and, because schema information is available, can efficiently retrieve only the required fields of records. It also provides DataFrame APIs;
  • AWS RDS is a relatively inexpensive cloud-based relational database: a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.
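
To make the batch-interval idea concrete, here is a minimal pure-Python sketch (not Spark code; micro_batches is a hypothetical helper) that assigns timestamped events to fixed windows of batch_interval seconds, as the Spark Streaming bullet describes.

```python
from collections import defaultdict


def micro_batches(events, batch_interval):
    """Group (timestamp_seconds, payload) events into micro-batches.

    Batch k covers the half-open window [k * batch_interval, (k + 1) * batch_interval);
    every event arriving in that window is added to batch k, and the batch stops
    growing once the window closes. Empty windows are skipped here, whereas
    Spark Streaming would still schedule an (empty) batch for them.
    """
    batches = defaultdict(list)
    for ts, payload in events:
        batches[int(ts // batch_interval)].append(payload)
    # Return batches in time order as (batch_start_time, payloads)
    return [(k * batch_interval, batches[k]) for k in sorted(batches)]


events = [(0.1, 'a'), (1.9, 'b'), (2.0, 'c'), (3.5, 'd'), (6.2, 'e')]
print(micro_batches(events, 2))
# [(0, ['a', 'b']), (2, ['c', 'd']), (6, ['e'])]
```

With a 2-second interval, the event at exactly 2.0 s opens the second batch, because each window includes its start and excludes its end.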

Installing and running the Kafka server

Before using Kafka, make sure Java is installed, because Kafka runs on the JVM:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

The next step is optional. The point is that the default settings may not let you use all of Apache Kafka's features, for example deleting a topic, category or group to which messages can be published. (Since Kafka 1.0, delete.topic.enable already defaults to true, so for the 2.2.0 release used here the line below mainly makes the intent explicit.) To change this, edit the configuration file:

vim ~/kafka/config/server.properties

Add the following line to the end of the file:

delete.topic.enable = true
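
If you prefer to script this step, appending the line blindly can leave a duplicate key when the step is repeated. Below is a minimal sketch of an idempotent edit; set_property is a hypothetical helper (not part of the Kafka distribution), and it assumes the file consists of simple key=value lines, ignoring line continuations and other Java properties syntax.

```python
def set_property(text, key, value):
    """Return properties-file content with `key` set to `value`.

    Replaces an existing (possibly commented-out) assignment of the key
    instead of appending a duplicate; appends the key if it is absent.
    """
    lines = text.splitlines()
    new_line = '{}={}'.format(key, value)
    for i, line in enumerate(lines):
        # Strip a leading '#' so that "#delete.topic.enable=false" also matches
        candidate = line.lstrip('#').strip()
        if candidate.split('=', 1)[0].strip() == key:
            lines[i] = new_line
            return '\n'.join(lines) + '\n'
    lines.append(new_line)
    return '\n'.join(lines) + '\n'


config = 'broker.id=0\n#delete.topic.enable=false\n'
print(set_property(config, 'delete.topic.enable', 'true'))
```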

Before starting the Kafka server, you need to start the ZooKeeper server. We will use the helper script that ships with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Once ZooKeeper has started, launch the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Make sure the topic has been created with the required number of partitions and replication:

bin/kafka-topics.sh --describe --zookeeper localhost:2181


We will skip testing the producer and consumer for the newly created topic. Details on how to test sending and receiving messages are in the official documentation (Send some messages). Let's move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data: 100 messages per second. By random data we mean a dictionary with three fields:

  • branch: the name of the credit institution's point of sale;
  • currency: the transaction currency;
  • amount: the transaction amount. The amount is positive if the Bank is buying currency, and negative if it is selling.

The code for generating a message looks like this:

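from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    # Cast numpy scalars to built-in types: json.dumps cannot serialize numpy.int64
    new_dict['branch'] = str(choice(branch_list))
    new_dict['currency'] = str(choice(currency_list))
    # numpy's randint excludes the upper bound, so amounts fall in [-100, 99]
    new_dict['amount'] = int(randint(-100, 100))

    return new_dict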

Next, using the send method, we send a message in JSON format to the server, into the topic we need:

from json import dumps

from kafka import KafkaProducer

# Serialize each value dict to JSON bytes; batches are gzip-compressed
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    # send() is asynchronous; future.get() blocks until the broker acknowledges
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

When the script runs, it prints the topic, partition and offset of the sent message to the terminal.


This means everything works as intended: the producer generates messages and sends them to the topic we need.
The next step is to install Spark and process this message stream.
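
The producer script above sends a single message per run, whereas the text talks about 100 messages per second. One way to reach that rate, sketched under the assumption that you wrap the article's producer.send and get_random_value, is a loop that schedules each send at a fixed time slot. produce_at_rate is a hypothetical helper, not part of kafka-python.

```python
import time


def produce_at_rate(send, make_message, rate_per_sec=100, duration_sec=1.0,
                    clock=time.monotonic, sleep=time.sleep):
    """Call send(make_message()) about rate_per_sec times per second.

    Each message is scheduled at start + i / rate_per_sec, so time spent
    inside send() does not accumulate as drift. Returns the number sent.
    """
    interval = 1.0 / rate_per_sec
    total = int(round(rate_per_sec * duration_sec))
    start = clock()
    for i in range(total):
        delay = start + i * interval - clock()
        if delay > 0:
            sleep(delay)
        send(make_message())
    return total
```

With the producer from above, a one-minute run might look like produce_at_rate(lambda msg: producer.send(my_topic, value=msg), get_random_value, duration_sec=60) followed by producer.flush(). Scheduling against fixed slots rather than sleeping a constant interval after each send keeps the average rate close to the target even when individual sends are slow.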

Installing Apache Spark

Apache Spark is a general-purpose, high-performance cluster computing platform.

Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed matters a great deal when processing large amounts of data, because it is speed that lets you work interactively without spending minutes or hours waiting. One of the main strengths that makes Spark so fast is its ability to perform computations in memory.

The framework is written in Scala, so install Scala first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

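Unpack the archive into /usr/local/spark. The target directory must exist, and --strip-components=1 drops the archive's top-level folder so that bin/ ends up directly under /usr/local/spark:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1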

Add the Spark path to your bash configuration file:

vim ~/.bashrc

Add the following lines in the editor:

export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after editing .bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed information from the streams. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database.

Select PostgreSQL and click Next.

Since this example is for educational purposes only, we will use a free, minimal server (Free Tier).

Next, tick the Free Tier box, after which we are automatically offered a t2.micro instance: weak, but free and quite sufficient for our task.

Next come some very important settings: the DB instance name, the master user name and its password. Let's name the instance myHabrTest, with master user habr and password habr12345, and click Next.

The next page contains the parameters that control whether our database server is reachable from outside (Public accessibility) and which port it listens on.

Let's create a new VPC security group setting that allows external access to our database server on port 5432 (PostgreSQL).
In a separate browser window, open the AWS console and go to VPC Dashboard -> Security Groups -> Create security group.

Set the security group name to PostgreSQL, add a description, specify which VPC the group should be associated with, and click Create.

Fill in the Inbound rules for port 5432 in the newly created group. You don't have to enter the port manually: select PostgreSQL from the Type drop-down list instead.

Strictly speaking, the value ::/0 opens the server to incoming traffic from anywhere in the world (::/0 is the IPv6 "any address" block; its IPv4 counterpart is 0.0.0.0/0). That is not good practice, but for the sake of the example we will allow ourselves this shortcut.
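
To see what a CIDR source in an inbound rule actually covers, here is a small sketch using Python's standard ipaddress module. It only checks address membership; how AWS evaluates security group rules is not modeled, and the sample addresses come from documentation-reserved ranges chosen for illustration.

```python
import ipaddress


def allowed(client_ip, cidr):
    """Return True if client_ip falls inside an inbound rule's CIDR block."""
    ip = ipaddress.ip_address(client_ip)
    net = ipaddress.ip_network(cidr)
    # An IPv4 address never matches an IPv6 block, and vice versa
    return ip.version == net.version and ip in net


print(allowed('203.0.113.7', '::/0'))              # False: ::/0 is IPv6-only
print(allowed('203.0.113.7', '0.0.0.0/0'))         # True: any IPv4 address
print(allowed('2001:db8::1', '::/0'))              # True: any IPv6 address
print(allowed('203.0.113.7', '198.51.100.10/32'))  # False: single-host rule
```

A client connecting over IPv4 therefore needs a 0.0.0.0/0 rule, or preferably a narrower /32 rule for your own address.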

Return to the browser page where "Configure advanced settings" is open and, in the VPC security groups section, choose Choose existing VPC security groups -> PostgreSQL.

Next, under Database options -> Database name, set the name to habrDB.

The remaining parameters can be left at their defaults, except that we disable backups (backup retention period: 0 days), monitoring and Performance Insights. Click the Create database button.

Stream handler

The final stage is developing the Spark job, which processes the new data arriving from Kafka every two seconds and writes the result to the database.

As mentioned above, checkpoints are a core Spark Streaming mechanism that must be configured to provide fault tolerance. If the job fails, the Spark Streaming module only needs to return to the last checkpoint and resume computation from there to recover the lost data.

Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS or S3) in which the checkpoint information will be stored. This is done, for example, with:

streamingContext.checkpoint(checkpointDirectory)

In our example we use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (that is, the job is running for the first time), functionToCreateContext is called to create a new context and set up the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
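
The restore-or-create logic of getOrCreate can be illustrated with a toy, pure-Python analogue. This is not how Spark stores checkpoints (Spark serializes the DStream graph and metadata, not a JSON file); get_or_create, save_checkpoint and the state.json file name are hypothetical. One practical consequence carries over to real Spark jobs: all DStream setup has to live inside the create function, because on restart that function is not called at all.

```python
import json
import os
import tempfile


def get_or_create(checkpoint_dir, create_fn):
    """Toy analogue of StreamingContext.getOrCreate for a plain dict state.

    If a checkpoint file exists, the state is restored from it; otherwise
    create_fn() builds a fresh state (the first-run path).
    """
    path = os.path.join(checkpoint_dir, 'state.json')
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return create_fn()


def save_checkpoint(checkpoint_dir, state):
    """Persist state so that a restarted job can resume from it."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    with open(os.path.join(checkpoint_dir, 'state.json'), 'w') as f:
        json.dump(state, f)


ckpt = os.path.join(tempfile.mkdtemp(), 'checkpoint')
state = get_or_create(ckpt, lambda: {'last_offset': 0})     # first run: created
state['last_offset'] = 42
save_checkpoint(ckpt, state)
restored = get_or_create(ckpt, lambda: {'last_offset': 0})  # restart: restored
print(restored['last_offset'])  # 42
```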

We create a DirectStream object connected to the "transaction" topic using the createDirectStream method of KafkaUtils (the application name in SparkConf below is illustrative):

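from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# The application name is illustrative
conf = SparkConf().setAppName('transaction_stream')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)  # batch interval of 2 seconds

broker_list = 'localhost:9092'
topic = 'transaction'

# Receiver-less direct stream; each element is a (key, value) tuple
directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})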

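Parse the incoming JSON messages and register each micro-batch as a temporary view:

import json
from pyspark.sql import Row

# rdd is one micro-batch (e.g. inside directKafkaStream.foreachRDD); each element
# is a (key, value) tuple whose value is the JSON string sent by the producer
rowRdd = (rdd.map(lambda kv: json.loads(kv[1]))
             .map(lambda w: Row(branch=w['branch'],
                                currency=w['currency'],
                                amount=w['amount'])))

# spark is the active SparkSession
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")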
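
The per-record transformation can also be pulled out into a plain function so it can be unit-tested without a cluster. A minimal sketch, assuming the value is the UTF-8 decoded JSON string written by the producer; parse_record is a hypothetical helper, and passing it as rdd.map(lambda kv: Row(**parse_record(kv))) would be one way to use it in the job.

```python
import json


def parse_record(kafka_record):
    """Turn one (key, value) record from the direct stream into a row dict.

    Assumes the value is a JSON string such as
    '{"branch": "Kazan", "currency": "USD", "amount": 42}'.
    """
    _key, value = kafka_record
    message = json.loads(value)
    return {'branch': message['branch'],
            'currency': message['currency'],
            'amount': int(message['amount'])}


print(parse_record((None, '{"branch": "Kazan", "currency": "USD", "amount": 42}')))
```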

Using Spark SQL, we perform a simple grouping and print the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency
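
To show what this query computes for one batch, here is a pure-Python equivalent of the grouping (aggregate_batch is a hypothetical helper; the curr_time column is left out). Given the sign convention above, batch_value is the net change of the Bank's position in that currency at that branch during the batch.

```python
from collections import defaultdict


def aggregate_batch(rows):
    """Pure-Python equivalent of GROUP BY branch, currency with SUM(amount).

    Returns {(branch, currency): batch_value}.
    """
    totals = defaultdict(int)
    for row in rows:
        totals[(row['branch'], row['currency'])] += row['amount']
    return dict(totals)


batch = [
    {'branch': 'Kazan', 'currency': 'USD', 'amount': 40},
    {'branch': 'Kazan', 'currency': 'USD', 'amount': -15},
    {'branch': 'SPB', 'currency': 'EUR', 'amount': 7},
]
print(aggregate_batch(batch))
# {('Kazan', 'USD'): 25, ('SPB', 'EUR'): 7}
```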

Get the query text and run it with Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
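
The get_sql_query helper is not defined in the snippets above. A minimal assumption is that it simply returns the aggregation query as a string, as in this hypothetical version; keeping the SQL in one function makes it easy to change the query without touching the streaming code.

```python
def get_sql_query():
    """Hypothetical helper: return the per-batch aggregation query."""
    return """
        select
            from_unixtime(unix_timestamp()) as curr_time,
            t.branch                        as branch_name,
            t.currency                      as currency_code,
            sum(amount)                     as batch_value
        from treasury_stream t
        group by
            t.branch,
            t.currency
    """
```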

We then save the aggregated data to a table in AWS RDS. To write the aggregation results to a database table, we use the write method of the DataFrame object:

# Append each micro-batch's aggregates to the transaction_flow table over JDBC
(testResultDataFrame.write
    .format("jdbc")
    .mode("append")
    .option("driver", 'org.postgresql.Driver')
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
    .option("dbtable", "transaction_flow")
    .option("user", "habr")
    .option("password", "habr12345")
    .save())

A few words about setting up the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. Use the instance's Endpoint, shown in the Connectivity & security section, as the database server URL.
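
Hard-coding the password in the job is convenient for a demo but easy to leak. A minimal sketch of building the JDBC options from the RDS endpoint and environment variables instead; jdbc_options and the PG_USER / PG_PASSWORD variable names are hypothetical.

```python
import os


def jdbc_options(endpoint, database, port=5432, env=os.environ):
    """Build DataFrame.write options for an RDS PostgreSQL instance.

    endpoint: the RDS Endpoint from the Connectivity & security section.
    Credentials are read from the (hypothetical) PG_USER and PG_PASSWORD
    environment variables instead of being hard-coded in the job.
    """
    return {
        'driver': 'org.postgresql.Driver',
        'url': 'jdbc:postgresql://{}:{}/{}'.format(endpoint, port, database),
        'user': env['PG_USER'],
        'password': env['PG_PASSWORD'],
    }
```

In the job, this could be used as testResultDataFrame.write.format("jdbc").mode("append").options(dbtable="transaction_flow", **jdbc_options("myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com", "habrDB")).save().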

To connect Spark and Kafka correctly, run the job with spark-submit using the spark-streaming-kafka-0-8_2.11 artifact. We also use an artifact for working with the PostgreSQL database; both are passed via --packages.

To keep the script flexible, we also pass the message broker address and the topic to read from as input parameters.
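
A minimal sketch of how spark_job.py might read those two parameters; parse_args is a hypothetical helper that takes a sys.argv-style list, where argv[0] is the script name.

```python
def parse_args(argv):
    """Read <broker_list> <topic> from a sys.argv-style list.

    In spark_job.py this would be called as: broker_list, topic = parse_args(sys.argv)
    """
    if len(argv) != 3:
        raise SystemExit('Usage: spark_job.py <broker_list> <topic>')
    broker_list, topic = argv[1], argv[2]
    return broker_list, topic


print(parse_args(['spark_job.py', 'localhost:9092', 'transaction']))
```

Failing fast with a usage message is cheaper than letting the job start and then fail to connect to a misspelled broker.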

Now it's time to launch the system and check that it works:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

Everything works! While the application is running, new aggregation results appear every 2 seconds, because we set the batch interval to 2 seconds when creating the StreamingContext object.

Next, we run a simple query against the database to check that records have appeared in the transaction_flow table.

Conclusion

This article walked through an example of stream processing with Spark Streaming together with Apache Kafka and PostgreSQL. With data volumes from various sources growing, it is hard to overstate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my GitHub repository.

I am happy to discuss this article; I look forward to your comments and hope for constructive criticism from all interested readers.

I wish you success!

P.S. Initially I planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!

Source: will.com
