Apache Kafka i Streaming obrada podataka uz Spark Streaming

Zdravo, Habr! Danas ćemo izgraditi sistem koji će obraditi Apache Kafka tokove poruka koristeći Spark Streaming i zapisivati ​​rezultate obrade u AWS RDS cloud bazu podataka.

Zamislimo da nam određena kreditna institucija postavlja zadatak da obrađujemo pristigle transakcije „u hodu“ u svim svojim filijalama. Ovo se može učiniti u svrhu brzog izračunavanja otvorene valutne pozicije za trezor, limita ili finansijskih rezultata za transakcije itd.

Kako implementirati ovaj slučaj bez upotrebe magije i magijskih čarolija - pročitajte ispod! Idi!

Apache Kafka i Streaming obrada podataka uz Spark Streaming
(izvor slike)

Uvod

Naravno, obrada velike količine podataka u realnom vremenu pruža široke mogućnosti za upotrebu u savremenim sistemima. Jedna od najpopularnijih kombinacija za ovo je tandem Apache Kafka i Spark Streaming, gdje Kafka kreira tok dolaznih paketa poruka, a Spark Streaming obrađuje te pakete u datom vremenskom intervalu.

Da bismo povećali toleranciju na greške aplikacije, koristit ćemo kontrolne točke. Sa ovim mehanizmom, kada Spark Streaming motor treba da povrati izgubljene podatke, samo treba da se vrati na poslednju kontrolnu tačku i odatle nastavi proračune.

Arhitektura razvijenog sistema

Apache Kafka i Streaming obrada podataka uz Spark Streaming

Korištene komponente:

  • Apache Kafka je distribuirani sistem za razmjenu poruka za objavljivanje-pretplatu. Pogodno za offline i online konzumaciju poruka. Da bi se spriječio gubitak podataka, Kafka poruke se pohranjuju na disk i repliciraju unutar klastera. Kafka sistem je izgrađen na vrhu servisa za sinhronizaciju ZooKeeper;
  • Apache Spark Streaming - Spark komponenta za obradu streaming podataka. Modul Spark Streaming je izgrađen korištenjem mikro-batch arhitekture, gdje se tok podataka tumači kao kontinuirani niz malih paketa podataka. Spark Streaming uzima podatke iz različitih izvora i kombinuje ih u male pakete. Novi paketi se kreiraju u redovnim intervalima. Na početku svakog vremenskog intervala kreira se novi paket, a svi podaci primljeni tokom tog intervala se uključuju u paket. Na kraju intervala, rast paketa se zaustavlja. Veličina intervala je određena parametrom koji se zove interval serije;
  • Apache Spark SQL - kombinuje relacionu obradu sa Spark funkcionalnim programiranjem. Strukturirani podaci označavaju podatke koji imaju šemu, odnosno jedan skup polja za sve zapise. Spark SQL podržava unos iz različitih strukturiranih izvora podataka i, zahvaljujući dostupnosti informacija o šemi, može efikasno dohvatiti samo potrebna polja zapisa, a takođe pruža API-je za DataFrame;
  • AWS RDS je relativno jeftina relaciona baza podataka zasnovana na oblaku, web servis koji pojednostavljuje podešavanje, rad i skaliranje, a njime direktno administrira Amazon.

Instaliranje i pokretanje Kafka servera

Prije nego što direktno koristite Kafku, morate biti sigurni da imate Javu, jer... JVM se koristi za rad:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Kreirajmo novog korisnika za rad sa Kafkom:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Zatim preuzmite distribuciju sa službene web stranice Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Raspakujte preuzetu arhivu:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Sljedeći korak nije obavezan. Činjenica je da vam zadane postavke ne dozvoljavaju da u potpunosti koristite sve mogućnosti Apache Kafke. Na primjer, izbrišite temu, kategoriju, grupu u kojoj se poruke mogu objavljivati. Da ovo promijenimo, uredimo konfiguracijski fajl:

vim ~/kafka/config/server.properties

Dodajte sljedeće na kraj datoteke:

delete.topic.enable = true

Prije pokretanja Kafka servera, morate pokrenuti server ZooKeeper; koristit ćemo pomoćnu skriptu koja dolazi s Kafka distribucijom:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Nakon što se ZooKeeper uspješno pokrene, pokrenite Kafka server u zasebnom terminalu:

bin/kafka-server-start.sh config/server.properties

Kreirajmo novu temu pod nazivom Transakcija:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Uvjerimo se da je kreirana tema sa potrebnim brojem particija i replikacije:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka i Streaming obrada podataka uz Spark Streaming

Propustimo trenutke testiranja proizvođača i potrošača za novonastalu temu. Više detalja o tome kako možete testirati slanje i primanje poruka napisano je u službenoj dokumentaciji - Pošaljite neke poruke. Pa, prelazimo na pisanje producenta u Pythonu koristeći KafkaProducer API.

Pisanje producenta

Proizvođač će generirati nasumične podatke - 100 poruka svake sekunde. Pod slučajnim podacima podrazumijevamo rječnik koji se sastoji od tri polja:

  • grana — naziv prodajnog mjesta kreditne institucije;
  • valuta — valuta transakcije;
  • iznos — iznos transakcije. Iznos će biti pozitivan broj ako se radi o kupovini valute od strane Banke, a negativan ako se radi o prodaji.

Kod za proizvođača izgleda ovako:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Zatim, koristeći metodu send, šaljemo poruku serveru, na temu koja nam je potrebna, u JSON formatu:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Prilikom pokretanja skripte, primamo sljedeće poruke u terminalu:

Apache Kafka i Streaming obrada podataka uz Spark Streaming

To znači da sve funkcioniše kako smo želeli - proizvođač generiše i šalje poruke na temu koja nam je potrebna.
Sljedeći korak je instaliranje Spark-a i obrada ovog toka poruka.

Instaliranje Apache Sparka

Apache Spark je univerzalna i klaster računarska platforma visokih performansi.

Spark radi bolje od popularnih implementacija MapReduce modela dok podržava širi raspon tipova izračunavanja, uključujući interaktivne upite i obradu toka. Brzina igra važnu ulogu pri obradi velikih količina podataka, jer je brzina ta koja vam omogućava da radite interaktivno bez trošenja minuta ili sati na čekanje. Jedna od najvećih prednosti Spark-a koja ga čini tako brzim je njegova sposobnost izvođenja računanja u memoriji.

Ovaj okvir je napisan u Scali, tako da ga prvo morate instalirati:

sudo apt-get install scala

Preuzmite Spark distribuciju sa službene web stranice:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Raspakujte arhivu:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Dodajte putanju do Spark u bash fajl:

vim ~/.bashrc

Dodajte sljedeće redove kroz editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Pokrenite naredbu ispod nakon što izvršite promjene u bashrc:

source ~/.bashrc

Primena AWS PostgreSQL

Ostaje samo da se postavi baza podataka u koju ćemo učitati obrađene informacije iz streamova. Za ovo ćemo koristiti uslugu AWS RDS.

Idite na AWS konzolu -> AWS RDS -> Baze podataka -> Kreiraj bazu podataka:
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Odaberite PostgreSQL i kliknite Dalje:
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Jer Ovaj primjer je samo u obrazovne svrhe; koristit ćemo besplatni server "najmanje" (Free Tier):
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Zatim stavljamo kvačicu u blok Free Tier, a nakon toga će nam automatski biti ponuđena instanca klase t2.micro - iako slaba, besplatna je i sasvim prikladna za naš zadatak:
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Slijede vrlo važne stvari: ime instance baze podataka, ime glavnog korisnika i njegova lozinka. Nazovimo instancu: myHabrTest, glavni korisnik: habr, lozinka: habr12345 i kliknite na dugme Dalje:
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Na sljedećoj stranici nalaze se parametri odgovorni za pristupačnost našeg servera baze podataka izvana (Public accessibility) i dostupnost porta:

Apache Kafka i Streaming obrada podataka uz Spark Streaming

Kreirajmo novu postavku za VPC sigurnosnu grupu, koja će omogućiti eksterni pristup našem serveru baze podataka preko porta 5432 (PostgreSQL).
Idemo na AWS konzolu u zasebnom prozoru pretraživača na VPC Dashboard -> Sigurnosne grupe -> Kreiraj odjeljak sigurnosne grupe:
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Postavljamo naziv za Sigurnosnu grupu - PostgreSQL, opis, označavamo sa kojim VPC-om ova grupa treba biti povezana i kliknemo na dugme Kreiraj:
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Popunite ulazna pravila za port 5432 za novokreiranu grupu, kao što je prikazano na slici ispod. Ne možete ručno odrediti port, već odaberite PostgreSQL sa padajuće liste Tip.

Strogo govoreći, vrijednost ::/0 znači dostupnost dolaznog prometa na server iz cijelog svijeta, što kanonski nije sasvim tačno, ali da analiziramo primjer, dozvolimo sebi da koristimo ovaj pristup:
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Vraćamo se na stranicu pretraživača, gdje imamo otvorenu “Konfiguriraj napredne postavke” i odabiremo u odjeljku VPC sigurnosne grupe -> Odaberite postojeće VPC sigurnosne grupe -> PostgreSQL:
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Zatim, u opcijama baze podataka -> Ime baze podataka -> postavite ime - habrDB.

Preostale parametre, osim onemogućavanja sigurnosne kopije (period zadržavanja rezervne kopije - 0 dana), nadzora i uvida u performanse, možemo ostaviti po defaultu. Kliknite na dugme Kreirajte bazu podataka:
Apache Kafka i Streaming obrada podataka uz Spark Streaming

Thread handler

Završna faza će biti razvoj Spark posla, koji će svake dvije sekunde obrađivati ​​nove podatke koji dolaze od Kafke i unositi rezultat u bazu podataka.

Kao što je gore navedeno, kontrolne tačke su osnovni mehanizam u SparkStreaming-u koji se mora konfigurirati kako bi se osigurala tolerancija grešaka. Koristit ćemo kontrolne tačke i, ako procedura ne uspije, modul Spark Streaming će se samo trebati vratiti na posljednju kontrolnu tačku i nastaviti kalkulacije sa nje kako bi povratio izgubljene podatke.

Kontrolna tačka se može omogućiti postavljanjem direktorijuma na pouzdanom sistemu datoteka otpornom na greške (kao što je HDFS, S3, itd.) u kojem će biti pohranjene informacije o kontrolnoj tački. To se radi pomoću, na primjer:

streamingContext.checkpoint(checkpointDirectory)

U našem primjeru koristit ćemo sljedeći pristup, naime, ako checkpointDirectory postoji, onda će kontekst biti ponovo kreiran iz podataka kontrolne točke. Ako direktorij ne postoji (tj. izvršava se po prvi put), tada se poziva functionToCreateContext da kreira novi kontekst i konfigurira DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Kreiramo DirectStream objekat da se povežemo na temu "transakcije" koristeći createDirectStream metodu biblioteke KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Raščlanjivanje dolaznih podataka u JSON formatu:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Koristeći Spark SQL, radimo jednostavno grupisanje i prikazujemo rezultat u konzoli:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Dobivanje teksta upita i pokretanje kroz Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

A zatim spremamo rezultirajuće agregirane podatke u tabelu u AWS RDS. Za spremanje rezultata agregacije u tablicu baze podataka, koristit ćemo metodu pisanja objekta DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Nekoliko riječi o postavljanju veze na AWS RDS. Napravili smo korisnika i lozinku za njega u koraku “Primjena AWS PostgreSQL-a”. Trebali biste koristiti Endpoint kao URL servera baze podataka, koji je prikazan u odjeljku Povezivanje i sigurnost:

Apache Kafka i Streaming obrada podataka uz Spark Streaming

Da biste ispravno povezali Spark i Kafku, trebali biste pokrenuti posao putem smark-submit koristeći artefakt spark-streaming-kafka-0-8_2.11. Osim toga, koristićemo artefakt za interakciju sa PostgreSQL bazom podataka; mi ćemo ih prenijeti putem --packagesa.

Radi fleksibilnosti skripte, kao ulazne parametre uključićemo i naziv servera poruka i temu iz koje želimo da primamo podatke.

Dakle, vrijeme je da pokrenete i provjerite funkcionalnost sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Sve je uspjelo! Kao što možete vidjeti na slici ispod, dok je aplikacija pokrenuta, novi rezultati agregacije se izlaze svake 2 sekunde, jer smo postavili interval batching-a na 2 sekunde kada smo kreirali StreamingContext objekat:

Apache Kafka i Streaming obrada podataka uz Spark Streaming

Zatim pravimo jednostavan upit bazi podataka kako bismo provjerili prisustvo zapisa u tabeli transakcijski_tok:

Apache Kafka i Streaming obrada podataka uz Spark Streaming

zaključak

Ovaj članak je razmatrao primjer stream obrade informacija pomoću Spark Streaming-a u kombinaciji s Apache Kafka i PostgreSQL-om. Uz porast podataka iz različitih izvora, teško je precijeniti praktičnu vrijednost Spark Streaming-a za kreiranje streaming aplikacija i aplikacija u realnom vremenu.

Kompletan izvorni kod možete pronaći u mom spremištu na adresi GitHub.

Drago mi je da raspravljam o ovom članku, radujem se vašim komentarima, a nadam se i konstruktivnoj kritici svih brižnih čitatelja.

Želim vam uspeh!

. U početku je bilo planirano korištenje lokalne PostgreSQL baze podataka, ali s obzirom na moju ljubav prema AWS-u, odlučio sam da premjestim bazu podataka u oblak. U sljedećem članku na ovu temu pokazaću kako implementirati cijeli gore opisani sistem u AWS koristeći AWS Kinesis i AWS EMR. Pratite vijesti!

izvor: www.habr.com

Dodajte komentar