Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Halò, Habr! An-diugh togaidh sinn siostam a làimhsicheas sruthan teachdaireachd Apache Kafka a’ cleachdadh Spark Streaming agus sgrìobhaidh sinn na toraidhean giullachd gu stòr-dàta sgòthan AWS RDS.

Smaoinichidh sinn gu bheil institiud creideis sònraichte a’ cur uallach oirnn a bhith a’ giullachd ghnothaichean a tha a’ tighinn a-steach “air an iteig” thar a meuran gu lèir. Faodar seo a dhèanamh gus obrachadh a-mach gu sgiobalta suidheachadh airgead fosgailte airson an ionmhais, crìochan no toraidhean ionmhais airson gnothaichean, msaa.

Mar a chuireas tu a 'chùis seo an gnìomh gun a bhith a' cleachdadh geasan draoidheachd is draoidheachd - leugh fon ghearradh! Rach!

Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark
(Stòr ìomhaigh)

Ro-ràdh

Gu dearbh, tha giullachd tòrr dàta ann an àm fìor a’ toirt seachad cothroman gu leòr airson a chleachdadh ann an siostaman an latha an-diugh. Is e aon de na cothlamadh as mòr-chòrdte airson seo an tandem de Apache Kafka agus Spark Streaming, far am bi Kafka a’ cruthachadh sruth de phasgan teachdaireachd a tha a ’tighinn a-steach, agus bidh Spark Streaming a’ pròiseasadh nam pacaidean sin aig àm sònraichte.

Gus fulangas locht an tagraidh a mheudachadh, cleachdaidh sinn puingean-seic. Leis an uidheamachd seo, nuair a dh’ fheumas an einnsean Spark Streaming dàta a chaidh air chall fhaighinn air ais, chan fheum e ach a dhol air ais chun àite-seic mu dheireadh agus àireamhachadh ath-thòiseachadh às an sin.

Ailtireachd an t-siostam leasaichte

Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Co-phàirtean air an cleachdadh:

  • Apache Kafka na shiostam teachdaireachdan foillsichte-fo-sgrìobhadh sgaoilte. Freagarrach airson an dà chuid caitheamh teachdaireachd far-loidhne agus air-loidhne. Gus casg a chuir air call dàta, bidh teachdaireachdan Kafka air an stòradh air diosc agus air an ath-aithris taobh a-staigh a’ bhuidheann. Tha siostam Kafka air a thogail air mullach seirbheis sioncronaidh ZooKeeper;
  • Apache spark sruthadh - Co-phàirt spark airson a bhith a’ giullachd dàta sruthadh. Tha am modal Spark Streaming air a thogail a’ cleachdadh ailtireachd meanbh-batch, far a bheil an sruth dàta air a mhìneachadh mar sreath leantainneach de phacaidean dàta beaga. Bidh Spark Streaming a’ toirt dàta bho dhiofar thùsan agus ga chur còmhla ann am pasganan beaga. Bithear a’ cruthachadh phasganan ùra aig amannan cunbhalach. Aig toiseach gach eadar-ama, thèid pasgan ùr a chruthachadh, agus tha dàta sam bith a gheibhear tron ​​​​àm ​​sin air a ghabhail a-steach sa phacaid. Aig deireadh an eadar-ama, bidh fàs pacaid a 'stad. Tha meud an eadar-ama air a dhearbhadh le paramadair ris an canar an eadar-ama baidse;
  • Apache spark - a’ cothlamadh giollachd dàimheach le prògramadh gnìomh Spark. Tha dàta structaraichte a’ ciallachadh dàta aig a bheil sgeama, is e sin, aon sheata de raointean airson a h-uile clàr. Bidh Spark SQL a’ toirt taic do chuir a-steach bho ghrunn stòran dàta structaraichte agus, le taing don fhiosrachadh sgeama a tha ri fhaighinn, chan urrainn dha ach na raointean riatanach de chlàran fhaighinn air ais, agus bheir e cuideachd API DataFrame;
  • AWS RDS na stòr-dàta dàimheach stèidhichte air sgòthan, seirbheis lìn a tha a’ sìmpleachadh stèidheachadh, obrachadh agus sgèileadh, agus air a rianachd gu dìreach le Amazon.

A’ stàladh agus a’ ruith frithealaiche Kafka

Mus cleachd thu Kafka gu dìreach, feumaidh tu dèanamh cinnteach gu bheil Java agad, oir ... Tha JVM air a chleachdadh airson obair:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Cruthaichidh sinn cleachdaiche ùr airson obrachadh le Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

An uairsin, luchdaich sìos an sgaoileadh bho làrach-lìn oifigeil Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack an tasglann a chaidh a luchdachadh sìos:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Tha an ath cheum roghainneil. Is e an fhìrinn nach leig na roghainnean bunaiteach leat na comasan uile aig Apache Kafka a chleachdadh gu h-iomlan. Mar eisimpleir, sguab às cuspair, roinn-seòrsa, buidheann ris am faodar teachdaireachdan fhoillseachadh. Gus seo atharrachadh, deasaich sinn am faidhle rèiteachaidh:

vim ~/kafka/config/server.properties

Cuir na leanas ri deireadh an fhaidhle:

delete.topic.enable = true

Mus tòisich thu air an fhrithealaiche Kafka, feumaidh tu am frithealaiche ZooKeeper a thòiseachadh; cleachdaidh sinn an sgriobt taice a thig an cois cuairteachadh Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Às deidh ZooKeeper tòiseachadh gu soirbheachail, cuir air bhog am frithealaiche Kafka ann an ceann-uidhe air leth:

bin/kafka-server-start.sh config/server.properties

Cruthaichidh sinn cuspair ùr ris an canar Transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Dèan cinnteach gun deach cuspair leis an àireamh riatanach de sgaradh agus ath-riochdachadh a chruthachadh:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Nach caill sinn na h-amannan de bhith a’ dèanamh deuchainn air an riochdaire agus an neach-cleachdaidh airson a’ chuspair a chaidh a chruthachadh às ùr. Tha tuilleadh fiosrachaidh mu mar as urrainn dhut deuchainn a dhèanamh air cur is faighinn teachdaireachdan sgrìobhte anns na sgrìobhainnean oifigeil - Cuir cuid de theachdaireachdan. Uill, gluaisidh sinn air adhart gu bhith a’ sgrìobhadh riochdaire ann am Python a’ cleachdadh an KafkaProducer API.

Sgrìobhadair riochdaire

Ginidh an riochdaire dàta air thuaiream - 100 teachdaireachd gach diog. Le dàta air thuaiream tha sinn a’ ciallachadh faclair anns a bheil trì raointean:

  • Meur - ainm puing reic an ionaid creideis;
  • currency - airgead malairt;
  • Suim - suim malairt. Bidh an t-suim na àireamh adhartach ma tha e na cheannach airgead leis a’ Bhanca, agus àireamh àicheil ma tha e na reic.

Tha an còd airson an riochdaire a’ coimhead mar seo:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

An ath rud, a’ cleachdadh an dòigh cuir, bidh sinn a’ cur teachdaireachd chun an fhrithealaiche, chun chuspair a dh’ fheumas sinn, ann an cruth JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Nuair a bhios sinn a’ ruith an sgriobt, gheibh sinn na teachdaireachdan a leanas anns a’ chrìoch:

Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Tha seo a 'ciallachadh gu bheil a h-uile càil ag obair mar a bha sinn ag iarraidh - bidh an riochdaire a' gineadh agus a 'cur teachdaireachdan chun chuspair a tha a dhìth oirnn.
Is e an ath cheum Spark a stàladh agus an sruth teachdaireachd seo a phròiseasadh.

Stàladh apache spark

Apache Spark na àrd-ùrlar coimpiutaireachd brabhsair uile-choitcheann agus àrd-choileanadh.

Bidh Spark a’ coileanadh nas fheàrr na buileachadh mòr-chòrdte den mhodal MapReduce agus aig an aon àm a’ toirt taic do raon nas fharsainge de sheòrsan coimpiutaireachd, a’ toirt a-steach ceistean eadar-ghnìomhach agus giullachd sruthan. Tha àite cudromach aig astar ann a bhith a’ giullachd mòran dàta, leis gur e astar a th’ ann a leigeas leat obrachadh gu eadar-ghnìomhach gun a bhith a’ caitheamh mionaidean no uairean a’ feitheamh. Is e aon de na neartan as motha aig Spark a tha ga dhèanamh cho luath an comas air àireamhachadh cuimhne a dhèanamh.

Tha am frèam seo sgrìobhte ann an Scala, mar sin feumaidh tu a stàladh an toiseach:

sudo apt-get install scala

Luchdaich a-nuas spark sgaoileadh bho làrach-lìn oifigeil. .

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack an tasglann:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Cuir an t-slighe gu Spark ris an fhaidhle bash:

vim ~/.bashrc

Cuir na loidhnichean a leanas tron ​​​​neach-deasachaidh:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Ruith an àithne gu h-ìosal às deidh dhut atharrachaidhean a dhèanamh air bashrc:

source ~/.bashrc

A’ cleachdadh AWS PostgreSQL

Chan eil air fhàgail ach an stòr-dàta a chleachdadh anns an luchdaich sinn suas am fiosrachadh giullachd bho na sruthan. Airson seo cleachdaidh sinn seirbheis AWS RDS.

Rach gu consol AWS -> AWS RDS -> Stòr-dàta -> Cruthaich stòr-dàta:
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Tagh PostgreSQL agus cliog air Next:
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Air sgàth Tha an eisimpleir seo airson adhbharan foghlaim a-mhàin; cleachdaidh sinn frithealaiche an-asgaidh “aig a’ char as lugha” (Sreath an-asgaidh):
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

An ath rud, chuir sinn diog anns a’ bhloc Sreath an-asgaidh, agus às deidh sin gheibh sinn gu fèin-ghluasadach eisimpleir den chlas t2.micro - ged a tha e lag, tha e an-asgaidh agus gu math freagarrach airson ar gnìomh:
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

An ath rud thig rudan glè chudromach: ainm eisimpleir an stòr-dàta, ainm a’ phrìomh neach-cleachdaidh agus am facal-faire aige. Ainmich an eisimpleir: myHabrTest, prìomh chleachdaiche: habr, facal-faire: 12345 agus cliog air an ath phutan:
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Air an ath dhuilleig tha paramadairean le uallach airson ruigsinneachd ar frithealaiche stòr-dàta bhon taobh a-muigh (ruigsinneachd poblach) agus ruigsinneachd puirt:

Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Cruthaichidh sinn suidheachadh ùr airson buidheann tèarainteachd VPC, a leigeas le ruigsinneachd bhon taobh a-muigh don t-seirbheisiche stòr-dàta againn tro phort 5432 (PostgreSQL).
Rachamaid gu consol AWS ann an uinneag brobhsair air leth gu deas-bhòrd VPC -> Buidhnean tèarainteachd -> Cruthaich roinn buidheann tèarainteachd:
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Shuidhich sinn an t-ainm airson a’ bhuidheann Tèarainteachd - PostgreSQL, tuairisgeul, comharraich dè an VPC a bu chòir a bhith co-cheangailte ris a’ chuantal seo agus cliog air a’ phutan Cruthaich:
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Lìon a-steach na riaghailtean Inbound airson port 5432 airson a’ bhuidheann ùr-chruthaichte, mar a chithear san dealbh gu h-ìosal. Chan urrainn dhut am port a shònrachadh le làimh, ach tagh PostgreSQL bhon liosta tuiteam-sìos Seòrsa.

Gu cruaidh, tha an luach ::/0 a’ ciallachadh gum bi trafaic a’ tighinn a-steach don t-seirbheisiche bho air feadh an t-saoghail, rud nach eil gu tur fìor, ach airson an eisimpleir a sgrùdadh, leigidh sinn leinn an dòigh-obrach seo a chleachdadh:
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Bidh sinn a’ tilleadh gu duilleag a’ bhrobhsair, far a bheil “Configure advanced settings” againn fosgailte agus tagh ann an roinn buidhnean tèarainteachd VPC -> Tagh buidhnean tèarainteachd VPC a th’ ann -> PostgreSQL:
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

An ath rud, anns na roghainnean Stòr-dàta -> Ainm an stòr-dàta -> suidhich an t-ainm - habrDB.

Faodaidh sinn na crìochan a tha air fhàgail fhàgail, ach a-mhàin cùl-taic a chuir dheth (ùine glèidhidh cùl-taic - 0 latha), sgrùdadh agus Lèirsinnean Coileanaidh, gu bunaiteach. Cliog air a 'phutan Cruthaich stòr-dàta:
Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Làimhseachadh snàithlean

Is e an ìre mu dheireadh leasachadh obair Spark, a làimhsicheas dàta ùr a’ tighinn bho Kafka a h-uile dà dhiog agus a chuireas a-steach an toradh a-steach don stòr-dàta.

Mar a chaidh a ràdh gu h-àrd, tha puingean-seic nam prìomh inneal ann an SparkStreaming a dh’ fheumar a rèiteachadh gus dèanamh cinnteach à fulangas sgàinidhean. Cleachdaidh sinn puingean-seic agus, ma dh’ fhailicheas am modh-obrach, cha leig am modal Spark Streaming air ais ach tilleadh chun phuing-seic mu dheireadh agus àireamhachadh a thòiseachadh bhuaithe gus an dàta a chaidh air chall fhaighinn air ais.

Faodar sgrùdadh-seic a chomasachadh le bhith a’ suidheachadh eòlaire air siostam faidhle earbsach a tha fulangach air lochdan (leithid HDFS, S3, msaa) anns am bi am fiosrachadh puing-seic air a stòradh. Tha seo air a dhèanamh a’ cleachdadh, mar eisimpleir:

streamingContext.checkpoint(checkpointDirectory)

Anns an eisimpleir againn, cleachdaidh sinn an dòigh-obrach a leanas, is e sin, ma tha checkpointDirectory ann, thèid an co-theacsa ath-chruthachadh bhon dàta puing-seic. Mura h-eil an eòlaire ann (i.e. air a chuir gu bàs airson a’ chiad uair), an uairsin tha functionToCreateContext air a ghairm gus co-theacs ùr a chruthachadh agus DStreams a rèiteachadh:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Bidh sinn a’ cruthachadh rud DirectStream gus ceangal ris a’ chuspair “transaction” a’ cleachdadh an dòigh createDirectStream de leabharlann KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

A’ parsadh dàta a’ tighinn a-steach ann an cruth JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

A’ cleachdadh Spark SQL, bidh sinn a’ dèanamh cruinneachadh sìmplidh agus a’ taisbeanadh an toradh sa chonsail:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

A’ faighinn teacsa na ceiste agus ga ruith tro Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Agus an uairsin bidh sinn a’ sàbhaladh an dàta cruinnichte ann an clàr ann an AWS RDS. Gus na toraidhean cruinneachaidh a shàbhaladh gu clàr stòr-dàta, cleachdaidh sinn an dòigh sgrìobhaidh airson an nì DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Beagan fhaclan mu bhith a’ stèidheachadh ceangal ri AWS RDS. Chruthaich sinn an neach-cleachdaidh agus am facal-faire air a shon aig a’ cheum “Deploying AWS PostgreSQL”. Bu chòir dhut Endpoint a chleachdadh mar url frithealaiche an stòr-dàta, a tha air a thaisbeanadh anns an roinn Ceangalachd & tèarainteachd:

Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

Gus Spark agus Kafka a cheangal gu ceart, bu chòir dhut an obair a ruith tro smark-submit a’ cleachdadh an artifact spark-streaming-kafka-0-8_2.11. A bharrachd air an sin, cleachdaidh sinn artifact airson eadar-obrachadh le stòr-dàta PostgreSQL; gluaisidh sinn iad tro --packages.

Airson sùbailteachd an sgriobt, bheir sinn a-steach cuideachd mar pharaimearan cuir a-steach ainm frithealaiche na teachdaireachd agus an cuspair às a bheil sinn airson dàta fhaighinn.

Mar sin, tha an t-àm ann a chuir air bhog agus sgrùdadh a dhèanamh air gnìomhachd an t-siostaim:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Dh’ obraich a h-uile càil a-mach! Mar a chì thu san dealbh gu h-ìosal, fhad ‘s a tha an tagradh a’ ruith, tha toraidhean cruinneachaidh ùra air an toirt a-mach gach 2 diog, oir shuidhich sinn an ùine batching gu 2 dhiog nuair a chruthaich sinn an rud StreamingContext:

Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

An uairsin, bidh sinn a’ dèanamh ceist shìmplidh don stòr-dàta gus sùil a thoirt air làthaireachd chlàran sa chlàr malairt_sruth:

Apache Kafka agus giullachd dàta sruthadh le sruthadh Spark

co-dhùnadh

Sheall an artaigil seo air eisimpleir de làimhseachadh sruth fiosrachaidh a’ cleachdadh Spark Streaming ann an co-bhonn ri Apache Kafka agus PostgreSQL. Le fàs dàta bho dhiofar thùsan, tha e duilich cus luach a thoirt air luach practaigeach Spark Streaming airson a bhith a’ cruthachadh sruthadh agus tagraidhean fìor-ùine.

Gheibh thu an còd stòr slàn anns an stòr-dàta agam aig GitHub.

Tha mi toilichte bruidhinn air an artaigil seo, tha mi a’ coimhead air adhart ri do bheachdan, agus tha mi an dòchas cuideachd càineadh cuideachail fhaighinn bhon a h-uile leughadair dàimheil.

Tha mi a 'guidhe gach soirbheachas dhut!

Salm. An toiseach bhathas an dùil stòr-dàta PostgreSQL ionadail a chleachdadh, ach le mo ghaol air AWS, chuir mi romham an stòr-dàta a ghluasad chun sgòth. Anns an ath artaigil air a ’chuspair seo, seallaidh mi mar a chuireas tu an siostam gu lèir a tha air a mhìneachadh gu h-àrd ann an AWS an gnìomh a’ cleachdadh AWS Kinesis agus AWS EMR. Lean an naidheachd!

Source: www.habr.com

Cuir beachd ann