Halò, Habr! An-diugh togaidh sinn siostam a lĂ imhsicheas sruthan teachdaireachd Apache Kafka aâ cleachdadh Spark Streaming agus sgrĂŹobhaidh sinn na toraidhean giullachd gu stòr-dĂ ta sgòthan AWS RDS.
Smaoinichidh sinn gu bheil institiud creideis sònraichte aâ cur uallach oirnn a bhith aâ giullachd ghnothaichean a tha aâ tighinn a-steach âair an iteigâ thar a meuran gu lèir. Faodar seo a dhèanamh gus obrachadh a-mach gu sgiobalta suidheachadh airgead fosgailte airson an ionmhais, crĂŹochan no toraidhean ionmhais airson gnothaichean, msaa.
Mar a chuireas tu a 'chĂšis seo an gnĂŹomh gun a bhith a' cleachdadh geasan draoidheachd is draoidheachd - leugh fon ghearradh! Rach!

Ro-rĂ dh
Gu dearbh, tha giullachd tòrr dĂ ta ann an Ă m fĂŹor aâ toirt seachad cothroman gu leòr airson a chleachdadh ann an siostaman an latha an-diugh. Is e aon de na cothlamadh as mòr-chòrdte airson seo an tandem de Apache Kafka agus Spark Streaming, far am bi Kafka aâ cruthachadh sruth de phasgan teachdaireachd a tha a âtighinn a-steach, agus bidh Spark Streaming aâ pròiseasadh nam pacaidean sin aig Ă m sònraichte.
Gus fulangas locht an tagraidh a mheudachadh, cleachdaidh sinn puingean-seic. Leis an uidheamachd seo, nuair a dhâ fheumas an einnsean Spark Streaming dĂ ta a chaidh air chall fhaighinn air ais, chan fheum e ach a dhol air ais chun Ă ite-seic mu dheireadh agus Ă ireamhachadh ath-thòiseachadh Ă s an sin.
Ailtireachd an t-siostam leasaichte

Co-phĂ irtean air an cleachdadh:
- na shiostam teachdaireachdan foillsichte-fo-sgrĂŹobhadh sgaoilte. Freagarrach airson an dĂ chuid caitheamh teachdaireachd far-loidhne agus air-loidhne. Gus casg a chuir air call dĂ ta, bidh teachdaireachdan Kafka air an stòradh air diosc agus air an ath-aithris taobh a-staigh aâ bhuidheann. Tha siostam Kafka air a thogail air mullach seirbheis sioncronaidh ZooKeeper;
- - Co-phĂ irt spark airson a bhith aâ giullachd dĂ ta sruthadh. Tha am modal Spark Streaming air a thogail aâ cleachdadh ailtireachd meanbh-batch, far a bheil an sruth dĂ ta air a mhĂŹneachadh mar sreath leantainneach de phacaidean dĂ ta beaga. Bidh Spark Streaming aâ toirt dĂ ta bho dhiofar thĂšsan agus ga chur còmhla ann am pasganan beaga. Bithear aâ cruthachadh phasganan Ăšra aig amannan cunbhalach. Aig toiseach gach eadar-ama, thèid pasgan Ăšr a chruthachadh, agus tha dĂ ta sam bith a gheibhear tron ââââĂ m ââsin air a ghabhail a-steach sa phacaid. Aig deireadh an eadar-ama, bidh fĂ s pacaid a 'stad. Tha meud an eadar-ama air a dhearbhadh le paramadair ris an canar an eadar-ama baidse;
- - aâ cothlamadh giollachd dĂ imheach le prògramadh gnĂŹomh Spark. Tha dĂ ta structaraichte aâ ciallachadh dĂ ta aig a bheil sgeama, is e sin, aon sheata de raointean airson a h-uile clĂ r. Bidh Spark SQL aâ toirt taic do chuir a-steach bho ghrunn stòran dĂ ta structaraichte agus, le taing don fhiosrachadh sgeama a tha ri fhaighinn, chan urrainn dha ach na raointean riatanach de chlĂ ran fhaighinn air ais, agus bheir e cuideachd API DataFrame;
- na stòr-dĂ ta dĂ imheach stèidhichte air sgòthan, seirbheis lĂŹn a tha aâ sĂŹmpleachadh stèidheachadh, obrachadh agus sgèileadh, agus air a rianachd gu dĂŹreach le Amazon.
Aâ stĂ ladh agus aâ ruith frithealaiche Kafka
Mus cleachd thu Kafka gu dÏreach, feumaidh tu dèanamh cinnteach gu bheil Java agad, oir ... Tha JVM air a chleachdadh airson obair:
sudo apt-get update
sudo apt-get install default-jre
java -version
Cruthaichidh sinn cleachdaiche Ăšr airson obrachadh le Kafka:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
An uairsin, luchdaich sĂŹos an sgaoileadh bho lĂ rach-lĂŹn oifigeil Apache Kafka:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"Unpack an tasglann a chaidh a luchdachadh sĂŹos:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Tha an ath cheum roghainneil. Is e an fhÏrinn nach leig na roghainnean bunaiteach leat na comasan uile aig Apache Kafka a chleachdadh gu h-iomlan. Mar eisimpleir, sguab às cuspair, roinn-seòrsa, buidheann ris am faodar teachdaireachdan fhoillseachadh. Gus seo atharrachadh, deasaich sinn am faidhle rèiteachaidh:
vim ~/kafka/config/server.propertiesCuir na leanas ri deireadh an fhaidhle:
delete.topic.enable = trueMus tòisich thu air an fhrithealaiche Kafka, feumaidh tu am frithealaiche ZooKeeper a thòiseachadh; cleachdaidh sinn an sgriobt taice a thig an cois cuairteachadh Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Ăs deidh ZooKeeper tòiseachadh gu soirbheachail, cuir air bhog am frithealaiche Kafka ann an ceann-uidhe air leth:
bin/kafka-server-start.sh config/server.propertiesCruthaichidh sinn cuspair Ăšr ris an canar Transaction:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transactionDèan cinnteach gun deach cuspair leis an à ireamh riatanach de sgaradh agus ath-riochdachadh a chruthachadh:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 
Nach caill sinn na h-amannan de bhith aâ dèanamh deuchainn air an riochdaire agus an neach-cleachdaidh airson aâ chuspair a chaidh a chruthachadh Ă s Ăšr. Tha tuilleadh fiosrachaidh mu mar as urrainn dhut deuchainn a dhèanamh air cur is faighinn teachdaireachdan sgrĂŹobhte anns na sgrĂŹobhainnean oifigeil - . Uill, gluaisidh sinn air adhart gu bhith aâ sgrĂŹobhadh riochdaire ann am Python aâ cleachdadh an KafkaProducer API.
SgrĂŹobhadair riochdaire
Ginidh an riochdaire dĂ ta air thuaiream - 100 teachdaireachd gach diog. Le dĂ ta air thuaiream tha sinn aâ ciallachadh faclair anns a bheil trĂŹ raointean:
- Meur - ainm puing reic an ionaid creideis;
- currency - airgead malairt;
- Suim - suim malairt. Bidh an t-suim na Ă ireamh adhartach ma tha e na cheannach airgead leis aâ Bhanca, agus Ă ireamh Ă icheil ma tha e na reic.
Tha an còd airson an riochdaire aâ coimhead mar seo:
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
An ath rud, aâ cleachdadh an dòigh cuir, bidh sinn aâ cur teachdaireachd chun an fhrithealaiche, chun chuspair a dhâ fheumas sinn, ann an cruth JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Nuair a bhios sinn aâ ruith an sgriobt, gheibh sinn na teachdaireachdan a leanas anns aâ chrĂŹoch:

Tha seo a 'ciallachadh gu bheil a h-uile cĂ il ag obair mar a bha sinn ag iarraidh - bidh an riochdaire a' gineadh agus a 'cur teachdaireachdan chun chuspair a tha a dhĂŹth oirnn.
Is e an ath cheum Spark a stà ladh agus an sruth teachdaireachd seo a phròiseasadh.
StĂ ladh apache spark
Apache Spark na Ă rd-Ăšrlar coimpiutaireachd brabhsair uile-choitcheann agus Ă rd-choileanadh.
Bidh Spark aâ coileanadh nas fheĂ rr na buileachadh mòr-chòrdte den mhodal MapReduce agus aig an aon Ă m aâ toirt taic do raon nas fharsainge de sheòrsan coimpiutaireachd, aâ toirt a-steach ceistean eadar-ghnĂŹomhach agus giullachd sruthan. Tha Ă ite cudromach aig astar ann a bhith aâ giullachd mòran dĂ ta, leis gur e astar a thâ ann a leigeas leat obrachadh gu eadar-ghnĂŹomhach gun a bhith aâ caitheamh mionaidean no uairean aâ feitheamh. Is e aon de na neartan as motha aig Spark a tha ga dhèanamh cho luath an comas air Ă ireamhachadh cuimhne a dhèanamh.
Tha am frèam seo sgrÏobhte ann an Scala, mar sin feumaidh tu a stà ladh an toiseach:
sudo apt-get install scalaLuchdaich a-nuas spark sgaoileadh bho lĂ rach-lĂŹn oifigeil. .
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"Unpack an tasglann:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/sparkCuir an t-slighe gu Spark ris an fhaidhle bash:
vim ~/.bashrcCuir na loidhnichean a leanas tron ââââneach-deasachaidh:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
Ruith an à ithne gu h-Ïosal às deidh dhut atharrachaidhean a dhèanamh air bashrc:
source ~/.bashrcAâ cleachdadh AWS PostgreSQL
Chan eil air fhà gail ach an stòr-dà ta a chleachdadh anns an luchdaich sinn suas am fiosrachadh giullachd bho na sruthan. Airson seo cleachdaidh sinn seirbheis AWS RDS.
Rach gu consol AWS -> AWS RDS -> Stòr-dà ta -> Cruthaich stòr-dà ta:

Tagh PostgreSQL agus cliog air Next:

Air sgĂ th Tha an eisimpleir seo airson adhbharan foghlaim a-mhĂ in; cleachdaidh sinn frithealaiche an-asgaidh âaig aâ char as lughaâ (Sreath an-asgaidh):

An ath rud, chuir sinn diog anns aâ bhloc Sreath an-asgaidh, agus Ă s deidh sin gheibh sinn gu fèin-ghluasadach eisimpleir den chlas t2.micro - ged a tha e lag, tha e an-asgaidh agus gu math freagarrach airson ar gnĂŹomh:

An ath rud thig rudan glè chudromach: ainm eisimpleir an stòr-dĂ ta, ainm aâ phrĂŹomh neach-cleachdaidh agus am facal-faire aige. Ainmich an eisimpleir: myHabrTest, prĂŹomh chleachdaiche: habr, facal-faire: 12345 agus cliog air an ath phutan:

Air an ath dhuilleig tha paramadairean le uallach airson ruigsinneachd ar frithealaiche stòr-dà ta bhon taobh a-muigh (ruigsinneachd poblach) agus ruigsinneachd puirt:

Cruthaichidh sinn suidheachadh Úr airson buidheann tèarainteachd VPC, a leigeas le ruigsinneachd bhon taobh a-muigh don t-seirbheisiche stòr-dà ta againn tro phort 5432 (PostgreSQL).
Rachamaid gu consol AWS ann an uinneag brobhsair air leth gu deas-bhòrd VPC -> Buidhnean tèarainteachd -> Cruthaich roinn buidheann tèarainteachd:

Shuidhich sinn an t-ainm airson aâ bhuidheann Tèarainteachd - PostgreSQL, tuairisgeul, comharraich dè an VPC a bu chòir a bhith co-cheangailte ris aâ chuantal seo agus cliog air aâ phutan Cruthaich:

LĂŹon a-steach na riaghailtean Inbound airson port 5432 airson aâ bhuidheann Ăšr-chruthaichte, mar a chithear san dealbh gu h-ĂŹosal. Chan urrainn dhut am port a shònrachadh le lĂ imh, ach tagh PostgreSQL bhon liosta tuiteam-sĂŹos Seòrsa.
Gu cruaidh, tha an luach ::/0 aâ ciallachadh gum bi trafaic aâ tighinn a-steach don t-seirbheisiche bho air feadh an t-saoghail, rud nach eil gu tur fĂŹor, ach airson an eisimpleir a sgrĂšdadh, leigidh sinn leinn an dòigh-obrach seo a chleachdadh:

Bidh sinn aâ tilleadh gu duilleag aâ bhrobhsair, far a bheil âConfigure advanced settingsâ againn fosgailte agus tagh ann an roinn buidhnean tèarainteachd VPC -> Tagh buidhnean tèarainteachd VPC a thâ ann -> PostgreSQL:

An ath rud, anns na roghainnean Stòr-dà ta -> Ainm an stòr-dà ta -> suidhich an t-ainm - habrDB.
Faodaidh sinn na crÏochan a tha air fhà gail fhà gail, ach a-mhà in cÚl-taic a chuir dheth (Úine glèidhidh cÚl-taic - 0 latha), sgrÚdadh agus Lèirsinnean Coileanaidh, gu bunaiteach. Cliog air a 'phutan Cruthaich stòr-dà ta:

LĂ imhseachadh snĂ ithlean
Is e an ĂŹre mu dheireadh leasachadh obair Spark, a lĂ imhsicheas dĂ ta Ăšr aâ tighinn bho Kafka a h-uile dĂ dhiog agus a chuireas a-steach an toradh a-steach don stòr-dĂ ta.
Mar a chaidh a rĂ dh gu h-Ă rd, tha puingean-seic nam prĂŹomh inneal ann an SparkStreaming a dhâ fheumar a rèiteachadh gus dèanamh cinnteach Ă fulangas sgĂ inidhean. Cleachdaidh sinn puingean-seic agus, ma dhâ fhailicheas am modh-obrach, cha leig am modal Spark Streaming air ais ach tilleadh chun phuing-seic mu dheireadh agus Ă ireamhachadh a thòiseachadh bhuaithe gus an dĂ ta a chaidh air chall fhaighinn air ais.
Faodar sgrĂšdadh-seic a chomasachadh le bhith aâ suidheachadh eòlaire air siostam faidhle earbsach a tha fulangach air lochdan (leithid HDFS, S3, msaa) anns am bi am fiosrachadh puing-seic air a stòradh. Tha seo air a dhèanamh aâ cleachdadh, mar eisimpleir:
streamingContext.checkpoint(checkpointDirectory)Anns an eisimpleir againn, cleachdaidh sinn an dòigh-obrach a leanas, is e sin, ma tha checkpointDirectory ann, thèid an co-theacsa ath-chruthachadh bhon dĂ ta puing-seic. Mura h-eil an eòlaire ann (i.e. air a chuir gu bĂ s airson aâ chiad uair), an uairsin tha functionToCreateContext air a ghairm gus co-theacs Ăšr a chruthachadh agus DStreams a rèiteachadh:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Bidh sinn aâ cruthachadh rud DirectStream gus ceangal ris aâ chuspair âtransactionâ aâ cleachdadh an dòigh createDirectStream de leabharlann KafkaUtils:
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
Aâ parsadh dĂ ta aâ tighinn a-steach ann an cruth JSON:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
Aâ cleachdadh Spark SQL, bidh sinn aâ dèanamh cruinneachadh sĂŹmplidh agus aâ taisbeanadh an toradh sa chonsail:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Aâ faighinn teacsa na ceiste agus ga ruith tro Spark SQL:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
Agus an uairsin bidh sinn aâ sĂ bhaladh an dĂ ta cruinnichte ann an clĂ r ann an AWS RDS. Gus na toraidhean cruinneachaidh a shĂ bhaladh gu clĂ r stòr-dĂ ta, cleachdaidh sinn an dòigh sgrĂŹobhaidh airson an nĂŹ DataFrame:
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
Beagan fhaclan mu bhith aâ stèidheachadh ceangal ri AWS RDS. Chruthaich sinn an neach-cleachdaidh agus am facal-faire air a shon aig aâ cheum âDeploying AWS PostgreSQLâ. Bu chòir dhut Endpoint a chleachdadh mar url frithealaiche an stòr-dĂ ta, a tha air a thaisbeanadh anns an roinn Ceangalachd & tèarainteachd:
Gus Spark agus Kafka a cheangal gu ceart, bu chòir dhut an obair a ruith tro smark-submit aâ cleachdadh an artifact spark-streaming-kafka-0-8_2.11. A bharrachd air an sin, cleachdaidh sinn artifact airson eadar-obrachadh le stòr-dĂ ta PostgreSQL; gluaisidh sinn iad tro --packages.
Airson sĂšbailteachd an sgriobt, bheir sinn a-steach cuideachd mar pharaimearan cuir a-steach ainm frithealaiche na teachdaireachd agus an cuspair Ă s a bheil sinn airson dĂ ta fhaighinn.
Mar sin, tha an t-à m ann a chuir air bhog agus sgrÚdadh a dhèanamh air gnÏomhachd an t-siostaim:
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
Dhâ obraich a h-uile cĂ il a-mach! Mar a chĂŹ thu san dealbh gu h-ĂŹosal, fhad âs a tha an tagradh aâ ruith, tha toraidhean cruinneachaidh Ăšra air an toirt a-mach gach 2 diog, oir shuidhich sinn an Ăšine batching gu 2 dhiog nuair a chruthaich sinn an rud StreamingContext:

An uairsin, bidh sinn aâ dèanamh ceist shĂŹmplidh don stòr-dĂ ta gus sĂšil a thoirt air lĂ thaireachd chlĂ ran sa chlĂ r malairt_sruth:

co-dhĂšnadh
Sheall an artaigil seo air eisimpleir de lĂ imhseachadh sruth fiosrachaidh aâ cleachdadh Spark Streaming ann an co-bhonn ri Apache Kafka agus PostgreSQL. Le fĂ s dĂ ta bho dhiofar thĂšsan, tha e duilich cus luach a thoirt air luach practaigeach Spark Streaming airson a bhith aâ cruthachadh sruthadh agus tagraidhean fĂŹor-Ăšine.
Gheibh thu an còd stòr slà n anns an stòr-dà ta agam aig .
Tha mi toilichte bruidhinn air an artaigil seo, tha mi aâ coimhead air adhart ri do bheachdan, agus tha mi an dòchas cuideachd cĂ ineadh cuideachail fhaighinn bhon a h-uile leughadair dĂ imheil.
Tha mi a 'guidhe gach soirbheachas dhut!
Salm. An toiseach bhathas an dĂšil stòr-dĂ ta PostgreSQL ionadail a chleachdadh, ach le mo ghaol air AWS, chuir mi romham an stòr-dĂ ta a ghluasad chun sgòth. Anns an ath artaigil air a âchuspair seo, seallaidh mi mar a chuireas tu an siostam gu lèir a tha air a mhĂŹneachadh gu h-Ă rd ann an AWS an gnĂŹomh aâ cleachdadh AWS Kinesis agus AWS EMR. Lean an naidheachd!
Source: www.habr.com

