Delta: piattaforma di sincronizzazione e arricchimento dei dati

In previsione del lancio di un nuovo flusso al ritmo Ingegnere dei dati Abbiamo preparato una traduzione di materiale interessante.

Delta: piattaforma di sincronizzazione e arricchimento dei dati

panoramica

Parleremo di un modello abbastanza popolare in base al quale le applicazioni utilizzano più archivi dati, in cui ciascun archivio viene utilizzato per i propri scopi, ad esempio, per archiviare la forma canonica dei dati (MySQL, ecc.), fornire funzionalità di ricerca avanzate (ElasticSearch, ecc.) .), caching (Memcached, ecc.) e altri. In genere, quando si utilizzano più archivi dati, uno di essi funge da archivio primario e gli altri da archivi derivati. L'unico problema è come sincronizzare questi archivi dati.

Abbiamo esaminato una serie di modelli diversi che tentavano di risolvere il problema della sincronizzazione di più archivi, come doppie scritture, transazioni distribuite, ecc. Tuttavia, questi approcci presentano limitazioni significative in termini di utilizzo nella vita reale, affidabilità e manutenzione. Oltre alla sincronizzazione dei dati, alcune applicazioni necessitano anche di arricchire i dati chiamando servizi esterni.

Delta è stato sviluppato per risolvere questi problemi. Delta fornisce in definitiva una piattaforma coerente e basata sugli eventi per la sincronizzazione e l'arricchimento dei dati.

Soluzioni esistenti

doppia entrata

Per mantenere sincronizzati due archivi dati, è possibile utilizzare la doppia scrittura, che scrive in un archivio e poi scrive nell'altro immediatamente dopo. È possibile ritentare la prima registrazione e interrompere la seconda se la prima fallisce una volta esaurito il numero di tentativi. Tuttavia, i due archivi dati potrebbero non essere sincronizzati se la scrittura nel secondo archivio non riesce. Questo problema viene solitamente risolto creando una procedura di ripristino in grado di ritrasferire periodicamente i dati dal primo archivio al secondo, oppure farlo solo se vengono rilevate differenze nei dati.

Problemi:

L'esecuzione di una procedura di ripristino è un lavoro specifico che non può essere riutilizzato. Inoltre, i dati tra le posizioni di archiviazione rimangono non sincronizzati finché non viene eseguita la procedura di ripristino. La soluzione diventa più complessa se vengono utilizzati più di due archivi dati. Infine, la procedura di ripristino può aggiungere carico all'origine dati originale.

Modifica tabella di registro

Quando si verificano modifiche a un set di tabelle (come l'inserimento, l'aggiornamento e l'eliminazione di un record), i record delle modifiche vengono aggiunti alla tabella di registro come parte della stessa transazione. Un altro thread o processo richiede costantemente eventi dalla tabella di log e li scrive in uno o più archivi dati, se necessario, rimuovendo eventi dalla tabella di log dopo che il record è stato confermato da tutti gli archivi.

Problemi:

Questo modello dovrebbe essere implementato come libreria e, idealmente, senza modificare il codice dell'applicazione che lo utilizza. In un ambiente poliglotta, un’implementazione di tale libreria dovrebbe esistere in qualsiasi lingua necessaria, ma garantire la coerenza delle funzionalità e del comportamento tra le lingue è molto difficile.

Un altro problema risiede nell'ottenere modifiche allo schema nei sistemi che non supportano le modifiche allo schema transazionale [1] [2], come MySQL. Pertanto, il modello di apportare una modifica (ad esempio, una modifica dello schema) e di registrarla a livello transazionale nella tabella del registro delle modifiche non sempre funzionerà.

Transazioni distribuite

Le transazioni distribuite possono essere utilizzate per suddividere una transazione tra più archivi dati eterogenei in modo che l'operazione sia vincolata a tutti gli archivi dati utilizzati o non sia vincolata a nessuno di essi.

Problemi:

Le transazioni distribuite rappresentano un grosso problema per gli archivi dati eterogenei. Per loro natura, possono contare solo sul minimo comune denominatore dei sistemi coinvolti. Ad esempio, le transazioni XA bloccano l'esecuzione se il processo di richiesta fallisce durante la fase di preparazione. Inoltre, XA non fornisce il rilevamento dei deadlock né supporta schemi di controllo della concorrenza ottimistica. Inoltre, alcuni sistemi come ElasticSearch non supportano XA o qualsiasi altro modello di transazione eterogeneo. Pertanto, garantire l’atomicità della scrittura in varie tecnologie di archiviazione dei dati rimane un compito molto impegnativo per le applicazioni [3].

Delta

Delta è stato progettato per risolvere i limiti delle soluzioni di sincronizzazione dei dati esistenti e consente anche l'arricchimento dei dati al volo. Il nostro obiettivo era allontanare tutte queste complessità dagli sviluppatori di applicazioni in modo che potessero concentrarsi completamente sull'implementazione delle funzionalità aziendali. Successivamente descriveremo "Ricerca film", il caso d'uso effettivo per Delta di Netflix.

Netflix utilizza ampiamente un'architettura di microservizi e ciascun microservizio in genere serve un tipo di dati. Le informazioni di base sul film sono contenute in un microservizio chiamato Movie Service e i dati associati come informazioni su produttori, attori, venditori e così via sono gestiti da diversi altri microservizi (vale a dire Deal Service, Talent Service e Vendor Service).
Gli utenti aziendali di Netflix Studios hanno spesso bisogno di effettuare ricerche in base a vari criteri di film, motivo per cui è molto importante per loro poter effettuare ricerche in tutti i dati relativi ai film.

Prima di Delta, il team di ricerca di film doveva estrarre dati da più microservizi prima di indicizzarli. Inoltre, il team ha dovuto sviluppare un sistema che aggiornasse periodicamente l’indice di ricerca richiedendo modifiche ad altri microservizi, anche se non vi fosse alcuna modifica. Questo sistema divenne rapidamente complesso e difficile da mantenere.

Delta: piattaforma di sincronizzazione e arricchimento dei dati
Figura 1. Sistema di polling per Delta
Dopo aver utilizzato Delta, il sistema è stato semplificato in un sistema guidato dagli eventi, come mostrato nella figura seguente. Gli eventi CDC (Change-Data-Capture) vengono inviati agli argomenti Keystone Kafka utilizzando Delta-Connector. Un'applicazione Delta creata utilizzando il Delta Stream Processing Framework (basato su Flink) riceve eventi CDC da un argomento, li arricchisce chiamando altri microservizi e infine passa i dati arricchiti a un indice di ricerca in Elasticsearch. L'intero processo avviene quasi in tempo reale, ovvero non appena vengono apportate modifiche al data warehouse, gli indici di ricerca vengono aggiornati.

Delta: piattaforma di sincronizzazione e arricchimento dei dati
Figura 2. Pipeline di dati che utilizza Delta
Nelle sezioni seguenti descriveremo il funzionamento del Delta-Connector, che si connette allo storage e pubblica gli eventi CDC sul livello di trasporto, che è un'infrastruttura di trasmissione dati in tempo reale che instrada gli eventi CDC verso argomenti Kafka. E alla fine parleremo del framework di elaborazione del flusso Delta, che gli sviluppatori di applicazioni possono utilizzare per l'elaborazione dei dati e la logica di arricchimento.

CDC (Cambia-acquisizione-dati)

Abbiamo sviluppato un servizio CDC chiamato Delta-Connector, in grado di acquisire le modifiche apportate dall'archivio dati in tempo reale e scriverle in un flusso. Le modifiche in tempo reale vengono prese dal registro delle transazioni e dai dump di archiviazione. I dump vengono utilizzati perché i log delle transazioni in genere non memorizzano l'intera cronologia delle modifiche. Le modifiche vengono in genere serializzate come eventi Delta, quindi il destinatario non deve preoccuparsi della provenienza della modifica.

Delta-Connector supporta diverse funzionalità aggiuntive come:

  • Possibilità di scrivere dati di output personalizzati oltre Kafka.
  • Possibilità di attivare dump manuali in qualsiasi momento per tutte le tabelle, una tabella specifica o per chiavi primarie specifiche.
  • I dump possono essere recuperati in blocchi, quindi non è necessario ricominciare tutto da capo in caso di errore.
  • Non è necessario applicare blocchi alle tabelle, il che è molto importante per garantire che il traffico di scrittura del database non venga mai bloccato dal nostro servizio.
  • Disponibilità elevata grazie alle istanze ridondanti nelle zone di disponibilità AWS.

Attualmente supportiamo MySQL e Postgres, incluse le distribuzioni su AWS RDS e Aurora. Supportiamo anche Cassandra (multi-master). Puoi trovare maggiori dettagli su Delta-Connector qui blog.

Kafka e lo strato di trasporto

Il livello di trasporto degli eventi di Delta è basato sul servizio di messaggistica della piattaforma Chiave di volta.

Storicamente, la pubblicazione su Netflix è stata ottimizzata per l'accessibilità piuttosto che per la longevità (vedi sotto). articolo precedente). Il compromesso era la potenziale incoerenza dei dati del broker in vari scenari limite. Per esempio, elezione del leader impuro è responsabile del fatto che il destinatario abbia potenzialmente eventi duplicati o persi.

Con Delta, volevamo garanzie di durabilità più forti per garantire la consegna degli eventi CDC ai negozi derivati. A questo scopo abbiamo proposto un cluster Kafka appositamente progettato come oggetto di prima classe. Puoi consultare alcune impostazioni del broker nella tabella seguente:

Delta: piattaforma di sincronizzazione e arricchimento dei dati

Nei cluster Keystone Kafka, elezione del leader impuro solitamente incluso per garantire l'accessibilità dell'editore. Ciò potrebbe causare la perdita del messaggio se una replica non sincronizzata viene eletta come leader. Per un nuovo cluster Kafka ad alta disponibilità, l'opzione elezione del leader impuro disattivato per evitare la perdita dei messaggi.

Siamo anche aumentati fattore di replicazione dalle 2 alle 3 e repliche minime non sincronizzate Da 1 a 2. Gli editori che scrivono su questo cluster richiedono gli ack di tutti gli altri, garantendo che 2 repliche su 3 dispongano dei messaggi più recenti inviati dall'editore.

Quando un'istanza del broker termina, una nuova istanza sostituisce quella vecchia. Tuttavia, il nuovo broker dovrà mettersi al passo con le repliche non sincronizzate, operazione che potrebbe richiedere diverse ore. Per ridurre i tempi di ripristino per questo scenario, abbiamo iniziato a utilizzare l'archiviazione dei dati a blocchi (Amazon Elastic Block Store) anziché i dischi del broker locale. Quando una nuova istanza sostituisce un'istanza del broker terminata, collega il volume EBS che aveva l'istanza terminata e inizia a recuperare il ritardo con i nuovi messaggi. Questo processo riduce il tempo di eliminazione del backlog da ore a minuti perché la nuova istanza non deve più replicarsi da uno stato vuoto. Nel complesso, cicli di vita separati di storage e broker riducono significativamente l'impatto del cambio di broker.

Per aumentare ulteriormente la garanzia di consegna dei dati, abbiamo utilizzato sistema di tracciamento dei messaggi per rilevare qualsiasi perdita di messaggi in condizioni estreme (ad esempio, la desincronizzazione dell'orologio nel leader della partizione).

Framework di elaborazione del flusso

Il livello di elaborazione di Delta è basato sulla piattaforma Netflix SPaaS, che fornisce l'integrazione di Apache Flink con l'ecosistema Netflix. La piattaforma fornisce un'interfaccia utente che gestisce la distribuzione dei lavori Flink e l'orchestrazione dei cluster Flink sulla nostra piattaforma di gestione dei contenitori Titus. L'interfaccia gestisce inoltre le configurazioni dei lavori e consente agli utenti di apportare modifiche alla configurazione in modo dinamico senza dover ricompilare i lavori Flink.

Delta fornisce un framework di elaborazione del flusso basato su Flink e SPaaS che utilizza basato su annotazioni DSL (Domain Specific Language) per astrarre i dettagli tecnici. Ad esempio, per definire la fase in cui gli eventi verranno arricchiti chiamando servizi esterni, gli utenti dovranno scrivere il seguente DSL e il framework creerà un modello basato su di esso, che verrà eseguito da Flink.

Delta: piattaforma di sincronizzazione e arricchimento dei dati
Figura 3. Esempio di arricchimento su DSL in Delta

Il framework di elaborazione non solo riduce la curva di apprendimento, ma fornisce anche funzionalità comuni di elaborazione del flusso come deduplicazione, schematizzazione, flessibilità e resilienza per risolvere problemi operativi comuni.

Delta Stream Processing Framework è costituito da due moduli chiave, il modulo DSL e API e il modulo Runtime. Il modulo DSL e API fornisce API DSL e UDF (User-Defined-Function) in modo che gli utenti possano scrivere la propria logica di elaborazione (come filtraggi o trasformazioni). Il modulo Runtime fornisce un'implementazione di un parser DSL che crea una rappresentazione interna delle fasi di elaborazione nei modelli DAG. Il componente di esecuzione interpreta i modelli DAG per inizializzare le istruzioni Flink effettive e infine eseguire l'applicazione Flink. L'architettura del framework è illustrata nella figura seguente.

Delta: piattaforma di sincronizzazione e arricchimento dei dati
Figura 4. Architettura del framework di elaborazione Delta Stream

Questo approccio ha diversi vantaggi:

  • Gli utenti possono concentrarsi sulla propria logica aziendale senza dover approfondire le specifiche di Flink o la struttura SPaaS.
  • L'ottimizzazione può essere eseguita in modo trasparente per gli utenti e gli errori possono essere corretti senza richiedere alcuna modifica al codice utente (UDF).
  • L'esperienza dell'applicazione Delta è semplificata per gli utenti perché la piattaforma offre flessibilità e resilienza pronte all'uso e raccoglie una varietà di parametri dettagliati che possono essere utilizzati per gli avvisi.

Uso produttivo

Delta è in produzione da oltre un anno e svolge un ruolo chiave in molte applicazioni Netflix Studio. Ha aiutato i team a implementare casi d'uso come l'indicizzazione della ricerca, l'archiviazione dei dati e flussi di lavoro basati sugli eventi. Di seguito è riportata una panoramica dell'architettura di alto livello della piattaforma Delta.

Delta: piattaforma di sincronizzazione e arricchimento dei dati
Figura 5. Architettura di alto livello di Delta.

Ringraziamenti

Desideriamo ringraziare le seguenti persone coinvolte nella creazione e nello sviluppo di Delta presso Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang e Zhenzhong Xu.

fonti

  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-rollback.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Elaborazione di eventi online. Comune. ACM 62(5): 43–49 (2019). DOI: doi.org/10.1145/3312527

Iscriviti a un webinar gratuito: "Strumento di creazione dati per lo storage Amazon Redshift."

Fonte: habr.com

Aggiungi un commento