ProHoster > Blog > administracja > Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming
Apache Kafka i strumieniowe przetwarzanie danych za pomocą Spark Streaming
Witaj, Habro! Dzisiaj zbudujemy system, który będzie przetwarzał strumienie komunikatów Apache Kafka przy użyciu Spark Streaming i zapisywał wyniki przetwarzania do bazy danych w chmurze AWS RDS.
Wyobraźmy sobie, że pewna instytucja kredytowa stawia nam zadanie obsługi transakcji przychodzących „w locie” we wszystkich swoich oddziałach. Można to zrobić w celu szybkiego obliczenia otwartej pozycji walutowej dla skarbca, limitów lub wyników finansowych transakcji itp.
Jak zrealizować ten przypadek bez użycia magii i zaklęć magicznych - przeczytaj pod nacięciem! Iść!
Oczywiście przetwarzanie dużej ilości danych w czasie rzeczywistym daje szerokie możliwości wykorzystania w nowoczesnych systemach. Jedną z najpopularniejszych kombinacji jest tandem Apache Kafka i Spark Streaming, w którym Kafka tworzy strumień przychodzących pakietów wiadomości, a Spark Streaming przetwarza te pakiety w określonym przedziale czasu.
Aby zwiększyć odporność aplikacji na błędy, wykorzystamy punkty kontrolne. Dzięki temu mechanizmowi, gdy silnik Spark Streaming musi odzyskać utracone dane, wystarczy wrócić do ostatniego punktu kontrolnego i stamtąd wznowić obliczenia.
Architektura opracowanego systemu
Wykorzystane komponenty:
Apache Kafka to rozproszony system przesyłania wiadomości typu „publikuj i subskrybuj”. Nadaje się zarówno do korzystania z wiadomości offline, jak i online. Aby zapobiec utracie danych, komunikaty Kafki są przechowywane na dysku i replikowane w klastrze. System Kafka opiera się na usłudze synchronizacji ZooKeeper;
Przesyłanie strumieniowe Apache Spark - Komponent Spark do przetwarzania danych przesyłanych strumieniowo. Moduł Spark Streaming zbudowany jest w architekturze mikro-batch, gdzie strumień danych jest interpretowany jako ciągła sekwencja małych pakietów danych. Spark Streaming pobiera dane z różnych źródeł i łączy je w małe pakiety. Nowe pakiety tworzone są w regularnych odstępach czasu. Na początku każdego przedziału czasu tworzony jest nowy pakiet, a wszelkie dane otrzymane w tym przedziale czasu są uwzględniane w pakiecie. Pod koniec interwału wzrost pakietów zatrzymuje się. Rozmiar interwału jest określony przez parametr zwany interwałem wsadowym;
Apache SparkSQL - łączy przetwarzanie relacyjne z programowaniem funkcjonalnym Spark. Dane strukturalne to dane posiadające schemat, czyli pojedynczy zestaw pól dla wszystkich rekordów. Spark SQL obsługuje dane wejściowe z różnych ustrukturyzowanych źródeł danych, a dzięki dostępności informacji o schemacie może efektywnie pobierać tylko wymagane pola rekordów, a także udostępnia interfejsy API DataFrame;
AWS RDS to stosunkowo niedroga relacyjna baza danych oparta na chmurze, usługa internetowa, która upraszcza konfigurację, obsługę i skalowanie i jest administrowana bezpośrednio przez Amazon.
Instalacja i uruchomienie serwera Kafka
Przed bezpośrednim użyciem Kafki musisz upewnić się, że masz Javę, ponieważ... JVM służy do pracy:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Następny krok jest opcjonalny. Faktem jest, że ustawienia domyślne nie pozwalają w pełni wykorzystać wszystkich możliwości Apache Kafka. Przykładowo usuń temat, kategorię, grupę, w której można publikować wiadomości. Aby to zmienić, zmodyfikujmy plik konfiguracyjny:
vim ~/kafka/config/server.properties
Dodaj następujący wpis na końcu pliku:
delete.topic.enable = true
Przed uruchomieniem serwera Kafka należy uruchomić serwer ZooKeeper; użyjemy pomocniczego skryptu dołączonego do dystrybucji Kafki:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Po pomyślnym uruchomieniu ZooKeepera uruchom serwer Kafka w osobnym terminalu:
Omińmy momenty testowania producenta i konsumenta pod kątem nowo powstałego tematu. Więcej szczegółów na temat testowania wysyłania i odbierania wiadomości znajduje się w oficjalnej dokumentacji - Wyślij kilka wiadomości. Cóż, przechodzimy do pisania producenta w Pythonie przy użyciu API KafkaProducer.
Tekst producenta
Producent będzie generował losowe dane - 100 komunikatów na sekundę. Przez dane losowe rozumiemy słownik składający się z trzech pól:
Oddział — nazwa punktu sprzedaży instytucji kredytowej;
Waluta - waluta transakcji;
ilość - Suma transakcyjna. Kwota będzie liczbą dodatnią w przypadku zakupu waluty przez Bank i liczbą ujemną w przypadku sprzedaży.
Następnie metodą send wysyłamy wiadomość na serwer na potrzebny nam temat w formacie JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Podczas uruchamiania skryptu w terminalu otrzymujemy następujące komunikaty:
Oznacza to, że wszystko działa tak jak chcieliśmy – producent generuje i wysyła komunikaty na potrzebny nam temat.
Następnym krokiem jest instalacja Sparka i przetworzenie tego strumienia komunikatów.
Instalowanie Apache Sparka
Apache Spark to uniwersalna i wysokowydajna platforma przetwarzania klastrowego.
Spark działa lepiej niż popularne implementacje modelu MapReduce, obsługując jednocześnie szerszy zakres typów obliczeń, w tym interaktywne zapytania i przetwarzanie strumieniowe. Szybkość odgrywa ważną rolę przy przetwarzaniu dużych ilości danych, ponieważ to właśnie prędkość pozwala na interaktywną pracę bez konieczności spędzania minut lub godzin na czekaniu. Jedną z największych zalet Sparka, która sprawia, że jest tak szybki, jest jego zdolność do wykonywania obliczeń w pamięci.
Ten framework jest napisany w Scali, więc musisz go najpierw zainstalować:
Uruchom poniższe polecenie po wprowadzeniu zmian w bashrc:
source ~/.bashrc
Wdrażanie AWS PostgreSQL
Pozostaje tylko wdrożyć bazę danych, do której będziemy przesyłać przetworzone informacje ze strumieni. W tym celu skorzystamy z usługi AWS RDS.
Przejdź do konsoli AWS -> AWS RDS -> Bazy danych -> Utwórz bazę danych:
Wybierz PostgreSQL i kliknij Dalej:
Ponieważ Ten przykład służy wyłącznie celom edukacyjnym; będziemy korzystać z darmowego serwera „co najmniej” (bezpłatny poziom):
Następnie zaznaczamy blok Free Tier, po czym automatycznie zostanie nam zaproponowana instancja klasy t2.micro - choć słaba, jest darmowa i całkiem odpowiednia do naszego zadania:
Następnie dochodzą bardzo ważne rzeczy: nazwa instancji bazy danych, nazwa użytkownika głównego i jego hasło. Nazwijmy instancję: myHabrTest, użytkownik główny: hab, hasło: habr12345 i kliknij przycisk Dalej:
Na kolejnej stronie znajdują się parametry odpowiedzialne za dostępność naszego serwera baz danych z zewnątrz (Dostępność publiczna) oraz dostępność portów:
Utwórzmy nowe ustawienie dla grupy bezpieczeństwa VPC, które umożliwi zewnętrzny dostęp do naszego serwera bazy danych poprzez port 5432 (PostgreSQL).
Przejdźmy do konsoli AWS w osobnym oknie przeglądarki do sekcji VPC Dashboard -> Grupy zabezpieczeń -> Utwórz grupę zabezpieczeń:
Ustalamy nazwę grupy Security - PostgreSQL, opis, wskazujemy z którym VPC ta grupa ma być powiązana i klikamy przycisk Utwórz:
Wypełnij reguły ruchu przychodzącego dla portu 5432 dla nowo utworzonej grupy, jak pokazano na poniższym obrazku. Nie możesz określić portu ręcznie, ale wybierz PostgreSQL z listy rozwijanej Typ.
Ściśle rzecz biorąc, wartość ::/0 oznacza dostępność ruchu przychodzącego na serwer z całego świata, co kanonicznie nie jest do końca prawdą, ale analizując przykład, pozwólmy sobie na takie podejście:
Wracamy do strony przeglądarki, gdzie mamy otwartą opcję „Konfiguruj ustawienia zaawansowane” i w sekcji Grupy zabezpieczeń VPC wybieramy -> Wybierz istniejące grupy zabezpieczeń VPC -> PostgreSQL:
Następnie w Opcjach bazy danych -> Nazwa bazy danych -> ustaw nazwę - habrDB.
Pozostałe parametry możemy pozostawić poza domyślnym wyłączeniem tworzenia kopii zapasowych (okres przechowywania kopii zapasowych - 0 dni), monitorowania i Performance Insights. Kliknij przycisk Utwórz bazę danych:
Osoba obsługująca wątek
Ostatnim etapem będzie opracowanie zadania Spark, które co dwie sekundy będzie przetwarzało nowe dane napływające z Kafki i wprowadzało wynik do bazy danych.
Jak wspomniano powyżej, punkty kontrolne są podstawowym mechanizmem w SparkStreaming, który należy skonfigurować, aby zapewnić odporność na błędy. Będziemy korzystać z punktów kontrolnych i jeśli procedura się nie powiedzie, moduł Spark Streaming będzie musiał jedynie wrócić do ostatniego punktu kontrolnego i wznowić z niego obliczenia, aby odzyskać utracone dane.
Punkty kontrolne można włączyć, ustawiając katalog w odpornym na błędy, niezawodnym systemie plików (takim jak HDFS, S3 itp.), w którym będą przechowywane informacje o punktach kontrolnych. Odbywa się to za pomocą np.:
streamingContext.checkpoint(checkpointDirectory)
W naszym przykładzie zastosujemy następujące podejście, a mianowicie, jeśli istnieje checkpointDirectory, to kontekst zostanie odtworzony z danych punktu kontrolnego. Jeśli katalog nie istnieje (tj. jest wykonywany po raz pierwszy), to wywoływana jest funkcja FunctionToCreateContext w celu utworzenia nowego kontekstu i skonfigurowania DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Tworzymy obiekt DirectStream, aby połączyć się z tematem „transakcji” za pomocą metody createDirectStream biblioteki KafkaUtils:
Używając Spark SQL robimy proste grupowanie i wyświetlamy wynik w konsoli:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Pobieranie tekstu zapytania i uruchamianie go poprzez Spark SQL:
Następnie zapisujemy powstałe zagregowane dane w tabeli w AWS RDS. Aby zapisać wyniki agregacji do tabeli bazy danych skorzystamy z metody write obiektu DataFrame:
Kilka słów o konfiguracji połączenia z AWS RDS. Utworzyliśmy dla niego użytkownika i hasło na etapie „Wdrażanie AWS PostgreSQL”. Powinieneś użyć Endpoint jako adresu URL serwera bazy danych, który jest wyświetlany w sekcji Łączność i bezpieczeństwo:
Aby poprawnie połączyć Sparka i Kafkę, powinieneś uruchomić zadanie poprzez smark-submit przy użyciu artefaktu Spark-streaming-kafka-0-8_2.11. Dodatkowo do interakcji z bazą danych PostgreSQL wykorzystamy także artefakt, który przekażemy poprzez --packages.
Dla elastyczności skryptu jako parametry wejściowe podamy również nazwę serwera wiadomości oraz temat, z którego chcemy otrzymywać dane.
Czas więc uruchomić i sprawdzić funkcjonalność systemu:
Wszystko się udało! Jak widać na poniższym obrazku, gdy aplikacja jest uruchomiona, nowe wyniki agregacji są generowane co 2 sekundy, ponieważ podczas tworzenia obiektu StreamingContext ustawiliśmy interwał przetwarzania wsadowego na 2 sekundy:
Następnie wykonujemy proste zapytanie do bazy danych, aby sprawdzić obecność rekordów w tabeli przepływ_transakcji:
wniosek
W tym artykule omówiono przykład strumieniowego przetwarzania informacji przy użyciu platformy Spark Streaming w połączeniu z Apache Kafka i PostgreSQL. Wraz ze wzrostem ilości danych pochodzących z różnych źródeł trudno przecenić praktyczną wartość Spark Streaming przy tworzeniu aplikacji strumieniowych i czasu rzeczywistego.
Pełny kod źródłowy można znaleźć w moim repozytorium pod adresem GitHub.
Z przyjemnością omówię ten artykuł, czekam na Twoje komentarze i liczę również na konstruktywną krytykę ze strony wszystkich troskliwych czytelników.
Życzę powodzenia!
Ps. Początkowo planowano wykorzystać lokalną bazę danych PostgreSQL, jednak biorąc pod uwagę moją miłość do AWS, zdecydowałem się przenieść bazę danych do chmury. W kolejnym artykule na ten temat pokażę jak zaimplementować cały opisany powyżej system w AWS z wykorzystaniem AWS Kinesis i AWS EMR. Śledź wiadomości!