Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Witaj, Habro! Dzisiaj zbudujemy system, który będzie przetwarzał strumienie komunikatów Apache Kafka przy użyciu Spark Streaming i zapisywał wyniki przetwarzania do bazy danych w chmurze AWS RDS.

Wyobraźmy sobie, że pewna instytucja kredytowa stawia nam zadanie obsługi transakcji przychodzących „w locie” we wszystkich swoich oddziałach. Można to zrobić w celu szybkiego obliczenia otwartej pozycji walutowej dla skarbca, limitów lub wyników finansowych transakcji itp.

Jak zrealizować ten przypadek bez użycia magii i zaklęć magicznych - przeczytaj pod nacięciem! Iść!

Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming
(Źródło obrazu)

Wprowadzenie

Oczywiście przetwarzanie dużej ilości danych w czasie rzeczywistym daje szerokie możliwości wykorzystania w nowoczesnych systemach. Jedną z najpopularniejszych kombinacji jest tandem Apache Kafka i Spark Streaming, w którym Kafka tworzy strumień przychodzących pakietów wiadomości, a Spark Streaming przetwarza te pakiety w określonym przedziale czasu.

Aby zwiększyć odporność aplikacji na błędy, wykorzystamy punkty kontrolne. Dzięki temu mechanizmowi, gdy silnik Spark Streaming musi odzyskać utracone dane, wystarczy wrócić do ostatniego punktu kontrolnego i stamtąd wznowić obliczenia.

Architektura opracowanego systemu

Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Wykorzystane komponenty:

  • Apache Kafka to rozproszony system przesyłania wiadomości typu „publikuj i subskrybuj”. Nadaje się zarówno do korzystania z wiadomości offline, jak i online. Aby zapobiec utracie danych, komunikaty Kafki są przechowywane na dysku i replikowane w klastrze. System Kafka opiera się na usłudze synchronizacji ZooKeeper;
  • Przesyłanie strumieniowe Apache Spark - Komponent Spark do przetwarzania danych przesyłanych strumieniowo. Moduł Spark Streaming zbudowany jest w architekturze mikro-batch, gdzie strumień danych jest interpretowany jako ciągła sekwencja małych pakietów danych. Spark Streaming pobiera dane z różnych źródeł i łączy je w małe pakiety. Nowe pakiety tworzone są w regularnych odstępach czasu. Na początku każdego przedziału czasu tworzony jest nowy pakiet, a wszelkie dane otrzymane w tym przedziale czasu są uwzględniane w pakiecie. Pod koniec interwału wzrost pakietów zatrzymuje się. Rozmiar interwału jest określony przez parametr zwany interwałem wsadowym;
  • Apache SparkSQL - łączy przetwarzanie relacyjne z programowaniem funkcjonalnym Spark. Dane strukturalne to dane posiadające schemat, czyli pojedynczy zestaw pól dla wszystkich rekordów. Spark SQL obsługuje dane wejściowe z różnych ustrukturyzowanych źródeł danych, a dzięki dostępności informacji o schemacie może efektywnie pobierać tylko wymagane pola rekordów, a także udostępnia interfejsy API DataFrame;
  • AWS RDS to stosunkowo niedroga relacyjna baza danych oparta na chmurze, usługa internetowa, która upraszcza konfigurację, obsługę i skalowanie i jest administrowana bezpośrednio przez Amazon.

Instalacja i uruchomienie serwera Kafka

Przed bezpośrednim użyciem Kafki musisz upewnić się, że masz Javę, ponieważ... JVM służy do pracy:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Utwórzmy nowego użytkownika do pracy z Kafką:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Następnie pobierz dystrybucję z oficjalnej strony Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Rozpakuj pobrane archiwum:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Następny krok jest opcjonalny. Faktem jest, że ustawienia domyślne nie pozwalają w pełni wykorzystać wszystkich możliwości Apache Kafka. Przykładowo usuń temat, kategorię, grupę, w której można publikować wiadomości. Aby to zmienić, zmodyfikujmy plik konfiguracyjny:

vim ~/kafka/config/server.properties

Dodaj następujący wpis na końcu pliku:

delete.topic.enable = true

Przed uruchomieniem serwera Kafka należy uruchomić serwer ZooKeeper; użyjemy pomocniczego skryptu dołączonego do dystrybucji Kafki:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Po pomyślnym uruchomieniu ZooKeepera uruchom serwer Kafka w osobnym terminalu:

bin/kafka-server-start.sh config/server.properties

Utwórzmy nowy temat o nazwie Transakcja:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Upewnijmy się, że został utworzony temat z wymaganą liczbą partycji i replikacji:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Omińmy momenty testowania producenta i konsumenta pod kątem nowo powstałego tematu. Więcej szczegółów na temat testowania wysyłania i odbierania wiadomości znajduje się w oficjalnej dokumentacji - Wyślij kilka wiadomości. Cóż, przechodzimy do pisania producenta w Pythonie przy użyciu API KafkaProducer.

Tekst producenta

Producent będzie generował losowe dane - 100 komunikatów na sekundę. Przez dane losowe rozumiemy słownik składający się z trzech pól:

  • Oddział — nazwa punktu sprzedaży instytucji kredytowej;
  • Waluta - waluta transakcji;
  • ilość - Suma transakcyjna. Kwota będzie liczbą dodatnią w przypadku zakupu waluty przez Bank i liczbą ujemną w przypadku sprzedaży.

Kod producenta wygląda następująco:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Następnie metodą send wysyłamy wiadomość na serwer na potrzebny nam temat w formacie JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Podczas uruchamiania skryptu w terminalu otrzymujemy następujące komunikaty:

Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Oznacza to, że wszystko działa tak jak chcieliśmy – producent generuje i wysyła komunikaty na potrzebny nam temat.
Następnym krokiem jest instalacja Sparka i przetworzenie tego strumienia komunikatów.

Instalowanie Apache Sparka

Apache Spark to uniwersalna i wysokowydajna platforma przetwarzania klastrowego.

Spark działa lepiej niż popularne implementacje modelu MapReduce, obsługując jednocześnie szerszy zakres typów obliczeń, w tym interaktywne zapytania i przetwarzanie strumieniowe. Szybkość odgrywa ważną rolę przy przetwarzaniu dużych ilości danych, ponieważ to właśnie prędkość pozwala na interaktywną pracę bez konieczności spędzania minut lub godzin na czekaniu. Jedną z największych zalet Sparka, która sprawia, że ​​jest tak szybki, jest jego zdolność do wykonywania obliczeń w pamięci.

Ten framework jest napisany w Scali, więc musisz go najpierw zainstalować:

sudo apt-get install scala

Pobierz dystrybucję Spark z oficjalnej strony:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Rozpakuj archiwum:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Dodaj ścieżkę do Sparka do pliku bash:

vim ~/.bashrc

Dodaj następujące wiersze za pomocą edytora:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Uruchom poniższe polecenie po wprowadzeniu zmian w bashrc:

source ~/.bashrc

Wdrażanie AWS PostgreSQL

Pozostaje tylko wdrożyć bazę danych, do której będziemy przesyłać przetworzone informacje ze strumieni. W tym celu skorzystamy z usługi AWS RDS.

Przejdź do konsoli AWS -> AWS RDS -> Bazy danych -> Utwórz bazę danych:
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Wybierz PostgreSQL i kliknij Dalej:
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Ponieważ Ten przykład służy wyłącznie celom edukacyjnym; będziemy korzystać z darmowego serwera „co najmniej” (bezpłatny poziom):
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Następnie zaznaczamy blok Free Tier, po czym automatycznie zostanie nam zaproponowana instancja klasy t2.micro - choć słaba, jest darmowa i całkiem odpowiednia do naszego zadania:
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Następnie dochodzą bardzo ważne rzeczy: nazwa instancji bazy danych, nazwa użytkownika głównego i jego hasło. Nazwijmy instancję: myHabrTest, użytkownik główny: hab, hasło: habr12345 i kliknij przycisk Dalej:
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Na kolejnej stronie znajdują się parametry odpowiedzialne za dostępność naszego serwera baz danych z zewnątrz (Dostępność publiczna) oraz dostępność portów:

Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Utwórzmy nowe ustawienie dla grupy bezpieczeństwa VPC, które umożliwi zewnętrzny dostęp do naszego serwera bazy danych poprzez port 5432 (PostgreSQL).
Przejdźmy do konsoli AWS w osobnym oknie przeglądarki do sekcji VPC Dashboard -> Grupy zabezpieczeń -> Utwórz grupę zabezpieczeń:
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Ustalamy nazwę grupy Security - PostgreSQL, opis, wskazujemy z którym VPC ta grupa ma być powiązana i klikamy przycisk Utwórz:
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Wypełnij reguły ruchu przychodzącego dla portu 5432 dla nowo utworzonej grupy, jak pokazano na poniższym obrazku. Nie możesz określić portu ręcznie, ale wybierz PostgreSQL z listy rozwijanej Typ.

Ściśle rzecz biorąc, wartość ::/0 oznacza dostępność ruchu przychodzącego na serwer z całego świata, co kanonicznie nie jest do końca prawdą, ale analizując przykład, pozwólmy sobie na takie podejście:
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Wracamy do strony przeglądarki, gdzie mamy otwartą opcję „Konfiguruj ustawienia zaawansowane” i w sekcji Grupy zabezpieczeń VPC wybieramy -> Wybierz istniejące grupy zabezpieczeń VPC -> PostgreSQL:
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Następnie w Opcjach bazy danych -> Nazwa bazy danych -> ustaw nazwę - habrDB.

Pozostałe parametry możemy pozostawić poza domyślnym wyłączeniem tworzenia kopii zapasowych (okres przechowywania kopii zapasowych - 0 dni), monitorowania i Performance Insights. Kliknij przycisk Utwórz bazę danych:
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Osoba obsługująca wątek

Ostatnim etapem będzie opracowanie zadania Spark, które co dwie sekundy będzie przetwarzało nowe dane napływające z Kafki i wprowadzało wynik do bazy danych.

Jak wspomniano powyżej, punkty kontrolne są podstawowym mechanizmem w SparkStreaming, który należy skonfigurować, aby zapewnić odporność na błędy. Będziemy korzystać z punktów kontrolnych i jeśli procedura się nie powiedzie, moduł Spark Streaming będzie musiał jedynie wrócić do ostatniego punktu kontrolnego i wznowić z niego obliczenia, aby odzyskać utracone dane.

Punkty kontrolne można włączyć, ustawiając katalog w odpornym na błędy, niezawodnym systemie plików (takim jak HDFS, S3 itp.), w którym będą przechowywane informacje o punktach kontrolnych. Odbywa się to za pomocą np.:

streamingContext.checkpoint(checkpointDirectory)

W naszym przykładzie zastosujemy następujące podejście, a mianowicie, jeśli istnieje checkpointDirectory, to kontekst zostanie odtworzony z danych punktu kontrolnego. Jeśli katalog nie istnieje (tj. jest wykonywany po raz pierwszy), to wywoływana jest funkcja FunctionToCreateContext w celu utworzenia nowego kontekstu i skonfigurowania DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Tworzymy obiekt DirectStream, aby połączyć się z tematem „transakcji” za pomocą metody createDirectStream biblioteki KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analizowanie danych przychodzących w formacie JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Używając Spark SQL robimy proste grupowanie i wyświetlamy wynik w konsoli:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Pobieranie tekstu zapytania i uruchamianie go poprzez Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Następnie zapisujemy powstałe zagregowane dane w tabeli w AWS RDS. Aby zapisać wyniki agregacji do tabeli bazy danych skorzystamy z metody write obiektu DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Kilka słów o konfiguracji połączenia z AWS RDS. Utworzyliśmy dla niego użytkownika i hasło na etapie „Wdrażanie AWS PostgreSQL”. Powinieneś użyć Endpoint jako adresu URL serwera bazy danych, który jest wyświetlany w sekcji Łączność i bezpieczeństwo:

Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Aby poprawnie połączyć Sparka i Kafkę, powinieneś uruchomić zadanie poprzez smark-submit przy użyciu artefaktu Spark-streaming-kafka-0-8_2.11. Dodatkowo do interakcji z bazą danych PostgreSQL wykorzystamy także artefakt, który przekażemy poprzez --packages.

Dla elastyczności skryptu jako parametry wejściowe podamy również nazwę serwera wiadomości oraz temat, z którego chcemy otrzymywać dane.

Czas więc uruchomić i sprawdzić funkcjonalność systemu:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Wszystko się udało! Jak widać na poniższym obrazku, gdy aplikacja jest uruchomiona, nowe wyniki agregacji są generowane co 2 sekundy, ponieważ podczas tworzenia obiektu StreamingContext ustawiliśmy interwał przetwarzania wsadowego na 2 sekundy:

Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

Następnie wykonujemy proste zapytanie do bazy danych, aby sprawdzić obecność rekordów w tabeli przepływ_transakcji:

Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming

wniosek

W tym artykule omówiono przykład strumieniowego przetwarzania informacji przy użyciu platformy Spark Streaming w połączeniu z Apache Kafka i PostgreSQL. Wraz ze wzrostem ilości danych pochodzących z różnych źródeł trudno przecenić praktyczną wartość Spark Streaming przy tworzeniu aplikacji strumieniowych i czasu rzeczywistego.

Pełny kod źródłowy można znaleźć w moim repozytorium pod adresem GitHub.

Z przyjemnością omówię ten artykuł, czekam na Twoje komentarze i liczę również na konstruktywną krytykę ze strony wszystkich troskliwych czytelników.

Życzę powodzenia!

Ps. Początkowo planowano wykorzystać lokalną bazę danych PostgreSQL, jednak biorąc pod uwagę moją miłość do AWS, zdecydowałem się przenieść bazę danych do chmury. W kolejnym artykule na ten temat pokażę jak zaimplementować cały opisany powyżej system w AWS z wykorzystaniem AWS Kinesis i AWS EMR. Śledź wiadomości!

Źródło: www.habr.com

Dodaj komentarz