Delta: Platformă de sincronizare și îmbogățire a datelor

În așteptarea lansării unui nou flux la rata Inginer de date Am pregătit o traducere a unui material interesant.

Delta: Platformă de sincronizare și îmbogățire a datelor

Revizuire

Vom vorbi despre un model destul de popular prin care aplicațiile folosesc mai multe depozite de date, unde fiecare magazin este folosit în scopuri proprii, de exemplu, pentru a stoca forma canonică a datelor (MySQL, etc.), pentru a oferi capabilități avansate de căutare (ElasticSearch, etc.) .), stocarea în cache (Memcached etc.) și altele. În mod obișnuit, atunci când se utilizează mai multe depozite de date, unul dintre ele acționează ca magazin principal, iar ceilalți ca depozite derivate. Singura problemă este cum să sincronizați aceste depozite de date.

Am analizat o serie de modele diferite care au încercat să rezolve problema sincronizării mai multor magazine, cum ar fi scrierile duble, tranzacțiile distribuite etc. Cu toate acestea, aceste abordări au limitări semnificative în ceea ce privește utilizarea reală, fiabilitatea și întreținerea. Pe lângă sincronizarea datelor, unele aplicații trebuie și să îmbogățească datele apelând servicii externe.

Delta a fost dezvoltat pentru a rezolva aceste probleme. Delta oferă în cele din urmă o platformă consistentă, bazată pe evenimente, pentru sincronizarea și îmbogățirea datelor.

Soluții existente

Intrare dubla

Pentru a menține sincronizate două depozite de date, puteți utiliza scrierea duală, care scrie într-un magazin și apoi scrie în celălalt imediat după aceea. Prima înregistrare poate fi reîncercată, iar a doua poate fi anulată dacă prima eșuează după ce numărul de încercări a fost epuizat. Cu toate acestea, cele două depozite de date pot deveni desincronizate dacă scrierea în al doilea depozit eșuează. Această problemă este de obicei rezolvată prin crearea unei proceduri de recuperare care poate retransfera periodic date de la prima stocare la a doua, sau face acest lucru numai dacă sunt detectate diferențe în date.

Probleme:

Efectuarea unei proceduri de recuperare este o sarcină specifică care nu poate fi refolosită. În plus, datele dintre locațiile de stocare rămân nesincronizate până când are loc procedura de restaurare. Soluția devine mai complexă dacă sunt utilizate mai mult de două depozite de date. În cele din urmă, procedura de restaurare poate adăuga încărcare sursei de date originale.

Schimbați tabelul de jurnal

Când apar modificări la un set de tabele (cum ar fi inserarea, actualizarea și ștergerea unei înregistrări), înregistrările modificărilor sunt adăugate la tabelul de jurnal ca parte a aceleiași tranzacții. Un alt fir sau proces solicită în mod constant evenimente din tabelul de jurnal și le scrie într-unul sau mai multe depozite de date, dacă este necesar, eliminând evenimentele din tabelul de jurnal după ce înregistrarea a fost confirmată de toate magazinele.

Probleme:

Acest model ar trebui implementat ca o bibliotecă și, în mod ideal, fără a schimba codul aplicației care îl folosește. Într-un mediu poliglot, o implementare a unei astfel de biblioteci ar trebui să existe în orice limbă necesară, dar asigurarea coerenței funcționalității și comportamentului între limbi este foarte dificilă.

O altă problemă constă în obținerea modificărilor de schemă în sistemele care nu acceptă modificări de schemă tranzacțională [1][2], cum ar fi MySQL. Prin urmare, modelul de a face o modificare (de exemplu, o modificare a schemei) și de a o înregistra tranzacțional în tabelul de jurnal de modificări nu va funcționa întotdeauna.

Tranzacții distribuite

Tranzacțiile distribuite pot fi utilizate pentru a împărți o tranzacție în mai multe depozite de date eterogene, astfel încât operația fie să fie angajată în toate depozitele de date utilizate, fie să nu fie angajată în niciunul dintre ele.

Probleme:

Tranzacțiile distribuite reprezintă o problemă foarte mare pentru depozitele de date eterogene. Prin natura lor, ei se pot baza doar pe cel mai mic numitor comun al sistemelor implicate. De exemplu, tranzacțiile XA blochează execuția dacă procesul de aplicare eșuează în timpul fazei de pregătire. În plus, XA nu oferă detectarea blocajului și nu acceptă scheme optimiste de control al concurenței. În plus, unele sisteme precum ElasticSearch nu acceptă XA sau orice alt model de tranzacție eterogen. Astfel, asigurarea atomicității scrierii în diverse tehnologii de stocare a datelor rămâne o sarcină foarte dificilă pentru aplicații [3].

Deltă

Delta a fost conceput pentru a aborda limitările soluțiilor existente de sincronizare a datelor și, de asemenea, permite îmbogățirea datelor din mers. Scopul nostru a fost să abstragem toate aceste complexități de la dezvoltatorii de aplicații, astfel încât aceștia să se poată concentra pe deplin pe implementarea funcționalității de afaceri. În continuare, vom descrie „Movie Search”, cazul real de utilizare pentru Delta de la Netflix.

Netflix folosește pe scară largă o arhitectură de microservicii și fiecare microserviciu servește de obicei un tip de date. Informațiile de bază despre film sunt conținute într-un microserviciu numit Movie Service, iar datele asociate, cum ar fi informații despre producători, actori, vânzători și așa mai departe, sunt gestionate de alte câteva microservicii (și anume Deal Service, Talent Service și Vendor Service).
Utilizatorii de afaceri de la Netflix Studios trebuie adesea să caute pe diferite criterii de film, motiv pentru care este foarte important pentru ei să poată căuta în toate datele legate de film.

Înainte de Delta, echipa de căutare a filmelor trebuia să extragă date din mai multe microservicii înainte de a indexa datele filmului. În plus, echipa a trebuit să dezvolte un sistem care să actualizeze periodic indexul de căutare solicitând modificări de la alte microservicii, chiar dacă nu au existat modificări deloc. Acest sistem a devenit rapid complex și dificil de întreținut.

Delta: Platformă de sincronizare și îmbogățire a datelor
Figura 1. Sistemul de votare către Delta
După utilizarea Delta, sistemul a fost simplificat la un sistem bazat pe evenimente, așa cum se arată în figura următoare. Evenimentele CDC (Change-Data-Capture) sunt trimise la subiectele Keystone Kafka folosind Delta-Connector. O aplicație Delta construită folosind Cadrul de procesare Delta Stream (bazat pe Flink) primește evenimente CDC dintr-un subiect, le îmbogățește apelând alte microservicii și în cele din urmă transmite datele îmbogățite unui index de căutare în Elasticsearch. Întregul proces are loc aproape în timp real, adică de îndată ce modificările sunt efectuate în depozitul de date, indicii de căutare sunt actualizați.

Delta: Platformă de sincronizare și îmbogățire a datelor
Figura 2. Conducta de date folosind Delta
În secțiunile următoare, vom descrie funcționarea Delta-Connector, care se conectează la stocare și publică evenimente CDC la stratul de transport, care este o infrastructură de transmisie a datelor în timp real care direcționează evenimentele CDC către subiectele Kafka. Și la sfârșit, vom vorbi despre cadrul de procesare a fluxului Delta, pe care dezvoltatorii de aplicații îl pot folosi pentru procesarea datelor și logica de îmbogățire.

CDC (modificare-capturare-date)

Am dezvoltat un serviciu CDC numit Delta-Connector, care poate captura modificările comise din depozitul de date în timp real și le poate scrie într-un flux. Modificările în timp real sunt preluate din jurnalul de tranzacții și din depozitele de stocare. Dump-urile sunt folosite deoarece jurnalele de tranzacții de obicei nu stochează întregul istoric al modificărilor. Modificările sunt de obicei serializate ca evenimente Delta, astfel încât destinatarul nu trebuie să-și facă griji de unde vine schimbarea.

Delta-Connector acceptă mai multe funcții suplimentare, cum ar fi:

  • Abilitatea de a scrie date personalizate de ieșire dincolo de Kafka.
  • Posibilitatea de a activa în orice moment depozitele manuale pentru toate tabelele, un anumit tabel sau pentru anumite chei primare.
  • Dumpsele pot fi recuperate în bucăți, deci nu este nevoie să începeți totul din nou în caz de eșec.
  • Nu este nevoie să plasați blocări pe tabele, ceea ce este foarte important pentru a vă asigura că traficul de scriere a bazei de date nu este niciodată blocat de serviciul nostru.
  • Disponibilitate ridicată datorită instanțelor redundante din zonele de disponibilitate AWS.

În prezent, acceptăm MySQL și Postgres, inclusiv implementări pe AWS RDS și Aurora. O susținem și pe Cassandra (multi-master). Puteți afla mai multe detalii despre Delta-Connector aici postare pe blog.

Kafka și stratul de transport

Stratul de transport de evenimente al Delta este construit pe serviciul de mesagerie al platformei Keystone.

Din punct de vedere istoric, postarea pe Netflix a fost optimizată mai degrabă pentru accesibilitate decât pentru longevitate (vezi mai jos). articolul anterior). Compensația a fost o potențială inconsecvență a datelor brokerului în diferite scenarii marginale. De exemplu, alegere necurată de lider este responsabil pentru ca destinatarul să aibă evenimente dublate sau pierdute.

Cu Delta, ne-am dorit garanții de durabilitate mai puternice pentru a asigura livrarea evenimentelor CDC către magazinele derivate. În acest scop, am propus un cluster Kafka special conceput ca obiect de primă clasă. Puteți vedea câteva setări ale brokerului din tabelul de mai jos:

Delta: Platformă de sincronizare și îmbogățire a datelor

În clusterele Keystone Kafka, alegere necurată de lider de obicei incluse pentru a asigura accesibilitatea editorilor. Acest lucru poate duce la pierderea mesajului dacă o replică nesincronizată este aleasă ca lider. Pentru un nou cluster Kafka de înaltă disponibilitate, opțiunea alegere necurată de lider oprit pentru a preveni pierderea mesajului.

Am crescut și noi factor de replicare de la 2 la 3 și replici nesincronizate minime 1 la 2. Editorii care scriu în acest cluster necesită ack-uri de la toate celelalte, asigurându-se că 2 din 3 replici au cele mai recente mesaje trimise de editor.

Când o instanță de broker se încheie, o instanță nouă o înlocuiește pe cea veche. Cu toate acestea, noul broker va trebui să ajungă din urmă cu replicile nesincronizate, ceea ce poate dura câteva ore. Pentru a reduce timpul de recuperare pentru acest scenariu, am început să folosim stocarea datelor în bloc (Amazon Elastic Block Store) în loc de discuri de broker local. Când o instanță nouă înlocuiește o instanță de broker terminată, aceasta atașează volumul EBS pe care îl avea instanța încheiată și începe să ajungă din urmă cu mesaje noi. Acest proces reduce timpul de eliminare a restanțelor de la ore la minute, deoarece noua instanță nu mai trebuie să se repete dintr-o stare goală. În general, stocarea separată și ciclurile de viață ale brokerului reduc semnificativ impactul schimbării brokerului.

Pentru a crește și mai mult garanția de livrare a datelor, am folosit sistem de urmărire a mesajelor pentru a detecta orice pierdere de mesaje în condiții extreme (de exemplu, desincronizarea ceasului în liderul partiției).

Cadrul de procesare a fluxului

Stratul de procesare al Delta este construit pe platforma Netflix SPaaS, care asigură integrarea Apache Flink cu ecosistemul Netflix. Platforma oferă o interfață cu utilizatorul care gestionează implementarea joburilor Flink și orchestrarea clusterelor Flink pe platforma noastră de gestionare a containerelor Titus. Interfața gestionează, de asemenea, configurațiile job-urilor și permite utilizatorilor să facă modificări de configurare în mod dinamic, fără a fi nevoie să recompileze joburile Flink.

Delta oferă un cadru de procesare a fluxului bazat pe Flink și SPaaS care utilizează bazate pe adnotări DSL (Domain Specific Language) pentru a rezuma detalii tehnice. De exemplu, pentru a defini pasul la care evenimentele vor fi îmbogățite prin apelarea serviciilor externe, utilizatorii trebuie să scrie următorul DSL, iar cadrul va crea un model bazat pe acesta, care va fi executat de Flink.

Delta: Platformă de sincronizare și îmbogățire a datelor
Figura 3. Exemplu de îmbogățire pe DSL în Delta

Cadrul de procesare nu numai că reduce curba de învățare, dar oferă și caracteristici comune de procesare a fluxului, cum ar fi deduplicarea, schematizarea și flexibilitatea și rezistența pentru a rezolva probleme operaționale comune.

Cadrul de procesare Delta Stream este format din două module cheie, modulul DSL și API și modulul Runtime. Modulul DSL și API oferă API-uri DSL și UDF (User-Defined-Function), astfel încât utilizatorii să își poată scrie propria logică de procesare (cum ar fi filtrarea sau transformările). Modulul Runtime oferă o implementare a unui parser DSL care construiește o reprezentare internă a pașilor de procesare în modelele DAG. Componenta de execuție interpretează modelele DAG pentru a inițializa instrucțiunile Flink reale și în cele din urmă a rula aplicația Flink. Arhitectura cadrului este ilustrată în figura următoare.

Delta: Platformă de sincronizare și îmbogățire a datelor
Figura 4. Arhitectura cadrului de procesare Delta Stream

Această abordare are mai multe avantaje:

  • Utilizatorii se pot concentra pe logica lor de afaceri fără a fi nevoiți să se aprofundeze în specificul Flink sau al structurii SPaaS.
  • Optimizarea poate fi realizată într-un mod care este transparent pentru utilizatori, iar erorile pot fi remediate fără a necesita modificări ale codului utilizatorului (UDF).
  • Experiența aplicației Delta este simplificată pentru utilizatori, deoarece platforma oferă flexibilitate și rezistență imediată și colectează o varietate de metrici detaliate care pot fi utilizate pentru alerte.

Utilizarea în producție

Delta este în producție de peste un an și joacă un rol cheie în multe aplicații Netflix Studio. Ea a ajutat echipele să implementeze cazuri de utilizare, cum ar fi indexarea căutărilor, stocarea datelor și fluxurile de lucru bazate pe evenimente. Mai jos este o prezentare generală a arhitecturii de nivel înalt a platformei Delta.

Delta: Platformă de sincronizare și îmbogățire a datelor
Figura 5. Arhitectura de nivel înalt a Delta.

Mulțumiri

Dorim să mulțumim următoarelor persoane care s-au implicat în crearea și dezvoltarea Delta la Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang și Zhenzhong Xu.

surse

  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Procesarea online a evenimentelor. comun. ACM 62(5): 43–49 (2019). DOI: doi.org/10.1145/3312527

Înscrieți-vă pentru un webinar gratuit: „Instrument de creare a datelor pentru stocarea Amazon Redshift.”

Sursa: www.habr.com

Adauga un comentariu