Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Kaixo, Habr! Gaur Apache Kafka mezu-jarioak Spark Streaming erabiliz prozesatu eta prozesatzeko emaitzak AWS RDS hodeiko datu-basean idatziko dituen sistema bat eraikiko dugu.

Imajina dezagun kreditu-erakunde jakin batek ezartzen digula sarrerako eragiketak «bere adar guztietan» prozesatzeko zeregina. Hau egin daiteke diruzaintzako diru-posizio irekia berehala kalkulatzeko, transakzioetarako mugak edo finantza-emaitzak, etab.

Nola ezarri kasu hau magia eta sorginkeriarik erabili gabe - irakurri ebaki azpian! Joan!

Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin
(Irudiaren iturria)

Sarrera

Jakina, datu kopuru handia denbora errealean prozesatzeak aukera zabalak eskaintzen ditu sistema modernoetan erabiltzeko. Horretarako konbinazio ezagunenetako bat Apache Kafka eta Spark Streaming-en tandema da, non Kafkak sarrerako mezu-paketeen korrontea sortzen duen eta Spark Streaming-ek pakete horiek denbora-tarte jakin batean prozesatzen dituen.

Aplikazioaren akatsen tolerantzia handitzeko, kontrol-puntuak erabiliko ditugu. Mekanismo honekin, Spark Streaming motorrak galdutako datuak berreskuratu behar dituenean, azken kontrol-puntura itzuli eta handik kalkuluei berriro ekin behar die.

Garatutako sistemaren arkitektura

Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Erabilitako osagaiak:

  • Apache Kafka argitalpen-harpidetza banatutako mezularitza sistema da. Lineaz kanpoko zein sareko mezuen kontsumorako egokia. Datuak gal ez daitezen, Kafkaren mezuak diskoan gordetzen dira eta kluster barruan errepikatzen dira. Kafka sistema ZooKeeper sinkronizazio zerbitzuaren gainean eraikita dago;
  • Apache Spark erreprodukzioa - Streaming datuak prozesatzeko Spark osagaia. Spark Streaming modulua mikro-batch arkitektura erabiliz eraiki da, non datu-korrontea datu-pakete txikien sekuentzia jarraitu gisa interpretatzen den. Spark Streaming-ek iturri ezberdinetako datuak hartzen ditu eta pakete txikietan konbinatzen ditu. Pakete berriak aldizka sortzen dira. Denbora-tarte bakoitzaren hasieran, pakete berri bat sortzen da, eta tarte horretan jasotako datu guztiak paketean sartzen dira. Tartearen amaieran, paketeen hazkundea gelditzen da. Tartearen tamaina batch tartea izeneko parametro batek zehazten du;
  • Apache Spark SQL - Erlazio-prozesamendua Spark-en programazio funtzionalarekin konbinatzen ditu. Datu egituratuek eskema bat duten datuak esan nahi dituzte, hau da, erregistro guztien eremu multzo bakarra. Spark SQL-k hainbat datu-iturri egituratuen sarrera onartzen du eta, eskemaren informazioaren erabilgarritasunari esker, beharrezkoak diren erregistroen eremuak soilik berreskura ditzake eraginkortasunez, eta DataFrame APIak ere eskaintzen ditu;
  • AWS RDS Hodeian oinarritutako erlazio datu-base nahiko merke bat da, konfigurazioa, funtzionamendua eta eskalatzea errazten dituen web-zerbitzua, eta Amazonek zuzenean kudeatzen du.

Kafka zerbitzaria instalatu eta exekutatzen

Kafka zuzenean erabili aurretik, Java duzula ziurtatu behar duzu, izan ere... JVM lanerako erabiltzen da:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Sortu dezagun erabiltzaile berri bat Kafkarekin lan egiteko:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Ondoren, deskargatu banaketa Apache Kafka webgune ofizialetik:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Deskonpaktatu deskargatutako artxiboa:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Hurrengo urratsa aukerakoa da. Kontua da ezarpen lehenetsiek ez dizutela uzten Apache Kafkaren gaitasun guztiak guztiz erabiltzeko. Adibidez, ezabatu mezuak argitaratzeko gai, kategoria, talde bat. Hau aldatzeko, edita dezagun konfigurazio fitxategia:

vim ~/kafka/config/server.properties

Gehitu ondorengoa fitxategiaren amaieran:

delete.topic.enable = true

Kafka zerbitzaria hasi aurretik, ZooKeeper zerbitzaria abiarazi behar duzu; Kafka banaketarekin datorren script laguntzailea erabiliko dugu:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper arrakastaz hasi ondoren, abiarazi Kafka zerbitzaria beste terminal batean:

bin/kafka-server-start.sh config/server.properties

Sor dezagun Transakzioa izeneko gai berri bat:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Ziurtatu behar den partizio eta erreplika kopurua duen gai bat sortu dela:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Galdu ditzagun ekoizlea eta kontsumitzaileari sortu berri den gaiari buruzko probak egiteko uneak. Mezuak bidali eta jasotzeari buruzko xehetasun gehiago dokumentazio ofizialean idatzita daude - Bidali mezu batzuk. Beno, ekoizle bat idaztera pasako gara Python-en KafkaProducer APIa erabiliz.

Ekoizlearen idazketa

Ekoizleak ausazko datuak sortuko ditu - 100 mezu segundo bakoitzean. Ausazko datuekin hiru eremuz osatutako hiztegia esan nahi dugu:

  • Adarra — kreditu-erakundearen salmenta-puntuaren izena;
  • Moneta - transakzio-moneta;
  • zenbatekoa - Transakzio zenbatekoa. Zenbatekoa zenbaki positiboa izango da Bankuak moneta erosketa bat bada, eta zenbaki negatiboa salmenta bada.

Ekoizlearen kodea honelakoa da:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Ondoren, bidali metodoa erabiliz, mezu bat bidaliko diogu zerbitzariari, behar dugun gaiari, JSON formatuan:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Scripta exekutatzean, mezu hauek jasotzen ditugu terminalean:

Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Horrek esan nahi du dena nahi genuen bezala funtzionatzen duela: ekoizleak behar dugun gaiari mezuak sortzen eta bidaltzen dizkio.
Hurrengo urratsa Spark instalatzea eta mezu-jario hau prozesatzea da.

Apache Spark instalatzen

Apache Spark cluster informatika plataforma unibertsala eta errendimendu handikoa da.

Spark-ek MapReduce ereduaren inplementazio ezagunak baino hobeto funtzionatzen du, konputazio mota ugari onartzen dituen bitartean, kontsulta interaktiboak eta korronteen prozesamendua barne. Abiadurak paper garrantzitsua betetzen du datu kopuru handiak prozesatzen direnean, abiadura delako interaktiboki lan egiteko aukera ematen baitu minutu edo ordurik itxaron gabe. Hain azkar egiten duen Spark-en indargune handienetako bat memorian kalkuluak egiteko gaitasuna da.

Esparru hau Scala-n idatzita dago, beraz, lehenik instalatu behar duzu:

sudo apt-get install scala

Deskargatu Spark banaketa webgune ofizialetik:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Deskonprimitu artxiboa:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Gehitu Spark-erako bidea bash fitxategira:

vim ~/.bashrc

Gehitu lerro hauek editorearen bidez:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Exekutatu beheko komandoa bashrc-en aldaketak egin ondoren:

source ~/.bashrc

AWS PostgreSQL inplementatzea

Korronteetatik prozesatutako informazioa igoko dugun datu-basea zabaltzea besterik ez da geratzen. Horretarako AWS RDS zerbitzua erabiliko dugu.

Joan AWS kontsolara -> AWS RDS -> Datu-baseak -> Sortu datu-basea:
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Hautatu PostgreSQL eta egin klik Hurrengoa:
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Zeren Adibide hau hezkuntza-helburuetarako soilik da; doako zerbitzari bat erabiliko dugu "gutxienez" (doako maila):
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Ondoren, tick bat jarri dugu Doako Maila blokean, eta horren ostean automatikoki t2.micro klasearen instantzia bat eskainiko zaigu - ahula bada ere, doakoa da eta nahiko egokia da gure zereginerako:
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Ondoren, gauza oso garrantzitsuak datoz: datu-basearen instantziaren izena, erabiltzaile nagusiaren izena eta bere pasahitza. Izena eman diezaiogun instantziari: myHabrTest, erabiltzaile maisua: habr, pasahitza: habr12345 eta egin klik Hurrengoa botoian:
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Hurrengo orrialdean gure datu-basearen zerbitzariaren irisgarritasunaz kanpotik (Irisgarritasun publikoa) eta portuaren erabilgarritasunaz arduratzen diren parametroak daude:

Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Sortu dezagun ezarpen berri bat VPC segurtasun-taldearentzat, gure datu-basearen zerbitzarirako kanpoko sarbidea ahalbidetuko duena 5432 atakaren bidez (PostgreSQL).
Goazen AWS kontsolara arakatzailearen leiho bereizi batean VPC Arbelera -> Segurtasun Taldeak -> Sortu segurtasun taldea atalera:
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Segurtasun taldearen izena ezarri dugu - PostgreSQL, deskribapen bat, adierazi zein VPCrekin lotu behar den talde hau eta egin klik Sortu botoian:
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Bete 5432 atakarako sarrerako arauak sortu berri den taldearentzat, beheko irudian ikusten den moduan. Ezin duzu portua eskuz zehaztu, baina hautatu PostgreSQL Mota goitibeherako zerrendan.

Zorrotz esanda, ::/0 balioak mundu osoko zerbitzarirako sarrerako trafikoaren erabilgarritasuna esan nahi du, eta hori kanonikoki ez da guztiz egia, baina adibidea aztertzeko, utz diezaiogun ikuspegi hau erabiltzeko:
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Arakatzailearen orrira itzuliko gara, non irekita daukagun “Konfiguratu ezarpen aurreratuak” eta hautatu VPC segurtasun taldeak atalean -> Aukeratu lehendik dauden VPC segurtasun taldeak -> PostgreSQL:
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Ondoren, Datu-basearen aukeretan -> Datu-basearen izena -> ezarri izena - habrDB.

Gainerako parametroak utzi ditzakegu, babeskopia desgaitzea (backup atxikipen epea - 0 egun), monitorizazioa eta Performance Insights izan ezik, lehenespenez. Egin klik botoian Sortu datu-basea:
Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Hari-kudeatzailea

Azken fasea Spark lan baten garapena izango da, Kafkatik datozen datu berriak bi segundoro prozesatu eta emaitza datu-basean sartuko duena.

Goian adierazi bezala, kontrol-puntuak SparkStreaming-en oinarrizko mekanismo bat dira, akatsen tolerantzia bermatzeko konfiguratu behar dena. Kontrol-puntuak erabiliko ditugu eta, prozedurak huts egiten badu, Spark Streaming moduluak azkeneko kontrol-puntura itzuli eta bertatik kalkuluak berreskuratu beharko ditu galdutako datuak berreskuratzeko.

Checkpointing aktibatu daiteke akatsak tolerantea den fitxategi-sistema fidagarri batean (adibidez, HDFS, S3, etab.) direktorio bat ezarriz eta bertan kontrol puntuaren informazioa gordeko den. Hau erabiltzen da, adibidez:

streamingContext.checkpoint(checkpointDirectory)

Gure adibidean, hurrengo ikuspegia erabiliko dugu, hots, checkpointDirectory existitzen bada, orduan testuingurua birsortuko da checkpoint datuetatik. Direktorioa existitzen ez bada (hau da, lehen aldiz exekutatuta), functionToCreateContext deitzen da testuinguru berri bat sortzeko eta DStreams konfiguratzeko:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

DirectStream objektu bat sortzen dugu "transakzio" gaira konektatzeko KafkaUtils liburutegiko createDirectStream metodoa erabiliz:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Jasotako datuak JSON formatuan analizatzen:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL erabiliz, taldekatze sinple bat egiten dugu eta emaitza kontsolara ateratzen dugu:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Kontsulta testua lortu eta Spark SQL bidez exekutatu:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Ondoren, lortutako datu agregatuak AWS RDS-en taula batean gordetzen ditugu. Agregazio-emaitzak datu-baseko taula batean gordetzeko, DataFrame objektuaren idazketa metodoa erabiliko dugu:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS-rako konexioa konfiguratzeari buruzko hitz batzuk. Erabiltzailea eta pasahitza sortu ditugu "AWS PostgreSQL inplementatzen" urratsean. Endpoint datu-base zerbitzariaren URL gisa erabili beharko zenuke, Konektibitatea eta segurtasuna atalean bistaratzen dena:

Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Spark eta Kafka behar bezala konektatzeko, lana smark-submit bidez exekutatu beharko zenuke artefaktua erabiliz spark-streaming-kafka-0-8_2.11. Gainera, PostgreSQL datu-basearekin elkarreragiteko artefaktu bat ere erabiliko dugu; --packages bidez transferituko ditugu.

Script-aren malgutasunerako, sarrera-parametro gisa ere sartuko ditugu mezu-zerbitzariaren izena eta datuak jaso nahi ditugun gaia.

Beraz, sistemaren funtzionaltasuna abiarazteko eta egiaztatzeko unea da:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Dena ondo atera zen! Beheko irudian ikus dezakezun bezala, aplikazioa exekutatzen ari den bitartean, agregazio-emaitza berriak 2 segunduro ateratzen dira, StreamingContext objektua sortzen dugunean 2 segundotan ezartzen baitugu loteen tartea:

Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Ondoren, datu-baseari kontsulta erraz bat egiten diogu taulan erregistroen presentzia egiaztatzeko transakzio_fluxua:

Apache Kafka eta Streaming Datu-prozesatzea Spark Streaming-ekin

Ondorioa

Artikulu honek Spark Streaming erabiliz informazioa korrontearen prozesamenduaren adibide bat aztertu du Apache Kafka eta PostgreSQL-ekin batera. Hainbat iturritako datuen hazkuntzarekin, zaila da Spark Streaming-en balio praktikoa gehiegi balioestea streaming eta denbora errealeko aplikazioak sortzeko.

Iturburu-kode osoa nire biltegian aurki dezakezu hemen GitHub.

Pozik nago artikulu hau eztabaidatzeaz, zure iruzkinak espero ditut eta irakurle arduratsu guztien kritika eraikitzaileak ere espero ditut.

Nahi dut arrakasta!

Ps. Hasieran tokiko PostgreSQL datu-base bat erabiltzea aurreikusi zen, baina AWSrekiko maitasuna ikusita, datu-basea hodeira eramatea erabaki nuen. Gai honi buruzko hurrengo artikuluan, goian deskribatutako sistema osoa AWS-n AWS Kinesis eta AWS EMR erabiliz nola inplementatu erakutsiko dut. Jarraitu albistea!

Iturria: www.habr.com

Gehitu iruzkin berria