Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Hello, Habr! Maanta waxaan dhisi doonaa nidaam ka baaraandegi doona qulqulka fariinta Apache Kafka anagoo adeegsanayna Spark Streaming oo u qori doona natiijada habaynta xogta daruuraha AWS RDS.

Aynu qiyaasno in hay'ad deyn gaar ah ay ina dejiso hawsha ka-hortagga macaamil ganacsi ee soo galaya "duuqsi" dhammaan laamaheeda. Tan waxaa loo samayn karaa iyadoo ujeedadu tahay in si degdeg ah loo xisaabiyo booska lacagta furan ee khasnadda, xadka ama natiijooyinka maaliyadeed ee wax kala iibsiga, iwm.

Sida loo hirgeliyo kiiskan iyada oo aan la isticmaalin sixirka iyo sixirka - akhri hoostiisa! Tag!

Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming
(Isha sawirka)

Horudhac

Dabcan, ka baaraandegida tiro badan oo xog ah wakhtiga dhabta ah waxay bixisaa fursado ku filan oo loogu isticmaalo nidaamyada casriga ah. Mid ka mid ah isku dhafka ugu caansan ee tan waa tandem Apache Kafka iyo Spark Streaming, halkaas oo Kafka ay abuurto qulqulka xirmooyinka fariinta soo socota, iyo Spark Streaming waxay socodsiisaa xirmooyinkan waqti go'an.

Si loo kordhiyo dulqaadka qaladka ee codsiga, waxaan isticmaali doonaa isbaarooyinka. Habkan, marka mishiinka Spark Streaming u baahan yahay inuu soo kabsado xogta luntay, waxay u baahan tahay oo kaliya inuu ku laabto barta kontoroolka ee ugu dambeysa oo uu dib uga bilaabo xisaabinta halkaas.

Dhismaha nidaamka la horumariyay

Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Qaybaha la isticmaalay:

  • Apache Kafka waa nidaamka fariimaha daabacaadda ee la qaybiyey. Ku habboon isticmaalka khadka tooska ah iyo khadka labadaba. Si looga hortago luminta xogta, fariimaha Kafka waxaa lagu kaydiyaa saxanka waxaana lagu soo koobaa kooxda dhexdeeda. Nidaamka Kafka waxaa lagu dhisay korka adeegga isku-xirka ZooKeeper;
  • Apache Spark Streaming -Qaybta dhimbiil ee habaynta xogta socodka Module-ka Spark Streaming waxa la dhisay iyadoo la isticmaalayo qaab-dhismeed-yar-yar, halkaas oo qulqulka xogta loo tarjumo inay tahay taxane joogto ah oo xirmo xog yar ah. Qulqulka Spark wuxuu ka qaadaa xogta ilo kala duwan wuxuuna ku daraa baakado yaryar. Xirmooyinka cusub ayaa la abuuraa waqtiyo joogto ah. Bilawga wakhti kasta oo dhexda ah, baakidh cusub ayaa la abuuraa, xog kasta oo la helay inta lagu jiro mudadaas waxa lagu daraa baqshadda. Dhammaadka barqada, korriinka baakidhku wuu joogsadaa. Baaxadda dhexda waxaa lagu go'aamiyaa halbeeg la yiraahdo dhexda dufcadda;
  • Apache Spark SQL - wuxuu isku daraa habaynta xidhiidhka iyo barnaamijka Spark functional. Xogta qaabaysan macneheedu waa xog leh schema, taas oo ah, hal goob oo kayd ah oo loogu talagalay dhammaan diiwaannada. Spark SQL waxay taageertaa talooyinka laga helo ilo xogeed oo habaysan oo kala duwan, iyada oo ay ugu wacan tahay helitaanka macluumaadka schema, waxay si hufan u soo ceshan kartaa kaliya meelaha loo baahan yahay ee diiwaanada, waxayna sidoo kale bixisaa DataFrame APIs;
  • AWS RDS waa xog-ururin ku salaysan qiime jaban, adeeg shabakadeed kaas oo fududeeya habaynta, hawlgalka iyo miisaanka, waxaana si toos ah u maamula Amazon.

Ku rakibida iyo socodsiinta server-ka Kafka

Kahor intaadan si toos ah u isticmaalin Kafka, waxaad u baahan tahay inaad hubiso inaad haysato Java, sababtoo ah ... JVM waxaa loo isticmaalaa shaqada:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Aan abuurno isticmaale cusub oo la shaqeeya Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Marka xigta, kala soo bax qaybinta bogga rasmiga ah ee Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Furo kaydka la soo dejiyay:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Tallaabada xigta waa ikhtiyaari. Xaqiiqdu waxay tahay in goobaha caadiga ah aysan kuu ogolaaneynin inaad si buuxda u isticmaasho dhammaan sifooyinka Apache Kafka. Tusaale ahaan, tirtir mawduuc, qaybta, kooxda fariimaha la daabici karo. Si tan loo beddelo, aan tafatirno faylka qaabeynta:

vim ~/kafka/config/server.properties

Kudar waxa soo socda dhamaadka faylka:

delete.topic.enable = true

Kahor intaadan bilaabin server-ka Kafka, waxaad u baahan tahay inaad bilowdo server-ka ZooKeeper; waxaanu isticmaali doonaa qoraalka kaalka ah ee la socda qaybinta Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Ka dib markii ZooKeeper uu si guul leh ku bilowday, ku billow server-ka Kafka meel gaar ah:

bin/kafka-server-start.sh config/server.properties

Aan abuurno mowduuc cusub oo la yiraahdo Transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Aan hubinno in mawduuc leh tirada loo baahan yahay ee qaybinta iyo ku celcelinta la abuuray:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Aynu seegno waqtiyada tijaabinta soo saaraha iyo macaamilka mawduuca cusub ee la abuuray. Faahfaahin dheeraad ah oo ku saabsan sida aad u tijaabin karto diritaanka iyo helitaanka fariimaha ayaa lagu qoray dukumeenti rasmi ah - Dir fariimaha qaar. Hagaag, waxaan u gudubnay inaan ku qorno soo saaraha Python anagoo adeegsanayna KafkaProducer API.

Qorista soo saaraha

Soo saaraha ayaa soo saari doona xog random - 100 farriimo ilbiriqsi kasta. Xogta random-ka waxaan ula jeednaa qaamuus ka kooban saddex qaybood:

  • Laanta - magaca barta iibka ee machadka deynta;
  • lacagta - lacagta wax kala iibsiga;
  • Lacagta - xadiga wax kala iibsiga. Lacagtu waxay noqonaysaa tiro togan haddii ay tahay lacag uu Bangigu soo iibsanayo, iyo tiro taban haddii ay tahay iib.

Koodhka soo saaraha ayaa u eg sidan:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Marka xigta, annagoo adeegsanayna habka dirista, waxaan fariin u direynaa server-ka, mowduuca aan u baahanahay, qaab JSON ah:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Marka aan wadno qoraalka, waxaan ku helnaa fariimaha soo socda ee terminalka:

Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Tani waxay ka dhigan tahay in wax walba ay u shaqeeyaan sidii aan rabnay - soo saaraha ayaa soo saara oo u soo dira farriimaha mawduuca aan u baahanahay.
Talaabada xigta waa in la rakibo Spark oo la habeeyo socodka fariintan.

Ku rakibida Apache Spark

Apache Spark waa madal xisaabin kooxeed caalami ah oo waxqabadkeedu sarreeyo.

Spark wuxuu u shaqeeyaa si ka wanaagsan hirgelinta caanka ah ee qaabka MapReduce isagoo taageeraya noocyo badan oo xisaabinta, oo ay ku jiraan weydiimaha isdhexgalka iyo habaynta socodka. Xawaaruhu wuxuu door muhiim ah ka ciyaaraa marka la farsameynayo tiro badan oo xog ah, maadaama ay tahay xawaaraha kuu ogolaanaya inaad si wada jir ah u shaqeyso adigoon daqiiqado ama saacado sugin. Mid ka mid ah awoodaha ugu weyn ee Spark ee ka dhigaya mid aad u dhakhso badan ayaa ah awoodda ay u leedahay in ay qabato xisaabinta xusuusta.

Qaab dhismeedkan wuxuu ku qoran yahay Scala, marka waxaad u baahan tahay inaad marka hore rakibto:

sudo apt-get install scala

Ka soo deji qaybinta Spark website-ka rasmiga ah:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Furo kaydka:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Ku dar dariiqa Spark faylka bash:

vim ~/.bashrc

Ku dar sadarradan soo socda tifaftiraha:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Ku socodsii amarka hoose ka dib markaad isbeddel ku sameyso bashrc:

source ~/.bashrc

Gelinaya AWS PostgreSQL

Waxa hadhay oo dhan waa in la geeyo kaydka xogta oo aanu ku shubi doono macluumaadka la farsameeyay ee ka imanaya durdurrada. Taas waxaan u isticmaali doonaa adeega AWS RDS.

Tag AWS console-> AWS RDS -> Databases -> Abuur xogta macluumaadka:
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Dooro PostgreSQL oo dhagsii Xiga:
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Sababtoo ah Tusaalahan waxa loogu talagalay ujeeddooyin waxbarasho oo keliya; waxaanu isticmaali doonaa adeegaha bilaashka ah β€œugu yaraan” (Tier Free):
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Marka xigta, waxaan ku dhejineynaa calaamadda qaybta Free Tier block, ka dibna si toos ah ayaa naloo soo bandhigi doonaa tusaale ahaan fasalka t2.micro - inkasta oo daciif ah, waa bilaash oo ku habboon hawshayada:
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Waxa soo socda waxyaabo aad u muhiim ah: magaca tusaale ahaan database-ka, magaca isticmaalaha sayidkiisa iyo erayga sirta ah. Aan magacawno tusaale ahaan: myHabrTest, isticmaale sare: habr, furaha sirta ah: habr12345 oo dhagsii badhanka Xiga:
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Bogga xiga waxaa ku jira cabbirro mas'uul ka ah gelitaanka server-ka xog-ururinta ee dibadda (Gelitaanka Dadweynaha) iyo helitaanka dekedda:

Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Aan u abuurno goob cusub kooxda amniga ee VPC, taas oo u oggolaan doonta gelitaanka dibadda ee server-ka xog ururinta ee dekedda 5432 (PostgreSQL).
Aan u tagno console-ka AWS daaqad gaar ah oo browserka VPC Dashboard -> Kooxaha Amniga -> Abuur qaybta kooxda amniga:
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Waxaan u dejinay magaca kooxda Amniga - PostgreSQL, sharraxaad, tilmaame VPC kooxdan waa in lala xiriiriyaa oo guji badhanka Abuur:
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Buuxi xeerarka Inbound ee dekedda 5432 ee kooxda cusub ee la abuuray, sida ka muuqata sawirka hoose. Ma cayimi kartid dekedda gacanta, laakiin ka dooro PostgreSQL liiska hoos-u-dhaca Nooca.

Si adag u hadla, qiimaha :: / 0 macnaheedu waa helitaanka taraafikada soo galaya server-ka adduunka oo dhan, taas oo aan gebi ahaanba run ahayn, laakiin si loo falanqeeyo tusaalaha, aan u oggolaano nafteena inaan isticmaalno habkan:
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Waxaan ku laabaneynaa bogga biraawsarka, halkaas oo aan ku hayno "Configure settings advanced" oo furan oo dooro qaybta kooxaha amniga VPC -> Dooro kooxaha amniga VPC ee jira -> PostgreSQL:
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Marka xigta, ikhtiyaaraadka Database -> Magaca database -> deji magaca - habrDB.

Waxaan ka tagi karnaa xuduudaha haray, marka laga reebo curyaaminta kaabta (xilliga kaydinta - 0 maalmood), la socodka iyo Aragtida Waxqabadka, si caadi ah. Guji badhanka Abuur xog-ururin:
Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Xakameeyaha dunta

Marxaladda ugu dambeysa waxay noqon doontaa horumarinta shaqada Spark, kaas oo ka shaqeyn doona xogta cusub ee Kafka labadii ilbiriqsi kasta oo geli doonta natiijada xogta xogta.

Sida kor lagu xusay, isbaaradu waa habka ugu muhiimsan ee SparkStreaming kaas oo ay tahay in la habeeyo si loo hubiyo dulqaadka qaladka. Waxaan isticmaali doonaa isbaarooyinka, haddii nidaamku guuldareysto, moduleka Spark Streaming wuxuu kaliya u baahan doonaa inuu ku soo laabto isbaarada ugu dambeysa oo uu dib uga bilaabo xisaabinta si uu u soo ceshado xogta luntay.

Isbaarada waxaa lagu suurtagelin karaa in la dejiyo hage ku saabsan nidaamka faylalka u dulqaadan kara, ee la isku halayn karo (sida HDFS, S3, iwm.) kaas oo macluumaadka isbaarada lagu kaydin doono. Tan waxaa lagu sameeyaa iyadoo la isticmaalayo, tusaale ahaan:

streamingContext.checkpoint(checkpointDirectory)

Tusaalahayaga, waxaan adeegsan doonaa habkan soo socda, kaas oo ah, haddii barta isbaarada ay jirto, markaas macnaha guud ayaa laga soo saari doonaa xogta isbaarada. Haddii tusaha uusan jirin (ie, la fuliyay markii ugu horeysay), ka dibna functionToCreateContext waxaa loogu yeeraa si loo abuuro xaalad cusub oo loo habeeyo DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Waxaan abuurnaa shay DirectStream si aan ugu xidhno mawduuca "wax kala wareejinta" anagoo adeegsanayna habka CreateDirectStream ee maktabadda KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Falanqaynta xogta soo socota ee qaabka JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Adeegsiga Spark SQL, waxaanu samaynaa kooxaysi fudud oo aanu ku muujinayno natiijada console-ka:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Helitaanka qoraalka weydiinta oo ku socodsiiya Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Kadibna waxaan ku keydineynaa xogta la isku daray ee ka dhalatay shaxda AWS RDS. Si loo kaydiyo natiijooyinka isku-darka miiska xogta, waxaanu isticmaali doonaa habka qorista shayga DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Dhawr kelmadood oo ku saabsan samaynta xidhiidhka AWS RDS. Waxaan u abuurnay isticmaalaha iyo erayga sirta ah ee "Deploying AWS PostgreSQL" tallaabada. Waa inaad u isticmaashaa Endpoint sida url server-ka xogta, kaas oo lagu muujiyay qaybta isku xidhka iyo amniga:

Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Si aad si sax ah ugu xidhid Spark iyo Kafka, waa in aad ku socodsiisaa shaqada adigoo isticmaalaya smark-submit addoo isticmaalaya artifact dhimbiil-qulqulaya-kafka-0-8_2.11. Intaa waxaa dheer, waxaan sidoo kale u isticmaali doonaa farshaxan si aan ula falgallo xogta PostgreSQL; waxaan ku wareejin doonaa iyada oo loo marayo --packages.

Dabacsanaanta qoraalka, waxaan sidoo kale ku dari doonaa sida cabbiraadda gelinta magaca server-ka fariinta iyo mawduuca aan rabno inaan ka helno xogta.

Marka, waa waqtigii la bilaabi lahaa oo la hubin lahaa shaqeynta nidaamka:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Wax walba waa ay shaqeeyeen! Sida aad ku arki karto sawirka hoose, inta codsigu socdo, natiijooyinka isugeynta cusub ayaa soo baxaya 2 ilbiriqsi kasta, sababtoo ah waxaan dejinay mudada u dhaxaysa 2 ilbiriqsi markii aan abuurnay shayga StreamingContext:

Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

Marka xigta, waxaanu samaynaa su'aal fudud oo ku saabsan xogta xogta si aan u hubinno joogitaanka diiwaannada shaxda dhaqdhaqaaqa_socodka:

Apache Kafka iyo Habaynta Xogta Socodka leh ee Spark Streaming

gunaanad

Maqaalkani wuxuu eegay tusaale socodsiinta macluumaadka iyadoo la adeegsanayo Spark Streaming iyadoo lala kaashanayo Apache Kafka iyo PostgreSQL. Iyada oo kobaca xogta laga helayo ilo kala duwan, way adagtahay in la qiyaaso qiimaha wax ku oolka ah ee Spark Streaming ee abuurista qulqulka iyo codsiyada waqtiga-dhabta ah.

Waxaad ka heli kartaa koodhka isha buuxa ee kaydkayga GitHub.

Waan ku faraxsanahay inaan ka hadlo maqaalkan, waxaan rajeynayaa faallooyinkaaga, waxaan sidoo kale rajeynayaa dhaleeceyn wax ku ool ah dhammaan akhristayaasha daryeelka.

Guul baan kuu rajaynayaa!

Sabuurka. Markii hore waxaa la qorsheeyay in la isticmaalo xogta macluumaadka ee PostgreSQL, laakiin jacaylka aan u qabo AWS, waxaan go'aansaday inaan u wareejiyo xogta daruuraha. Maqaalka soo socda ee mawduucan, waxaan ku tusi doonaa sida loo hirgeliyo dhammaan nidaamka kor ku xusan ee AWS iyadoo la adeegsanayo AWS Kinesis iyo AWS EMR. Lasoco wararka

Source: www.habr.com

Add a comment