Apache Kafka i Streaming Data Processing s Spark Streamingom

Pozdrav, Habr! Danas ćemo izgraditi sustav koji će obrađivati ​​Apache Kafka tokove poruka koristeći Spark Streaming i zapisivati ​​rezultate obrade u AWS RDS bazu podataka u oblaku.

Zamislimo da nam određena kreditna institucija postavi zadatak obrade dolaznih transakcija "u hodu" u svim njezinim podružnicama. To se može učiniti u svrhu promptnog izračuna otvorene valutne pozicije za riznicu, limita ili financijskih rezultata za transakcije itd.

Kako provesti ovaj slučaj bez upotrebe magije i magičnih čarolija - pročitajte pod rezom! Ići!

Apache Kafka i Streaming Data Processing s Spark Streamingom
(Izvor slike)

Uvod

Naravno, obrada velike količine podataka u stvarnom vremenu pruža široke mogućnosti za korištenje u modernim sustavima. Jedna od najpopularnijih kombinacija za to je tandem Apache Kafka i Spark Streaming, gdje Kafka stvara tok dolaznih paketa poruka, a Spark Streaming obrađuje te pakete u zadanom vremenskom intervalu.

Kako bismo povećali toleranciju na pogreške aplikacije, koristit ćemo kontrolne točke. S ovim mehanizmom, kada mehanizam Spark Streaminga treba vratiti izgubljene podatke, treba se samo vratiti na posljednju kontrolnu točku i od tamo nastaviti s izračunima.

Arhitektura razvijenog sustava

Apache Kafka i Streaming Data Processing s Spark Streamingom

Korištene komponente:

  • Apache Kafka je distribuirani sustav objavljivanja i pretplate na poruke. Prikladno za potrošnju poruka izvan mreže i na mreži. Kako bi se spriječio gubitak podataka, Kafka poruke se pohranjuju na disk i repliciraju unutar klastera. Kafka sustav izgrađen je na vrhu usluge sinkronizacije ZooKeeper;
  • Apache Spark Streaming - Spark komponenta za obradu strujanja podataka. Modul Spark Streaming izgrađen je korištenjem mikro-batch arhitekture, gdje se tok podataka tumači kao kontinuirani niz malih paketa podataka. Spark Streaming uzima podatke iz različitih izvora i kombinira ih u male pakete. Novi paketi se kreiraju u redovitim intervalima. Na početku svakog vremenskog intervala kreira se novi paket, a svi podaci primljeni tijekom tog intervala uključeni su u paket. Na kraju intervala, rast paketa prestaje. Veličina intervala određena je parametrom koji se naziva batch interval;
  • Apache Spark SQL - kombinira relacijsku obradu sa Spark funkcionalnim programiranjem. Strukturirani podaci su podaci koji imaju shemu, odnosno jedan skup polja za sve zapise. Spark SQL podržava unos iz različitih izvora strukturiranih podataka i, zahvaljujući dostupnosti informacija o shemi, može učinkovito dohvatiti samo potrebna polja zapisa, a također pruža DataFrame API-je;
  • AWS RDS je relativno jeftina relacijska baza podataka temeljena na oblaku, web usluga koja pojednostavljuje postavljanje, rad i skaliranje, a njome upravlja izravno Amazon.

Instalacija i pokretanje Kafka poslužitelja

Prije izravnog korištenja Kafke morate provjeriti imate li Javu jer... JVM se koristi za rad:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Kreirajmo novog korisnika za rad s Kafkom:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Zatim preuzmite distribuciju sa službene web stranice Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Raspakirajte preuzetu arhivu:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Sljedeći korak nije obavezan. Činjenica je da zadane postavke ne dopuštaju da u potpunosti iskoristite sve mogućnosti Apache Kafke. Na primjer, izbrišite temu, kategoriju, grupu u kojoj se poruke mogu objaviti. Da biste to promijenili, uredimo konfiguracijsku datoteku:

vim ~/kafka/config/server.properties

Dodajte sljedeće na kraj datoteke:

delete.topic.enable = true

Prije pokretanja Kafka servera potrebno je pokrenuti ZooKeeper server, mi ćemo koristiti pomoćnu skriptu koja dolazi uz Kafka distribuciju:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Nakon što je ZooKeeper uspješno pokrenut, pokrenite Kafka server u zasebnom terminalu:

bin/kafka-server-start.sh config/server.properties

Kreirajmo novu temu pod nazivom Transakcija:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Uvjerimo se da je kreirana tema s potrebnim brojem particija i replikacije:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka i Streaming Data Processing s Spark Streamingom

Propustimo trenutke testiranja proizvođača i potrošača za novostvorenu temu. Više detalja o tome kako možete testirati slanje i primanje poruka napisano je u službenoj dokumentaciji - Pošalji nekoliko poruka. Pa, prelazimo na pisanje proizvođača u Pythonu koristeći KafkaProducer API.

Producentsko pisanje

Proizvođač će generirati nasumične podatke - 100 poruka svake sekunde. Pod slučajnim podacima podrazumijevamo rječnik koji se sastoji od tri polja:

  • Grana — naziv prodajnog mjesta kreditne institucije;
  • Valuta — valuta transakcije;
  • Iznos - iznos transakcije. Iznos će biti pozitivan broj ako se radi o kupnji valute od strane Banke, a negativan broj ako se radi o prodaji.

Kod za proizvođača izgleda ovako:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Zatim, koristeći metodu slanja, šaljemo poruku poslužitelju, na temu koja nam je potrebna, u JSON formatu:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Prilikom pokretanja skripte u terminalu dobivamo sljedeće poruke:

Apache Kafka i Streaming Data Processing s Spark Streamingom

To znači da sve radi kako smo htjeli - producent generira i šalje poruke na temu koja nam je potrebna.
Sljedeći korak je instalirati Spark i obraditi ovaj tok poruka.

Instalacija Apache Spark

Apache Spark je univerzalna klaster računalna platforma visokih performansi.

Spark radi bolje od popularnih implementacija modela MapReduce dok podržava širi raspon tipova izračuna, uključujući interaktivne upite i obradu toka. Brzina igra važnu ulogu pri obradi velike količine podataka, budući da je brzina ta koja vam omogućuje interaktivni rad bez trošenja minuta ili sati čekanja. Jedna od najvećih snaga Spark-a koja ga čini tako brzim je njegova sposobnost izvođenja izračuna u memoriji.

Ovaj okvir je napisan u Scali, tako da ga morate prvo instalirati:

sudo apt-get install scala

Preuzmite distribuciju Spark sa službene web stranice:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Raspakirajte arhivu:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Dodajte put do Spark u bash datoteku:

vim ~/.bashrc

Dodajte sljedeće retke kroz uređivač:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Pokrenite naredbu u nastavku nakon što napravite promjene u bashrc:

source ~/.bashrc

Uvođenje AWS PostgreSQL

Ostaje još samo postaviti bazu podataka u koju ćemo uploadati obrađene informacije iz streamova. Za to ćemo koristiti uslugu AWS RDS.

Idite na AWS konzolu -> AWS RDS -> Baze podataka -> Kreiraj bazu podataka:
Apache Kafka i Streaming Data Processing s Spark Streamingom

Odaberite PostgreSQL i kliknite Dalje:
Apache Kafka i Streaming Data Processing s Spark Streamingom

Jer Ovaj je primjer samo u obrazovne svrhe; koristit ćemo besplatni poslužitelj "najmanje" (Free Tier):
Apache Kafka i Streaming Data Processing s Spark Streamingom

Zatim stavljamo kvačicu u blok Free Tier, a nakon toga će nam se automatski ponuditi instanca klase t2.micro - iako slaba, besplatna je i sasvim prikladna za naš zadatak:
Apache Kafka i Streaming Data Processing s Spark Streamingom

Zatim dolaze vrlo važne stvari: ime instance baze podataka, ime glavnog korisnika i njegova lozinka. Imenujmo instancu: myHabrTest, glavni korisnik: habr, zaporka: habr12345 i kliknite na gumb Dalje:
Apache Kafka i Streaming Data Processing s Spark Streamingom

Na sljedećoj stranici nalaze se parametri odgovorni za dostupnost našeg poslužitelja baze podataka izvana (Public accessibility) i dostupnost porta:

Apache Kafka i Streaming Data Processing s Spark Streamingom

Kreirajmo novu postavku za VPC sigurnosnu grupu, koja će omogućiti vanjski pristup našem poslužitelju baze podataka preko porta 5432 (PostgreSQL).
Idemo na AWS konzolu u zasebnom prozoru preglednika na VPC nadzornu ploču -> Sigurnosne grupe -> odjeljak Stvori sigurnosnu grupu:
Apache Kafka i Streaming Data Processing s Spark Streamingom

Postavljamo naziv za sigurnosnu grupu - PostgreSQL, opis, naznačujemo s kojim VPC-om ova grupa treba biti povezana i kliknemo gumb Kreiraj:
Apache Kafka i Streaming Data Processing s Spark Streamingom

Ispunite Ulazna pravila za port 5432 za novostvorenu grupu, kao što je prikazano na slici ispod. Port ne možete odrediti ručno, već odaberite PostgreSQL s padajućeg popisa Vrsta.

Strogo govoreći, vrijednost ::/0 znači dostupnost dolaznog prometa na poslužitelj iz cijelog svijeta, što kanonski nije sasvim točno, ali za analizu primjera dopustimo si korištenje ovog pristupa:
Apache Kafka i Streaming Data Processing s Spark Streamingom

Vraćamo se na stranicu preglednika, gdje imamo otvorenu “Konfiguracija naprednih postavki” i odabiremo u odjeljku VPC sigurnosne grupe -> Odaberite postojeće VPC sigurnosne grupe -> PostgreSQL:
Apache Kafka i Streaming Data Processing s Spark Streamingom

Zatim, u opcijama baze podataka -> Naziv baze podataka -> postavite naziv - habrDB.

Preostale parametre, osim onemogućavanja sigurnosne kopije (razdoblje zadržavanja sigurnosne kopije - 0 dana), nadzor i Performance Insights, možemo ostaviti prema zadanim postavkama. Kliknite na gumb Stvorite bazu podataka:
Apache Kafka i Streaming Data Processing s Spark Streamingom

Rukovatelj niti

Završna faza bit će razvoj Spark posla koji će svake dvije sekunde obrađivati ​​nove podatke koji dolaze iz Kafke i unositi rezultat u bazu podataka.

Kao što je gore navedeno, kontrolne točke su temeljni mehanizam u SparkStreamingu koji se mora konfigurirati kako bi se osigurala tolerancija na pogreške. Koristit ćemo kontrolne točke i, ako postupak ne uspije, modul Spark Streaming samo će se morati vratiti na posljednju kontrolnu točku i nastaviti s izračunima s nje kako bi povratio izgubljene podatke.

Kontrolne točke se mogu omogućiti postavljanjem direktorija na pouzdanom datotečnom sustavu otpornom na pogreške (kao što je HDFS, S3, itd.) u kojem će biti pohranjene informacije o kontrolnim točkama. To se radi pomoću, na primjer:

streamingContext.checkpoint(checkpointDirectory)

U našem primjeru koristit ćemo sljedeći pristup, naime, ako checkpointDirectory postoji, tada će se kontekst ponovno stvoriti iz podataka o kontrolnim točkama. Ako direktorij ne postoji (tj. izvodi se prvi put), tada se poziva functionToCreateContext za stvaranje novog konteksta i konfiguriranje DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Stvaramo DirectStream objekt za povezivanje s temom "transakcija" pomoću metode createDirectStream biblioteke KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Raščlanjivanje dolaznih podataka u JSON formatu:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Koristeći Spark SQL, radimo jednostavno grupiranje i prikazujemo rezultat u konzoli:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Dohvaćanje teksta upita i njegovo pokretanje kroz Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Zatim spremamo dobivene agregirane podatke u tablicu u AWS RDS. Za spremanje rezultata agregacije u tablicu baze podataka, koristit ćemo metodu pisanja DataFrame objekta:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Nekoliko riječi o postavljanju veze na AWS RDS. Stvorili smo korisnika i lozinku za njega u koraku “Uvođenje AWS PostgreSQL”. Trebali biste koristiti Endpoint kao url poslužitelja baze podataka, koji je prikazan u odjeljku Povezivost i sigurnost:

Apache Kafka i Streaming Data Processing s Spark Streamingom

Kako biste ispravno povezali Spark i Kafku, trebali biste pokrenuti posao putem smark-submita koristeći artefakt spark-streaming-kafka-0-8_2.11. Osim toga, također ćemo koristiti artefakt za interakciju s PostgreSQL bazom podataka; prenosit ćemo ih putem --packages.

Radi fleksibilnosti skripte, također ćemo uključiti kao ulazne parametre naziv poslužitelja poruka i temu iz koje želimo primati podatke.

Dakle, vrijeme je za pokretanje i provjeru funkcionalnosti sustava:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Sve je uspjelo! Kao što možete vidjeti na slici ispod, dok je aplikacija pokrenuta, novi rezultati agregacije izlaze svake 2 sekunde, jer smo postavili interval skupljanja na 2 sekunde kada smo kreirali StreamingContext objekt:

Apache Kafka i Streaming Data Processing s Spark Streamingom

Zatim postavljamo jednostavan upit bazi podataka kako bismo provjerili prisutnost zapisa u tablici tijek_transakcije:

Apache Kafka i Streaming Data Processing s Spark Streamingom

Zaključak

U ovom članku razmatran je primjer toka obrade informacija pomoću Spark Streaminga u kombinaciji s Apache Kafka i PostgreSQL. S porastom podataka iz različitih izvora, teško je precijeniti praktičnu vrijednost Spark Streaminga za kreiranje streaminga i aplikacija u stvarnom vremenu.

Puni izvorni kod možete pronaći u mom repozitoriju na GitHub.

Rado ću raspravljati o ovom članku, veselim se vašim komentarima, a nadam se i konstruktivnim kritikama svih brižnih čitatelja.

Želim vam uspjeh!

Ps. U početku je bilo planirano koristiti lokalnu PostgreSQL bazu podataka, ali s obzirom na moju ljubav prema AWS-u, odlučio sam premjestiti bazu podataka u oblak. U sljedećem članku na ovu temu pokazat ću kako implementirati cijeli gore opisani sustav u AWS koristeći AWS Kinesis i AWS EMR. Pratite novosti!

Izvor: www.habr.com

Dodajte komentar