Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Sveiks, Habr! Šodien mēs izveidosim sistēmu, kas apstrādās Apache Kafka ziņojumu straumes, izmantojot Spark Streaming, un ierakstīs apstrādes rezultātus AWS RDS mākoņa datubāzē.

Iedomāsimies, ka noteikta kredÄ«tiestāde mums uzdod veikt ienākoÅ”os darÄ«jumus "lidojuma laikā" visās tās filiālēs. To var izdarÄ«t, lai ātri aprēķinātu atklāto valÅ«tas pozÄ«ciju valsts kasei, darÄ«jumu limitus vai finanÅ”u rezultātus utt.

Kā īstenot Ŕo gadījumu, neizmantojot burvju un burvju burvestības - lasiet zem griezuma! Aiziet!

Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu
(Attēla avots)

Ievads

Protams, liela datu apjoma apstrāde reāllaikā sniedz plaÅ”as izmantoÅ”anas iespējas mÅ«sdienu sistēmās. Viena no populārākajām kombinācijām Å”im nolÅ«kam ir Apache Kafka un Spark Streaming tandēms, kur Kafka izveido ienākoÅ”o ziņojumu pakeÅ”u straumi, bet Spark Streaming apstrādā Ŕīs paketes noteiktā laika intervālā.

Lai palielinātu lietojumprogrammas kļūdu toleranci, mēs izmantosim kontrolpunktus. Izmantojot Å”o mehānismu, kad Spark Streaming dzinējam ir jāatgÅ«st zaudētie dati, tam tikai jāatgriežas pēdējā kontrolpunktā un jāatsāk aprēķini no turienes.

Izstrādātās sistēmas arhitektūra

Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Izmantotās sastāvdaļas:

  • Apache Kafka ir izplatÄ«ta publicÄ“Å”anas un abonÄ“Å”anas ziņojumapmaiņas sistēma. Piemērots gan bezsaistes, gan tieÅ”saistes ziņojumu patēriņam. Lai novērstu datu zudumu, Kafka ziņojumi tiek glabāti diskā un tiek replicēti klasterÄ«. Kafka sistēma ir balstÄ«ta uz ZooKeeper sinhronizācijas pakalpojumu;
  • Apache Spark straumÄ“Å”ana - Spark komponents straumÄ“Å”anas datu apstrādei. Spark Streaming modulis ir izveidots, izmantojot mikropakeÅ”u arhitektÅ«ru, kur datu straume tiek interpretēta kā nepārtraukta mazu datu pakeÅ”u secÄ«ba. Spark Streaming ņem datus no dažādiem avotiem un apvieno tos mazās pakotnēs. Jaunas paketes tiek veidotas regulāri. Katra laika intervāla sākumā tiek izveidota jauna pakete, un visi Å”ajā intervālā saņemtie dati tiek iekļauti paketē. Intervāla beigās pakeÅ”u augÅ”ana apstājas. Intervāla lielumu nosaka parametrs, ko sauc par partijas intervālu;
  • Apache Spark SQL - apvieno relāciju apstrādi ar Spark funkcionālo programmÄ“Å”anu. Strukturētie dati ir dati, kuriem ir shēma, tas ir, viena lauku kopa visiem ierakstiem. Spark SQL atbalsta ievadi no dažādiem strukturētu datu avotiem un, pateicoties shēmas informācijas pieejamÄ«bai, var efektÄ«vi izgÅ«t tikai nepiecieÅ”amos ierakstu laukus, kā arÄ« nodroÅ”ina DataFrame API;
  • AWS RDS ir salÄ«dzinoÅ”i lēta uz mākoņiem balstÄ«ta relāciju datubāze, tÄ«mekļa pakalpojums, kas vienkārÅ”o iestatÄ«Å”anu, darbÄ«bu un mērogoÅ”anu, un to tieÅ”i administrē Amazon.

Kafka servera instalēŔana un palaiŔana

Pirms tieŔi izmantojat Kafka, jums ir jāpārliecinās, vai jums ir Java, jo... JVM tiek izmantots darbam:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Izveidosim jaunu lietotāju darbam ar Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Pēc tam lejupielādējiet izplatÄ«Å”anu no oficiālās Apache Kafka vietnes:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Izsaiņojiet lejupielādēto arhīvu:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Nākamā darbība nav obligāta. Fakts ir tāds, ka noklusējuma iestatījumi neļauj pilnībā izmantot visas Apache Kafka iespējas. Piemēram, izdzēsiet tēmu, kategoriju, grupu, kurai var publicēt ziņojumus. Lai to mainītu, rediģēsim konfigurācijas failu:

vim ~/kafka/config/server.properties

Faila beigās pievienojiet Ŕādu tekstu:

delete.topic.enable = true

Pirms Kafka servera palaiÅ”anas jums ir jāstartē serveris ZooKeeper; mēs izmantosim Kafka izplatÄ«Å”anas komplektācijā iekļauto palÄ«gskriptu:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Kad ZooKeeper ir veiksmÄ«gi startējis, palaidiet Kafka serveri atseviŔķā terminālÄ«:

bin/kafka-server-start.sh config/server.properties

Izveidosim jaunu tēmu ar nosaukumu Darījums:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Pārliecināsimies, ka ir izveidota tēma ar nepiecieÅ”amo nodalÄ«jumu skaitu un replikāciju:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

PalaidÄ«sim garām brīžus, kad tiek pārbaudÄ«ts ražotājs un patērētājs jaunizveidotajai tēmai. SÄ«kāka informācija par to, kā pārbaudÄ«t ziņojumu sÅ«tÄ«Å”anu un saņemÅ”anu, ir rakstÄ«ta oficiālajā dokumentācijā - NosÅ«tiet dažas ziņas. Mēs pārejam pie ražotāja rakstÄ«Å”anas Python, izmantojot KafkaProducer API.

Producentu rakstīŔana

Ražotājs Ä£enerēs nejauÅ”us datus - 100 ziņas katru sekundi. Ar nejauÅ”iem datiem mēs saprotam vārdnÄ«cu, kas sastāv no trim laukiem:

  • Filiāle ā€” kredÄ«tiestādes tirdzniecÄ«bas vietas nosaukums;
  • ValÅ«ta ā€” darÄ«juma valÅ«ta;
  • summa ā€” darÄ«juma summa. Summa bÅ«s pozitÄ«vs skaitlis, ja tas ir Bankas valÅ«tas pirkums, un negatÄ«vs skaitlis, ja tas ir pārdoÅ”ana.

Ražotāja kods izskatās Ŕādi:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Pēc tam, izmantojot sÅ«tÄ«Å”anas metodi, mēs nosÅ«tām ziņojumu uz serveri, uz mums nepiecieÅ”amo tēmu, JSON formātā:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Palaižot skriptu, mēs terminālÄ« saņemam Ŕādus ziņojumus:

Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Tas nozÄ«mē, ka viss darbojas tā, kā mēs vēlējāmies ā€“ producents Ä£enerē un sÅ«ta ziņas uz mums nepiecieÅ”amo tēmu.
Nākamais solis ir instalēt Spark un apstrādāt Å”o ziņojumu straumi.

Apache Spark instalēŔana

Apache Spark ir universāla un augstas veiktspējas klasteru skaitļoÅ”anas platforma.

Spark darbojas labāk nekā populārās MapReduce modeļa implementācijas, vienlaikus atbalstot plaŔāku skaitļoÅ”anas veidu klāstu, tostarp interaktÄ«vos vaicājumus un straumes apstrādi. Ātrumam ir svarÄ«ga loma, apstrādājot lielu datu apjomu, jo tas ir ātrums, kas ļauj strādāt interaktÄ«vi, netērējot minÅ«tes vai stundas. Viena no lielākajām lietām, kas padara Spark tik ātru, ir tās spēja veikt aprēķinus atmiņā.

Šis ietvars ir rakstīts programmā Scala, tāpēc vispirms tas jāinstalē:

sudo apt-get install scala

Lejupielādējiet Spark izplatÄ«Å”anu no oficiālās vietnes:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Izpakojiet arhīvu:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Pievienojiet bash failam ceļu uz Spark:

vim ~/.bashrc

Izmantojot redaktoru, pievienojiet Ŕādas rindiņas:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Pēc bashrc izmaiņu veikÅ”anas palaidiet tālāk norādÄ«to komandu:

source ~/.bashrc

AWS PostgreSQL izvietoŔana

Atliek tikai izvietot datu bāzi, kurā augÅ”upielādēsim apstrādāto informāciju no straumēm. Å im nolÅ«kam mēs izmantosim AWS RDS pakalpojumu.

Dodieties uz AWS konsoli -> AWS RDS -> Datubāzes -> Izveidot datu bāzi:
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Atlasiet PostgreSQL un noklikŔķiniet uz Tālāk:
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Jo Å is piemērs ir paredzēts tikai izglÄ«tojoÅ”iem nolÅ«kiem; mēs izmantosim bezmaksas serveri ā€œvismazā€ (bezmaksas lÄ«menis):
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Tālāk mēs ieliekam Ä·eksÄ«ti Free Tier blokā, un pēc tam mums automātiski tiks piedāvāts t2.micro klases gadÄ«jums - lai arÄ« vājÅ”, tas ir bezmaksas un diezgan piemērots mÅ«su uzdevumam:
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Tālāk seko ļoti svarÄ«gas lietas: datu bāzes instances nosaukums, galvenā lietotāja vārds un viņa parole. Nosauksim gadÄ«jumu: myHabrTest, galvenais lietotājs: habr, parole: habr12345 un noklikŔķiniet uz pogas Tālāk:
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Nākamajā lapā ir norādīti parametri, kas atbild par mūsu datu bāzes servera pieejamību no ārpuses (publiskā pieejamība) un portu pieejamību:

Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Izveidosim jaunu iestatÄ«jumu VPC droŔības grupai, kas ļaus ārēji piekļūt mÅ«su datu bāzes serverim caur portu 5432 (PostgreSQL).
Dosimies uz AWS konsoli atseviŔķā pārlūkprogrammas logā uz VPC informācijas paneli -> DroŔības grupas -> Izveidot droŔības grupu sadaļu:
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Mēs iestatām droŔības grupas nosaukumu - PostgreSQL, aprakstu, norādiet, ar kuru VPC Ŕī grupa ir jāsaista, un noklikŔķiniet uz pogas Izveidot:
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Aizpildiet jaunizveidotās grupas portam 5432 ienākoÅ”os noteikumus, kā parādÄ«ts attēlā zemāk. Portu nevar norādÄ«t manuāli, bet nolaižamajā sarakstā Type atlasiet PostgreSQL.

Stingri runājot, vērtÄ«ba ::/0 nozÄ«mē ienākoŔās trafika pieejamÄ«bu serverim no visas pasaules, kas kanoniski nav pilnÄ«gi taisnÄ«ba, taču, lai analizētu piemēru, ļausim sev izmantot Ŕādu pieeju:
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Mēs atgriežamies pārlÅ«kprogrammas lapā, kur ir atvērts ā€œPapildu iestatÄ«jumu konfigurÄ“Å”anaā€ un sadaļā VPC droŔības grupas atlasiet -> Izvēlieties esoŔās VPC droŔības grupas -> PostgreSQL:
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Pēc tam datu bāzes opcijās -> Datubāzes nosaukums -> iestatiet nosaukumu - habrDB.

AtlikuÅ”os parametrus, izņemot dublÄ“Å”anas atspējoÅ”anu (dublējuma saglabāŔanas periods - 0 dienas), uzraudzÄ«bu un Performance Insights, varam atstāt pēc noklusējuma. NoklikŔķiniet uz pogas Izveidot datu bāzi:
Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Vītņu apstrādātājs

Pēdējais posms būs Spark darba izstrāde, kas ik pēc divām sekundēm apstrādās jaunus Kafkas datus un ievadīs rezultātu datubāzē.

Kā minēts iepriekÅ”, kontrolpunkti ir SparkStreaming galvenais mehānisms, kas ir jākonfigurē, lai nodroÅ”inātu kļūdu toleranci. Mēs izmantosim kontrolpunktus, un, ja procedÅ«ra neizdosies, Spark Streaming modulim bÅ«s tikai jāatgriežas pēdējā kontrolpunktā un jāatsāk aprēķini no tā, lai atgÅ«tu zaudētos datus.

Kontrolpunktu var iespējot, iestatot direktoriju kļūmju izturīgā, uzticamā failu sistēmā (piemēram, HDFS, S3 utt.), kurā tiks saglabāta kontrolpunkta informācija. Tas tiek darīts, izmantojot, piemēram:

streamingContext.checkpoint(checkpointDirectory)

MÅ«su piemērā mēs izmantosim Ŕādu pieeju, proti, ja pastāv checkpointDirectory, konteksts tiks izveidots no jauna no kontrolpunkta datiem. Ja direktorija neeksistē (t.i., tiek izpildÄ«ta pirmo reizi), tiek izsaukta funkcija functionToCreateContext, lai izveidotu jaunu kontekstu un konfigurētu DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Mēs izveidojam DirectStream objektu, lai izveidotu savienojumu ar ā€œdarÄ«jumaā€ tēmu, izmantojot KafkaUtils bibliotēkas metodi createDirectStream:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

IenākoÅ”o datu parsÄ“Å”ana JSON formātā:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Izmantojot Spark SQL, mēs veicam vienkārÅ”u grupÄ“Å”anu un konsolē parādām rezultātu:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Vaicājuma teksta iegūŔana un palaiŔana, izmantojot Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Un pēc tam mēs saglabājam iegÅ«tos apkopotos datus tabulā AWS RDS. Lai apkopoÅ”anas rezultātus saglabātu datu bāzes tabulā, mēs izmantosim DataFrame objekta rakstÄ«Å”anas metodi:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Daži vārdi par savienojuma iestatÄ«Å”anu ar AWS RDS. Mēs tam izveidojām lietotāju un paroli, veicot darbÄ«bu ā€œAWS PostgreSQL izvietoÅ”anaā€. Kā datu bāzes servera URL ir jāizmanto Endpoint, kas tiek parādÄ«ts sadaļā SavienojamÄ«ba un droŔība:

Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Lai pareizi savienotu Spark un Kafka, jums vajadzētu palaist darbu, izmantojot smark-submit, izmantojot artefaktu spark-streaming-kafka-0-8_2.11. Turklāt mēs izmantosim arī artefaktu, lai mijiedarbotos ar PostgreSQL datu bāzi; mēs tos pārsūtīsim, izmantojot paketes.

Skripta elastības labad kā ievades parametrus iekļausim arī ziņojumu servera nosaukumu un tēmu, no kuras vēlamies saņemt datus.

Tātad, ir pienācis laiks palaist un pārbaudīt sistēmas funkcionalitāti:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Viss izdevās! Kā redzams zemāk esoÅ”ajā attēlā, lietojumprogrammas darbÄ«bas laikā jauni apkopoÅ”anas rezultāti tiek izvadÄ«ti ik pēc 2 sekundēm, jo ā€‹ā€‹mēs iestatÄ«jām pakeÅ”u intervālu uz 2 sekundēm, kad veidojām StreamingContext objektu:

Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Tālāk mēs veicam vienkārÅ”u vaicājumu datu bāzei, lai pārbaudÄ«tu ierakstu klātbÅ«tni tabulā transakcijas_plÅ«sma:

Apache Kafka un straumÄ“Å”anas datu apstrāde ar Spark straumÄ“Å”anu

Secinājums

Å ajā rakstā tika aplÅ«kots informācijas straumes apstrādes piemērs, izmantojot Spark Streaming kopā ar Apache Kafka un PostgreSQL. Pieaugot datiem no dažādiem avotiem, ir grÅ«ti pārvērtēt Spark Streaming praktisko vērtÄ«bu straumÄ“Å”anas un reāllaika lietojumprogrammu izveidē.

Pilnu avota kodu varat atrast manā repozitorijā vietnē GitHub.

Ar prieku apspriežu Ŕo rakstu, gaidu jūsu komentārus, kā arī ceru uz konstruktīvu kritiku no visiem gādīgajiem lasītājiem.

Es vēlu jums veiksmi!

Ps. Sākotnēji bija plānots izmantot vietējo PostgreSQL datu bāzi, taču, ņemot vērā manu mÄ«lestÄ«bu pret AWS, es nolēmu pārvietot datu bāzi uz mākoni. Nākamajā rakstā par Å”o tēmu es parādÄ«Å”u, kā ieviest visu iepriekÅ” aprakstÄ«to sistēmu AWS, izmantojot AWS Kinesis un AWS EMR. Sekojiet jaunumiem!

Avots: www.habr.com

Pievieno komentāru