Delta: platforma synchronizacji i wzbogacania danych

W oczekiwaniu na uruchomienie nowego przepływu w tempie Inżynier danych Przygotowaliśmy tłumaczenie ciekawych materiałów.

Delta: platforma synchronizacji i wzbogacania danych

Przegląd

Porozmawiamy o dość popularnym schemacie, według którego aplikacje korzystają z wielu magazynów danych, gdzie każdy magazyn jest wykorzystywany do własnych celów, na przykład do przechowywania kanonicznej postaci danych (MySQL itp.), Zapewnia zaawansowane możliwości wyszukiwania (ElasticSearch, itp.) .), buforowanie (Memcached itp.) i inne. Zazwyczaj w przypadku korzystania z wielu magazynów danych jeden z nich pełni rolę magazynu głównego, a inne magazyny pochodne. Jedynym problemem jest synchronizacja tych magazynów danych.

Przyjrzeliśmy się wielu różnym wzorcom, które próbowały rozwiązać problem synchronizacji wielu sklepów, takim jak podwójne zapisy, transakcje rozproszone itp. Jednakże podejścia te mają znaczne ograniczenia w zakresie rzeczywistego użytkowania, niezawodności i konserwacji. Oprócz synchronizacji danych niektóre aplikacje muszą także wzbogacać dane, wywołując usługi zewnętrzne.

Delta została opracowana, aby rozwiązać te problemy. Ostatecznie Delta zapewnia spójną, sterowaną zdarzeniami platformę do synchronizacji i wzbogacania danych.

Istniejące rozwiązania

podwójne wejście

Aby zachować synchronizację dwóch magazynów danych, można zastosować zapis podwójny, który powoduje zapis w jednym magazynie, a następnie natychmiastowy zapis w drugim. Można ponowić pierwszą próbę, a drugą przerwać, jeśli pierwsza nie powiedzie się po wyczerpaniu liczby prób. Jednakże te dwa magazyny danych mogą utracić synchronizację, jeśli zapis w drugim magazynie nie powiedzie się. Problem ten zwykle rozwiązuje się poprzez utworzenie procedury odzyskiwania, która może okresowo ponownie przesyłać dane z pierwszego magazynu do drugiego lub robić to tylko w przypadku wykrycia różnic w danych.

Problemy:

Wykonanie procedury odzyskiwania to specyficzne zadanie, którego nie można ponownie wykorzystać. Ponadto dane pomiędzy lokalizacjami przechowywania pozostają niezsynchronizowane do czasu przeprowadzenia procedury przywracania. Rozwiązanie staje się bardziej złożone, jeśli używane są więcej niż dwa magazyny danych. Na koniec procedura przywracania może zwiększyć obciążenie oryginalnego źródła danych.

Zmień tabelę dziennika

Kiedy w zestawie tabel zachodzą zmiany (takie jak wstawianie, aktualizowanie i usuwanie rekordu), rekordy zmian są dodawane do tabeli dziennika w ramach tej samej transakcji. Inny wątek lub proces stale żąda zdarzeń z tabeli dziennika i zapisuje je w jednym lub większej liczbie magazynów danych, jeśli to konieczne, usuwając zdarzenia z tabeli dziennika po potwierdzeniu zapisu przez wszystkie magazyny.

Problemy:

Wzorzec ten powinien zostać zaimplementowany jako biblioteka, a najlepiej bez zmiany kodu aplikacji, która go używa. W środowisku poliglotycznym implementacja takiej biblioteki powinna istnieć w dowolnym niezbędnym języku, jednak zapewnienie spójności funkcjonalności i zachowania w różnych językach jest bardzo trudne.

Kolejnym problemem jest uzyskiwanie zmian schematów w systemach, które nie obsługują zmian schematów transakcyjnych [1] [2], takich jak MySQL. Dlatego też schemat dokonywania zmiany (na przykład zmiany schematu) i transakcyjnego rejestrowania jej w tabeli dziennika zmian nie zawsze będzie działał.

Transakcje rozproszone

Transakcje rozproszone mogą służyć do dzielenia transakcji na wiele heterogenicznych magazynów danych, tak aby operacja została zatwierdzona do wszystkich używanych magazynów danych lub nie została zatwierdzona do żadnego z nich.

Problemy:

Transakcje rozproszone stanowią bardzo duży problem w przypadku heterogenicznych magazynów danych. Ze swej natury mogą polegać jedynie na najniższym wspólnym mianowniku zaangażowanych systemów. Na przykład transakcje XA blokują wykonanie, jeśli proces aplikacji zakończy się niepowodzeniem w fazie przygotowania. Ponadto XA nie zapewnia wykrywania zakleszczeń ani nie obsługuje optymistycznych schematów kontroli współbieżności. Ponadto niektóre systemy, takie jak ElasticSearch, nie obsługują XA ani żadnego innego heterogenicznego modelu transakcji. Zatem zapewnienie atomowości zapisu w różnych technologiach przechowywania danych pozostaje bardzo trudnym zadaniem dla aplikacji [3].

Delta

Delta została zaprojektowana, aby przezwyciężyć ograniczenia istniejących rozwiązań do synchronizacji danych, a także umożliwia wzbogacanie danych w locie. Naszym celem było odciągnięcie od twórców aplikacji wszystkich tych zawiłości, aby mogli w pełni skupić się na wdrażaniu funkcjonalności biznesowych. Następnie opiszemy „Wyszukiwanie filmów”, faktyczny przypadek użycia Delta serwisu Netflix.

Netflix powszechnie korzysta z architektury mikrousług, a każda mikrousługa zazwyczaj obsługuje jeden typ danych. Podstawowe informacje o filmie zawarte są w mikroserwisie o nazwie Movie Service, a powiązane dane, takie jak informacje o producentach, aktorach, sprzedawcach itd., są zarządzane przez kilka innych mikrousług (mianowicie Deal Service, Talent Service i Vendor Service).
Użytkownicy biznesowi w Netflix Studios często muszą wyszukiwać według różnych kryteriów filmów, dlatego bardzo ważna jest dla nich możliwość przeszukiwania wszystkich danych związanych z filmami.

Przed wprowadzeniem rozwiązania Delta zespół zajmujący się wyszukiwaniem filmów musiał pobierać dane z wielu mikrousług przed zindeksowaniem danych filmów. Dodatkowo zespół musiał opracować system, który okresowo aktualizowałby indeks wyszukiwania, żądając zmian od innych mikroserwisów, nawet jeśli w ogóle nie było żadnych zmian. System ten szybko stał się skomplikowany i trudny w utrzymaniu.

Delta: platforma synchronizacji i wzbogacania danych
Rysunek 1. System odpytywania do Delta
Po użyciu systemu Delta system został uproszczony do systemu sterowanego zdarzeniami, jak pokazano na poniższym rysunku. Zdarzenia CDC (Change-Data-Capture) są wysyłane do tematów Keystone Kafka za pomocą złącza Delta. Aplikacja Delta zbudowana przy użyciu Delta Stream Processing Framework (oparta na Flink) odbiera zdarzenia CDC z tematu, wzbogaca je wywołując inne mikrousługi, a na koniec przekazuje wzbogacone dane do indeksu wyszukiwania w Elasticsearch. Cały proces odbywa się niemal w czasie rzeczywistym, czyli po zatwierdzeniu zmian w hurtowni danych następuje aktualizacja indeksów wyszukiwania.

Delta: platforma synchronizacji i wzbogacania danych
Rysunek 2. Potok danych wykorzystujący technologię Delta
W kolejnych rozdziałach opiszemy działanie Delta-Connectora, który łączy się z magazynem i publikuje zdarzenia CDC do warstwy transportowej, będącej infrastrukturą transmisji danych w czasie rzeczywistym, która kieruje zdarzenia CDC do tematów Kafki. A na sam koniec porozmawiamy o frameworku przetwarzania strumieni Delta, który twórcy aplikacji mogą wykorzystać do przetwarzania danych i logiki wzbogacania.

CDC (przechwytywanie danych zmian)

Opracowaliśmy usługę CDC o nazwie Delta-Connector, która może w czasie rzeczywistym przechwytywać zatwierdzone zmiany z magazynu danych i zapisywać je w strumieniu. Zmiany w czasie rzeczywistym są pobierane z dziennika transakcji i zrzutów pamięci. Zrzuty są stosowane, ponieważ dzienniki transakcji zwykle nie przechowują całej historii zmian. Zmiany są zazwyczaj serializowane jako zdarzenia Delta, więc odbiorca nie musi się martwić, skąd pochodzi zmiana.

Delta-Connector obsługuje kilka dodatkowych funkcji, takich jak:

  • Możliwość zapisywania niestandardowych danych wyjściowych w Kafce.
  • Możliwość aktywacji ręcznych zrzutów w dowolnym momencie dla wszystkich tabel, konkretnej tabeli lub dla określonych kluczy podstawowych.
  • Zrzuty można pobierać fragmentami, więc w przypadku awarii nie ma potrzeby rozpoczynania wszystkiego od nowa.
  • Nie ma potrzeby zakładania blokad na tabele, co jest bardzo ważne, aby ruch zapisu do bazy danych nigdy nie był blokowany przez naszą usługę.
  • Wysoka dostępność dzięki nadmiarowym instancjom w Strefach Dostępności AWS.

Obecnie obsługujemy MySQL i Postgres, w tym wdrożenia na AWS RDS i Aurora. Obsługujemy również Cassandrę (multi-master). Więcej szczegółów na temat Delta-Connector można znaleźć tutaj post na blogu.

Kafka i warstwa transportowa

Warstwa transportu zdarzeń Delta jest zbudowana na usłudze przesyłania wiadomości platformy Zwornik.

Historycznie rzecz biorąc, publikowanie w serwisie Netflix było zoptymalizowane pod kątem dostępności, a nie trwałości (patrz poniżej). poprzedni artykuł). Kompromisem była potencjalna niespójność danych brokera w różnych scenariuszach brzegowych. Na przykład, nieczysty wybór przywódcy jest odpowiedzialny za to, że odbiorca może mieć zduplikowane lub utracone wydarzenia.

W przypadku Delta chcieliśmy silniejszych gwarancji trwałości, aby zapewnić dostawę wydarzeń CDC do sklepów pochodnych. W tym celu zaproponowaliśmy specjalnie zaprojektowany klaster Kafki jako obiekt najwyższej klasy. W poniższej tabeli możesz sprawdzić niektóre ustawienia brokera:

Delta: platforma synchronizacji i wzbogacania danych

W klastrach Keystone Kafki, nieczysty wybór przywódcy zwykle dołączane, aby zapewnić dostępność wydawcy. Może to spowodować utratę wiadomości, jeśli na lidera zostanie wybrana niezsynchronizowana replika. W przypadku nowego klastra Kafka o wysokiej dostępności opcja nieczysty wybór przywódcy wyłączone, aby zapobiec utracie wiadomości.

Zwiększyliśmy również współczynnik replikacji od 2 do 3 i minimalne repliki niezsynchronizowane 1 do 2. Wydawcy piszący do tego klastra wymagają potwierdzeń od wszystkich innych, co gwarantuje, że 2 z 3 replik mają najnowsze wiadomości wysłane przez wydawcę.

Po zakończeniu instancji brokera nowa instancja zastępuje starą. Jednak nowy broker będzie musiał nadrobić niezsynchronizowane repliki, co może zająć kilka godzin. Aby skrócić czas odzyskiwania w tym scenariuszu, zaczęliśmy używać blokowego przechowywania danych (Amazon Elastic Block Store) zamiast lokalnych dysków brokerskich. Kiedy nowa instancja zastępuje zakończoną instancję brokera, dołącza wolumin EBS, który posiadała zakończona instancja i zaczyna nadrabiać zaległości w zakresie nowych komunikatów. Ten proces skraca czas usuwania zaległości z godzin do minut, ponieważ nowa instancja nie musi już replikować ze stanu pustego. Ogólnie rzecz biorąc, oddzielne cykle życia pamięci masowej i brokera znacznie zmniejszają wpływ zmiany brokera.

Aby jeszcze bardziej zwiększyć gwarancję dostarczenia danych, zastosowaliśmy system śledzenia wiadomości w celu wykrycia utraty wiadomości w ekstremalnych warunkach (na przykład desynchronizacja zegara w liderze partycji).

Struktura przetwarzania strumieniowego

Warstwa przetwarzania Delta jest zbudowana na platformie Netflix SPAaS, która zapewnia integrację Apache Flink z ekosystemem Netflix. Platforma zapewnia interfejs użytkownika, który zarządza wdrażaniem zadań Flink i orkiestracją klastrów Flink na naszej platformie zarządzania kontenerami Titus. Interfejs zarządza również konfiguracjami zadań i umożliwia użytkownikom dynamiczne wprowadzanie zmian w konfiguracji bez konieczności ponownej kompilacji zadań Flink.

Delta zapewnia platformę przetwarzania strumieniowego opartą na Flink i SPAaS, która wykorzystuje oparte na adnotacjach DSL (język specyficzny dla domeny) w celu uzyskania abstrakcyjnych szczegółów technicznych. Przykładowo, aby zdefiniować krok, w którym zdarzenia będą wzbogacane o wywoływanie usług zewnętrznych, użytkownicy muszą napisać poniższy DSL, a framework na jego podstawie utworzy model, który zostanie wykonany przez Flink.

Delta: platforma synchronizacji i wzbogacania danych
Rysunek 3. Przykład wzbogacenia na DSL w Delcie

Struktura przetwarzania nie tylko skraca czas uczenia się, ale także zapewnia typowe funkcje przetwarzania strumieniowego, takie jak deduplikacja, schematyzacja oraz elastyczność i odporność na rozwiązywanie typowych problemów operacyjnych.

Delta Stream Processing Framework składa się z dwóch kluczowych modułów, modułu DSL i API oraz modułu Runtime. Moduł DSL i API udostępnia interfejsy API DSL i UDF (funkcja zdefiniowana przez użytkownika), dzięki czemu użytkownicy mogą pisać własną logikę przetwarzania (taką jak filtrowanie lub transformacje). Moduł Runtime zapewnia implementację parsera DSL, który buduje wewnętrzną reprezentację etapów przetwarzania w modelach DAG. Komponent Execution interpretuje modele DAG w celu zainicjowania rzeczywistych instrukcji Flink i ostatecznie uruchomienia aplikacji Flink. Architekturę frameworka przedstawiono na poniższym rysunku.

Delta: platforma synchronizacji i wzbogacania danych
Rysunek 4. Architektura Delta Stream Processing Framework

To podejście ma kilka zalet:

  • Użytkownicy mogą skupić się na swojej logice biznesowej bez konieczności zagłębiania się w specyfikę Flink czy struktury SPAaS.
  • Optymalizację można przeprowadzić w sposób przejrzysty dla użytkowników, a błędy można naprawić bez konieczności wprowadzania jakichkolwiek zmian w kodzie użytkownika (UDF).
  • Korzystanie z aplikacji Delta jest uproszczone dla użytkowników, ponieważ platforma zapewnia elastyczność i odporność od razu po wyjęciu z pudełka oraz gromadzi wiele szczegółowych metryk, które można wykorzystać w alertach.

Zastosowanie produkcyjne

Delta jest w produkcji od ponad roku i odgrywa kluczową rolę w wielu aplikacjach Netflix Studio. Pomagała zespołom wdrażać przypadki użycia, takie jak indeksowanie wyszukiwania, przechowywanie danych i przepływy pracy sterowane zdarzeniami. Poniżej znajduje się przegląd architektury wysokiego poziomu platformy Delta.

Delta: platforma synchronizacji i wzbogacania danych
Rysunek 5. Architektura wysokiego poziomu Delta.

Podziękowanie

Chcielibyśmy podziękować następującym osobom, które były zaangażowane w tworzenie i rozwój Delta w Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang i Zhenzhong Xu.

Źródła informacji

  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Przetwarzanie wydarzeń online. komuna. ACM 62(5): 43–49 (2019). DOI: doi.org/10.1145/3312527

Zapisz się na bezpłatne webinarium: „Narzędzie do tworzenia danych dla magazynu Amazon Redshift.”

Źródło: www.habr.com

Dodaj komentarz