„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Sveiki, Habr! Šiandien sukursime sistemą, kuri apdoros „Apache Kafka“ pranešimų srautus naudodami „Spark Streaming“ ir rašys apdorojimo rezultatus į AWS RDS debesų duomenų bazę.

Įsivaizduokime, kad tam tikra kredito įstaiga iškelia mums užduotį apdoroti gaunamas operacijas „skraidydamas“ visuose jos filialuose. Tai galima padaryti norint greitai apskaičiuoti atvirą valiutos poziciją iždui, limitus ar finansinius operacijų rezultatus ir pan.

Kaip įgyvendinti šį atvejį nenaudojant magijos ir magiškų burtų – skaitykite po pjūviu! Pirmyn!

„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.
(Vaizdo šaltinis)

įvedimas

Žinoma, didelio duomenų kiekio apdorojimas realiu laiku suteikia plačias panaudojimo galimybes šiuolaikinėse sistemose. Vienas iš populiariausių tam skirtų derinių yra „Apache Kafka“ ir „Spark Streaming“ tandemas, kai „Kafka“ sukuria gaunamų pranešimų paketų srautą, o „Spark Streaming“ apdoroja šiuos paketus tam tikru laiko intervalu.

Norėdami padidinti programos atsparumą gedimams, naudosime kontrolinius taškus. Naudojant šį mechanizmą, kai „Spark Streaming“ varikliui reikia atkurti prarastus duomenis, jam tereikia grįžti į paskutinį kontrolinį tašką ir iš ten tęsti skaičiavimus.

Sukurtos sistemos architektūra

„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Naudojami komponentai:

  • Apache Kafka yra paskirstyta publikavimo ir prenumeratos pranešimų sistema. Tinka tiek neprisijungus, tiek prisijungus žinutėms. Siekiant išvengti duomenų praradimo, Kafka pranešimai saugomi diske ir dauginami klasteryje. Kafka sistema sukurta naudojant ZooKeeper sinchronizavimo paslaugą;
  • „Apache Spark“ srautas - Spark komponentas srautinio perdavimo duomenims apdoroti. Spark Streaming modulis sukurtas naudojant mikropaketinę architektūrą, kur duomenų srautas interpretuojamas kaip nenutrūkstama mažų duomenų paketų seka. Spark Streaming ima duomenis iš skirtingų šaltinių ir sujungia juos į mažus paketus. Reguliariai kuriami nauji paketai. Kiekvieno laiko intervalo pradžioje sukuriamas naujas paketas, o visi per tą intervalą gauti duomenys įtraukiami į paketą. Pasibaigus intervalui, paketų augimas sustoja. Intervalo dydis nustatomas pagal parametrą, vadinamą partijos intervalu;
  • Apache Spark SQL - sujungia reliacinį apdorojimą su Spark funkciniu programavimu. Struktūriniai duomenys reiškia duomenis, kurie turi schemą, ty vieną laukų rinkinį visiems įrašams. „Spark SQL“ palaiko įvestį iš įvairių struktūrizuotų duomenų šaltinių ir dėl schemos informacijos prieinamumo gali efektyviai gauti tik reikiamus įrašų laukus, taip pat teikia DataFrame API;
  • AWS RDS yra santykinai nebrangi debesų pagrindu sukurta reliacinė duomenų bazė, žiniatinklio paslauga, kuri supaprastina sąranką, veikimą ir mastelio keitimą, kurią tiesiogiai administruoja „Amazon“.

Kafka serverio diegimas ir paleidimas

Prieš naudodami Kafka tiesiogiai, turite įsitikinti, kad turite Java, nes... JVM naudojamas darbui:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Sukurkime naują vartotoją, kuris dirbs su Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Tada atsisiųskite platinimą iš oficialios „Apache Kafka“ svetainės:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Išpakuokite atsisiųstą archyvą:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Kitas žingsnis yra neprivalomas. Faktas yra tas, kad numatytieji nustatymai neleidžia visiškai išnaudoti visų „Apache Kafka“ funkcijų. Pavyzdžiui, ištrinkite temą, kategoriją, grupę, kuriai galima skelbti pranešimus. Norėdami tai pakeisti, redaguokite konfigūracijos failą:

vim ~/kafka/config/server.properties

Failo pabaigoje pridėkite:

delete.topic.enable = true

Prieš paleisdami „Kafka“ serverį, turite paleisti „ZooKeeper“ serverį; mes naudosime pagalbinį scenarijų, kuris pateikiamas su Kafka paskirstymu:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Sėkmingai paleidę „ZooKeeper“, paleiskite „Kafka“ serverį atskirame terminale:

bin/kafka-server-start.sh config/server.properties

Sukurkime naują temą pavadinimu Sandoris:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Įsitikinkite, kad sukurta tema su reikiamu skaidinių skaičiumi ir replikacija:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Praleiskime prodiuserio ir vartotojo išbandymo akimirkas naujai kuriamai temai. Daugiau informacijos apie tai, kaip galite išbandyti pranešimų siuntimą ir gavimą, parašyta oficialioje dokumentacijoje - Išsiųskite keletą žinučių. Na, mes pereiname prie gamintojo rašymo Python, naudodami KafkaProducer API.

Prodiuserio rašymas

Gamintojas generuos atsitiktinius duomenis – 100 pranešimų kas sekundę. Atsitiktiniais duomenimis turime omenyje žodyną, sudarytą iš trijų laukų:

  • Filialas — kredito įstaigos pardavimo vietos pavadinimas;
  • valiuta — operacijos valiuta;
  • suma - sandorio suma. Suma bus teigiamas skaičius, jei tai yra banko valiutos pirkimas, ir neigiamas skaičius, jei tai yra pardavimas.

Gamintojo kodas atrodo taip:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Tada, naudodami siuntimo metodą, siunčiame pranešimą į serverį, mums reikiama tema, JSON formatu:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Vykdydami scenarijų terminale gauname šiuos pranešimus:

„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Tai reiškia, kad viskas veikia taip, kaip norėjome – prodiuseris generuoja ir siunčia žinutes mums reikalinga tema.
Kitas žingsnis yra įdiegti „Spark“ ir apdoroti šį pranešimų srautą.

„Apache Spark“ diegimas

Apache Spark yra universali ir didelio našumo klasterinio skaičiavimo platforma.

„Spark“ veikia geriau nei populiarūs „MapReduce“ modelio diegimai, kartu palaikydama platesnį skaičiavimo tipų spektrą, įskaitant interaktyvias užklausas ir srauto apdorojimą. Greitis vaidina svarbų vaidmenį apdorojant didelius duomenų kiekius, nes būtent greitis leidžia dirbti interaktyviai, nelaukiant minučių ar valandų. Viena didžiausių „Spark“ privalumų, dėl kurių jis toks greitas, yra galimybė atlikti skaičiavimus atmintyje.

Ši sistema parašyta Scala, todėl pirmiausia turite ją įdiegti:

sudo apt-get install scala

Atsisiųskite „Spark“ platinimą iš oficialios svetainės:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Išpakuokite archyvą:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Pridėkite kelią į Spark į bash failą:

vim ~/.bashrc

Redagavimo priemonėje pridėkite šias eilutes:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Atlikę bashrc pakeitimus, paleiskite toliau pateiktą komandą:

source ~/.bashrc

AWS PostgreSQL diegimas

Belieka dislokuoti duomenų bazę, į kurią įkelsime apdorotą informaciją iš srautų. Tam naudosime AWS RDS paslaugą.

Eikite į AWS konsolę -> AWS RDS -> Duomenų bazės -> Kurti duomenų bazę:
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Pasirinkite PostgreSQL ir spustelėkite Pirmyn:
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Nes Šis pavyzdys skirtas tik švietimo tikslams; naudosime nemokamą serverį „mažiausiai“ (nemokama pakopa):
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Toliau pažymime varnelę „Free Tier“ bloke, o po to mums automatiškai bus pasiūlytas t2.micro klasės egzempliorius - nors ir silpnas, jis yra nemokamas ir gana tinkamas mūsų užduočiai:
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Toliau ateina labai svarbūs dalykai: duomenų bazės egzemplioriaus pavadinimas, pagrindinio vartotojo vardas ir jo slaptažodis. Pavadinkime egzempliorių: myHabrTest, pagrindinis vartotojas: habr, Slaptažodis: habr12345 ir spustelėkite mygtuką Kitas:
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Kitame puslapyje yra parametrai, atsakingi už mūsų duomenų bazės serverio pasiekiamumą iš išorės (viešasis prieinamumas) ir prievado prieinamumą:

„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Sukurkime naują VPC saugos grupės nustatymą, kuris leis išorinę prieigą prie mūsų duomenų bazės serverio per prievadą 5432 (PostgreSQL).
Eikime į AWS konsolę atskirame naršyklės lange į VPC prietaisų skydelį -> Saugos grupės -> Sukurti saugos grupę:
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Nustatome saugos grupės pavadinimą - PostgreSQL, aprašą, nurodykite, su kuriuo VPC ši grupė turi būti susieta, ir spustelėkite mygtuką Sukurti:
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Užpildykite naujai sukurtos grupės 5432 prievado įeinančias taisykles, kaip parodyta paveikslėlyje žemiau. Negalite nurodyti prievado rankiniu būdu, bet išskleidžiamajame sąraše Tipas pasirinkite PostgreSQL.

Griežtai kalbant, reikšmė ::/0 reiškia įeinančio srauto prieinamumą į serverį iš viso pasaulio, o tai kanoniškai nėra visiškai tiesa, tačiau norėdami paanalizuoti pavyzdį, leiskite sau naudoti šį metodą:
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Grįžtame į naršyklės puslapį, kuriame atidarytas „Konfigūruoti išplėstinius nustatymus“ ir skiltyje VPC saugos grupės pasirinkite -> Pasirinkti esamas VPC saugos grupes -> PostgreSQL:
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Tada duomenų bazės parinktyse -> Duomenų bazės pavadinimas -> nustatykite pavadinimą - habrDB.

Likusius parametrus, išskyrus atsarginės kopijos išjungimą (atsarginės kopijos saugojimo laikotarpis – 0 dienų), stebėjimą ir Performance Insights, galime palikti pagal numatytuosius nustatymus. Spustelėkite mygtuką Sukurti duomenų bazę:
„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Siūlų tvarkytojas

Paskutiniame etape bus sukurtas Spark darbas, kuris kas dvi sekundes apdoros naujus iš Kafkos gaunamus duomenis ir įves rezultatus į duomenų bazę.

Kaip minėta pirmiau, kontroliniai taškai yra pagrindinis „SparkStreaming“ mechanizmas, kurį reikia sukonfigūruoti, kad būtų užtikrintas atsparumas gedimams. Naudosime kontrolinius taškus ir, jei procedūra nepavyks, Spark Streaming moduliui tereikės grįžti į paskutinį kontrolinį tašką ir iš jo tęsti skaičiavimus, kad atkurtų prarastus duomenis.

Kontrolinį tašką galima įjungti nustatant gedimams atsparios, patikimos failų sistemos katalogą (pvz., HDFS, S3 ir kt.), kuriame bus saugoma patikros taško informacija. Tai atliekama naudojant, pavyzdžiui:

streamingContext.checkpoint(checkpointDirectory)

Savo pavyzdyje naudosime šį metodą, ty jei checkpointDirectory egzistuoja, kontekstas bus atkurtas iš kontrolinio taško duomenų. Jei katalogo nėra (t. y. vykdomas pirmą kartą), tada funkcija functionToCreateContext iškviečiama sukurti naują kontekstą ir sukonfigūruoti DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Sukuriame „DirectStream“ objektą, kad prisijungtume prie „operacijos“ temos, naudodami KafkaUtils bibliotekos „createDirectStream“ metodą:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Gaunamų duomenų analizavimas JSON formatu:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Naudodami Spark SQL atliekame paprastą grupavimą ir rodome rezultatą konsolėje:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Užklausos teksto gavimas ir jo vykdymas naudojant „Spark SQL“:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Tada išsaugome gautus apibendrintus duomenis į AWS RDS lentelę. Norėdami išsaugoti agregavimo rezultatus duomenų bazės lentelėje, naudosime DataFrame objekto rašymo metodą:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Keletas žodžių apie ryšio su AWS RDS nustatymą. „AWS PostgreSQL diegimas“ veiksme sukūrėme vartotoją ir slaptažodį. Turėtumėte naudoti Endpoint kaip duomenų bazės serverio URL, kuris rodomas skiltyje Ryšiai ir sauga:

„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Norėdami teisingai sujungti „Spark“ ir „Kafka“, turėtumėte paleisti užduotį naudodami „smark-submit“ naudodami artefaktą spark-streaming-kafka-0-8_2.11. Be to, sąveikaudami su PostgreSQL duomenų baze taip pat naudosime artefaktą; juos perkelsime naudodami paketus.

Dėl scenarijaus lankstumo kaip įvesties parametrus taip pat įtrauksime pranešimų serverio pavadinimą ir temą, iš kurios norime gauti duomenis.

Taigi, laikas paleisti ir patikrinti sistemos funkcionalumą:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Viskas pavyko! Kaip matote toliau esančiame paveikslėlyje, kai programa veikia, nauji agregavimo rezultatai išvedami kas 2 sekundes, nes kurdami StreamingContext objektą nustatėme 2 sekundžių paketų intervalą:

„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

Toliau pateikiame paprastą duomenų bazės užklausą, kad patikrintume, ar lentelėje yra įrašų sandorio_srautas:

„Apache Kafka“ ir srautinio perdavimo duomenų apdorojimas naudojant „Spark Streaming“.

išvada

Šiame straipsnyje apžvelgtas informacijos srautinio apdorojimo, naudojant Spark Streaming kartu su Apache Kafka ir PostgreSQL, pavyzdys. Daugėjant duomenų iš įvairių šaltinių, sunku pervertinti praktinę „Spark Streaming“ vertę kuriant srautinio perdavimo ir realaus laiko programas.

Visą šaltinio kodą galite rasti mano saugykloje adresu GitHub.

Džiaugiuosi galėdamas aptarti šį straipsnį, laukiu jūsų komentarų, taip pat tikiuosi konstruktyvios kritikos iš visų rūpestingų skaitytojų.

Linkiu sėkmės!

Ps. Iš pradžių buvo planuota naudoti vietinę PostgreSQL duomenų bazę, tačiau, atsižvelgdamas į mano meilę AWS, nusprendžiau perkelti duomenų bazę į debesį. Kitame straipsnyje šia tema parodysiu, kaip įdiegti visą aukščiau aprašytą sistemą AWS naudojant AWS Kinesis ir AWS EMR. Sekite naujienas!

Šaltinis: www.habr.com

Добавить комментарий