Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Përshëndetje, Habr! Sot do të ndërtojmë një sistem që do të përpunojë transmetimet e mesazheve Apache Kafka duke përdorur Spark Streaming dhe do t'i shkruajë rezultatet e përpunimit në bazën e të dhënave AWS RDS cloud.

Le të imagjinojmë se një institucion i caktuar krediti na vendos detyrën për të përpunuar transaksionet hyrëse “në fluturim” në të gjitha degët e tij. Kjo mund të bëhet për qëllimin e llogaritjes së menjëhershme të një pozicioni të hapur valutor për thesarin, kufijtë ose rezultatet financiare për transaksionet, etj.

Si ta zbatoni këtë rast pa përdorimin e magjive dhe magjive - lexoni nën prerje! Shkoni!

Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming
(Burimi i imazhit)

Paraqitje

Natyrisht, përpunimi i një sasie të madhe të dhënash në kohë reale ofron mundësi të shumta për përdorim në sistemet moderne. Një nga kombinimet më të njohura për këtë është tandemi i Apache Kafka dhe Spark Streaming, ku Kafka krijon një rrjedhë të paketave të mesazheve hyrëse dhe Spark Streaming i përpunon këto pako në një interval të caktuar kohor.

Për të rritur tolerancën ndaj gabimeve të aplikacionit, ne do të përdorim pikat e kontrollit. Me këtë mekanizëm, kur motori Spark Streaming duhet të rikuperojë të dhënat e humbura, duhet vetëm të kthehet në pikën e fundit të kontrollit dhe të rifillojë llogaritjet prej andej.

Arkitektura e sistemit të zhvilluar

Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Komponentët e përdorur:

  • Apache Kafka është një sistem i shpërndarë mesazhesh publikim-subscribe. I përshtatshëm për konsumimin e mesazheve offline dhe online. Për të parandaluar humbjen e të dhënave, mesazhet e Kafkës ruhen në disk dhe riprodhohen brenda grupit. Sistemi Kafka është ndërtuar në krye të shërbimit të sinkronizimit ZooKeeper;
  • Apache Spark Streaming - Komponenti Spark për përpunimin e të dhënave të transmetimit. Moduli Spark Streaming është ndërtuar duke përdorur një arkitekturë mikro-batch, ku rrjedha e të dhënave interpretohet si një sekuencë e vazhdueshme e paketave të vogla të të dhënave. Spark Streaming merr të dhëna nga burime të ndryshme dhe i kombinon ato në paketa të vogla. Paketat e reja krijohen në intervale të rregullta. Në fillim të çdo intervali kohor, krijohet një paketë e re dhe çdo e dhënë e marrë gjatë atij intervali përfshihet në paketë. Në fund të intervalit, rritja e paketave ndalon. Madhësia e intervalit përcaktohet nga një parametër i quajtur intervali i grupit;
  • Apache Spark SQL - kombinon përpunimin relacional me programimin funksional Spark. Të dhëna të strukturuara nënkuptojnë të dhëna që kanë një skemë, domethënë një grup të vetëm fushash për të gjitha regjistrimet. Spark SQL mbështet të dhëna nga një shumëllojshmëri burimesh të strukturuara të të dhënave dhe, falë disponueshmërisë së informacionit të skemës, mund të marrë në mënyrë efikase vetëm fushat e kërkuara të regjistrimeve, dhe gjithashtu ofron DataFrame API;
  • AWS RDS është një bazë të dhënash relacionale relativisht e lirë e bazuar në cloud, shërbim ueb që thjeshton konfigurimin, funksionimin dhe shkallëzimin dhe administrohet drejtpërdrejt nga Amazon.

Instalimi dhe ekzekutimi i serverit Kafka

Përpara se të përdorni Kafka direkt, duhet të siguroheni që keni Java, sepse... JVM përdoret për punë:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Le të krijojmë një përdorues të ri për të punuar me Kafkën:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Më pas, shkarkoni shpërndarjen nga faqja zyrtare e Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Shpaketoni arkivin e shkarkuar:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Hapi tjetër është fakultativ. Fakti është se cilësimet e paracaktuara nuk ju lejojnë të përdorni plotësisht të gjitha tiparet e Apache Kafka. Për shembull, fshini një temë, kategori, grup në të cilin mund të publikohen mesazhet. Për ta ndryshuar këtë, le të modifikojmë skedarin e konfigurimit:

vim ~/kafka/config/server.properties

Shtoni sa vijon në fund të skedarit:

delete.topic.enable = true

Përpara se të nisni serverin Kafka, duhet të nisni serverin ZooKeeper; ne do të përdorim skriptin ndihmës që vjen me shpërndarjen Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Pasi ZooKeeper të ketë filluar me sukses, hapni serverin Kafka në një terminal të veçantë:

bin/kafka-server-start.sh config/server.properties

Le të krijojmë një temë të re të quajtur Transaksion:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Le të sigurohemi që është krijuar një temë me numrin e nevojshëm të ndarjeve dhe replikimeve:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Le të humbasim momentet e testimit të prodhuesit dhe konsumatorit për temën e krijuar rishtazi. Më shumë detaje se si mund të testoni dërgimin dhe marrjen e mesazheve janë shkruar në dokumentacionin zyrtar - Dërgo disa mesazhe. Epo, ne vazhdojmë të shkruajmë një prodhues në Python duke përdorur API-në e KafkaProducer.

Shkrimi i prodhuesit

Prodhuesi do të gjenerojë të dhëna të rastësishme - 100 mesazhe çdo sekondë. Me të dhëna të rastësishme nënkuptojmë një fjalor të përbërë nga tre fusha:

  • Degë - emri i pikës së shitjes së institucionit të kreditit;
  • Monedhë — valuta e transaksionit;
  • Sasia - shuma e transaksionit. Shuma do të jetë një numër pozitiv nëse është një blerje valutë nga Banka dhe një numër negativ nëse është një shitje.

Kodi për prodhuesin duket si ky:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Më pas, duke përdorur metodën e dërgimit, ne dërgojmë një mesazh në server, në temën që na nevojitet, në formatin JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Kur ekzekutojmë skriptin, marrim mesazhet e mëposhtme në terminal:

Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Kjo do të thotë që gjithçka funksionon siç kemi dashur - prodhuesi gjeneron dhe dërgon mesazhe në temën që na nevojitet.
Hapi tjetër është të instaloni Spark dhe të përpunoni këtë transmetim mesazhesh.

Instalimi i Apache Spark

Apache Spark është një platformë kompjuterike e grupeve universale dhe me performancë të lartë.

Spark performon më mirë se implementimet e njohura të modelit MapReduce ndërsa mbështet një gamë më të gjerë të llojeve të llogaritjes, duke përfshirë pyetjet interaktive dhe përpunimin e transmetimit. Shpejtësia luan një rol të rëndësishëm gjatë përpunimit të sasive të mëdha të të dhënave, pasi është shpejtësia që ju lejon të punoni në mënyrë interaktive pa shpenzuar minuta ose orë në pritje. Një nga pikat më të forta të Spark që e bën atë kaq të shpejtë është aftësia e tij për të kryer llogaritjet në memorie.

Ky kuadër është shkruar në Scala, kështu që duhet ta instaloni së pari:

sudo apt-get install scala

Shkarkoni shpërndarjen e Spark nga faqja zyrtare e internetit:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Shpaketoni arkivin:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Shtoni shtegun e Spark në skedarin bash:

vim ~/.bashrc

Shtoni rreshtat e mëposhtëm përmes redaktorit:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Ekzekutoni komandën më poshtë pasi të bëni ndryshime në bashrc:

source ~/.bashrc

Vendosja e AWS PostgreSQL

E tëra që mbetet është të vendosim bazën e të dhënave në të cilën do të ngarkojmë informacionin e përpunuar nga transmetimet. Për këtë ne do të përdorim shërbimin AWS RDS.

Shkoni te tastiera AWS -> AWS RDS -> Bazat e të dhënave -> Krijoni bazën e të dhënave:
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Zgjidhni PostgreSQL dhe klikoni Next:
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Sepse Ky shembull është vetëm për qëllime edukative; ne do të përdorim një server falas "në minimum" (Free Tier):
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Më pas, vendosim një shenjë në bllokun Free Tier, dhe pas kësaj do të na ofrohet automatikisht një shembull i klasës t2.micro - megjithëse i dobët, ai është falas dhe mjaft i përshtatshëm për detyrën tonë:
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Më pas vijnë gjëra shumë të rëndësishme: emri i shembullit të bazës së të dhënave, emri i përdoruesit kryesor dhe fjalëkalimi i tij. Le të emërtojmë shembullin: myHabrTest, përdoruesi kryesor: habr, fjalëkalimi: habr12345 dhe klikoni në butonin Next:
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Në faqen tjetër ka parametra përgjegjës për aksesin e serverit tonë të bazës së të dhënave nga jashtë (Qasshmëria publike) dhe disponueshmëria e portit:

Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Le të krijojmë një cilësim të ri për grupin e sigurisë VPC, i cili do të lejojë akses të jashtëm në serverin tonë të bazës së të dhënave nëpërmjet portit 5432 (PostgreSQL).
Le të shkojmë në tastierën AWS në një dritare të veçantë të shfletuesit te paneli VPC -> Grupet e Sigurisë -> Krijo seksionin e grupit të sigurisë:
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Ne vendosëm emrin për grupin e Sigurisë - PostgreSQL, një përshkrim, tregoni se me cilin VPC duhet të lidhet ky grup dhe klikoni butonin Krijo:
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Plotësoni rregullat Inbound për portin 5432 për grupin e sapokrijuar, siç tregohet në foton më poshtë. Ju nuk mund ta specifikoni portin me dorë, por zgjidhni PostgreSQL nga lista rënëse Lloji.

Në mënyrë të rreptë, vlera ::/0 nënkupton disponueshmërinë e trafikut hyrës në server nga e gjithë bota, gjë që kanonikisht nuk është plotësisht e vërtetë, por për të analizuar shembullin, le t'i lejojmë vetes të përdorim këtë qasje:
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Kthehemi në faqen e shfletuesit, ku kemi hapur “Configure advanced settings” dhe zgjedhim në seksionin e grupeve të sigurisë VPC -> Zgjidhni grupet ekzistuese të sigurisë VPC -> PostgreSQL:
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Më pas, në opsionet e bazës së të dhënave -> Emri i bazës së të dhënave -> vendosni emrin - habrDB.

Mund t'i lëmë parametrat e mbetur, me përjashtim të çaktivizimit të rezervimit (periudha e ruajtjes së rezervës - 0 ditë), monitorimit dhe Vështrimeve të Performancës, si parazgjedhje. Klikoni në butonin Krijo bazën e të dhënave:
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Mbajtës i fijeve

Faza përfundimtare do të jetë zhvillimi i një pune Spark, e cila do të përpunojë të dhëna të reja që vijnë nga Kafka çdo dy sekonda dhe do të futë rezultatin në bazën e të dhënave.

Siç u përmend më lart, pikat e kontrollit janë një mekanizëm thelbësor në SparkStreaming që duhet të konfigurohet për të siguruar tolerancën e gabimeve. Ne do të përdorim pikat e kontrollit dhe, nëse procedura dështon, moduli Spark Streaming do të duhet vetëm të kthehet në pikën e fundit të kontrollit dhe të rifillojë llogaritjet prej tij për të rikuperuar të dhënat e humbura.

Pika e kontrollit mund të aktivizohet duke vendosur një direktori në një sistem skedari tolerant ndaj gabimeve dhe të besueshëm (si HDFS, S3, etj.) në të cilin do të ruhen informacionet e pikës së kontrollit. Kjo bëhet duke përdorur, për shembull:

streamingContext.checkpoint(checkpointDirectory)

Në shembullin tonë, ne do të përdorim qasjen e mëposhtme, domethënë, nëse ekziston Lista e pikave të kontrollit, atëherë konteksti do të rikrijohet nga të dhënat e pikës së kontrollit. Nëse drejtoria nuk ekziston (d.m.th. ekzekutohet për herë të parë), atëherë funksioniToCreateContext thirret për të krijuar një kontekst të ri dhe për të konfiguruar DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Ne krijojmë një objekt DirectStream për t'u lidhur me temën e "transaksionit" duke përdorur metodën createDirectStream të bibliotekës KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analiza e të dhënave hyrëse në formatin JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Duke përdorur Spark SQL, ne bëjmë një grupim të thjeshtë dhe shfaqim rezultatin në tastierë:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Marrja e tekstit të pyetjes dhe ekzekutimi i tij përmes Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Dhe pastaj ne i ruajmë të dhënat e grumbulluara që rezultojnë në një tabelë në AWS RDS. Për të ruajtur rezultatet e grumbullimit në një tabelë të bazës së të dhënave, ne do të përdorim metodën e shkrimit të objektit DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Disa fjalë për vendosjen e një lidhjeje me AWS RDS. Ne krijuam përdoruesin dhe fjalëkalimin për të në hapin "Përdorimi i AWS PostgreSQL". Ju duhet të përdorni Endpoint si url-në e serverit të bazës së të dhënave, e cila shfaqet në seksionin Lidhshmëria dhe siguria:

Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Në mënyrë që të lidhni saktë Spark dhe Kafka, duhet ta kryeni punën përmes smark-submit duke përdorur objektin spark-streaming-kafka-0-8_2.11. Përveç kësaj, ne do të përdorim gjithashtu një objekt për të bashkëvepruar me bazën e të dhënave PostgreSQL; ne do t'i transferojmë ato përmes --paketave.

Për fleksibilitetin e skriptit, ne do të përfshijmë gjithashtu si parametra hyrës emrin e serverit të mesazheve dhe temën nga e cila duam të marrim të dhëna.

Pra, është koha për të nisur dhe kontrolluar funksionalitetin e sistemit:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Gjithçka funksionoi! Siç mund ta shihni në foton më poshtë, ndërsa aplikacioni është duke u ekzekutuar, rezultatet e reja të grumbullimit dalin çdo 2 sekonda, sepse ne e vendosëm intervalin e grupit në 2 sekonda kur krijuam objektin StreamingContext:

Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Më pas, ne bëjmë një pyetje të thjeshtë në bazën e të dhënave për të kontrolluar praninë e të dhënave në tabelë rrjedha_transaksioni:

Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming

Përfundim

Ky artikull shikoi një shembull të përpunimit të rrjedhës së informacionit duke përdorur Spark Streaming në lidhje me Apache Kafka dhe PostgreSQL. Me rritjen e të dhënave nga burime të ndryshme, është e vështirë të mbivlerësohet vlera praktike e Spark Streaming për krijimin e aplikacioneve të transmetimit dhe në kohë reale.

Mund ta gjeni kodin e plotë burimor në depon time në GitHub.

Jam i lumtur të diskutoj këtë artikull, pres me padurim komentet tuaja dhe gjithashtu shpresoj për kritika konstruktive nga të gjithë lexuesit e kujdesshëm.

Ju uroj suksese!

. Fillimisht ishte planifikuar të përdorja një bazë të dhënash lokale PostgreSQL, por duke pasur parasysh dashurinë time për AWS, vendosa ta zhvendos bazën e të dhënave në cloud. Në artikullin tjetër mbi këtë temë, unë do të tregoj se si të zbatohet i gjithë sistemi i përshkruar më sipër në AWS duke përdorur AWS Kinesis dhe AWS EMR. Ndiqni lajmet!

Burimi: www.habr.com

Shto një koment