Analisi di 25TB utilizzando AWK e R

Analisi di 25TB utilizzando AWK e R
Come leggere questo articolo: Mi scuso per il testo così lungo e caotico. Per farti risparmiare tempo, inizio ogni capitolo con un'introduzione "Cosa ho imparato", che riassume l'essenza del capitolo in una o due frasi.

"Mostrami solo la soluzione!" Se vuoi solo vedere da dove vengo, passa al capitolo “Diventare più inventivo”, ma penso che sia più interessante e utile leggere del fallimento.

Recentemente mi è stato assegnato il compito di impostare un processo per elaborare un grande volume di sequenze di DNA grezzo (tecnicamente un chip SNP). La necessità era quella di ottenere rapidamente dati su una determinata posizione genetica (chiamata SNP) per la successiva modellazione e altri compiti. Utilizzando R e AWK, sono stato in grado di pulire e organizzare i dati in modo naturale, accelerando notevolmente l'elaborazione delle query. Questo non è stato facile per me e ha richiesto numerose iterazioni. Questo articolo ti aiuterà a evitare alcuni dei miei errori e ti mostrerà cosa ho ottenuto.

Innanzitutto alcune spiegazioni introduttive.

Dati

Il nostro centro universitario di elaborazione delle informazioni genetiche ci ha fornito i dati sotto forma di un TSV da 25 TB. Li ho ricevuti divisi in 5 pacchetti compressi con Gzip, ciascuno contenente circa 240 file da quattro gigabyte. Ogni riga conteneva dati per un SNP di un individuo. In totale sono stati trasmessi dati su circa 2,5 milioni di SNP e circa 60mila persone. Oltre alle informazioni SNP, i file contenevano numerose colonne con numeri che riflettevano varie caratteristiche, come l'intensità della lettura, la frequenza dei diversi alleli, ecc. In totale c'erano circa 30 colonne con valori univoci.

bersaglio

Come per qualsiasi progetto di gestione dei dati, la cosa più importante era determinare come sarebbero stati utilizzati i dati. In questo caso selezioneremo principalmente modelli e flussi di lavoro per SNP basati su SNP. Cioè, avremo bisogno di dati solo su un SNP alla volta. Ho dovuto imparare come recuperare tutti i record associati a uno dei 2,5 milioni di SNP nel modo più semplice, rapido ed economico possibile.

Come non farlo

Per citare un cliché adatto:

Non ho fallito mille volte, ho solo scoperto mille modi per evitare di analizzare un mucchio di dati in un formato facile da interrogare.

Primo tentativo

Cosa ho imparato: Non esiste un modo economico per analizzare 25 TB alla volta.

Dopo aver seguito il corso “Advanced Methods for Big Data Processing” presso la Vanderbilt University, ero sicuro che il trucco fosse nella borsa. Probabilmente ci vorranno un'ora o due per configurare il server Hive in modo che esegua tutti i dati e riporti il ​​risultato. Poiché i nostri dati sono archiviati in AWS S3, ho utilizzato il servizio Athena, che consente di applicare query Hive SQL ai dati S3. Non è necessario configurare/creare un cluster Hive e paghi solo per i dati che cerchi.

Dopo aver mostrato ad Athena i miei dati e il loro formato, ho eseguito alcuni test con query come questa:

select * from intensityData limit 10;

E ha ricevuto rapidamente risultati ben strutturati. Pronto.

Fino a quando non abbiamo provato a utilizzare i dati nel nostro lavoro...

Mi è stato chiesto di estrarre tutte le informazioni SNP su cui testare il modello. Ho eseguito la query:


select * from intensityData 
where snp = 'rs123456';

...e cominciò ad aspettare. Dopo otto minuti e più di 4 TB di dati richiesti, ho ricevuto il risultato. Athena addebita il volume di dati trovati, $ 5 per terabyte. Quindi questa singola richiesta costa $ 20 e otto minuti di attesa. Per eseguire il modello su tutti i dati abbiamo dovuto aspettare 38 anni e pagare 50 milioni di dollari, cosa che ovviamente non era adatta a noi.

È stato necessario utilizzare il Parquet...

Cosa ho imparato: Fai attenzione alla dimensione dei tuoi file Parquet e alla loro organizzazione.

Per prima cosa ho provato a risolvere la situazione convertendo tutti i TSV in Lime per parquet. Sono convenienti per lavorare con insiemi di dati di grandi dimensioni perché le informazioni in essi contenute sono archiviate in forma di colonne: ogni colonna si trova nel proprio segmento di memoria/disco, a differenza dei file di testo, in cui le righe contengono elementi di ciascuna colonna. E se hai bisogno di trovare qualcosa, leggi semplicemente la colonna richiesta. Inoltre, ogni file memorizza un intervallo di valori in una colonna, quindi se il valore che stai cercando non è nell'intervallo della colonna, Spark non perderà tempo a scansionare l'intero file.

Ho eseguito un compito semplice Colla AWS per convertire i nostri TSV in Parquet e inserire i nuovi file in Athena. Ci sono volute circa 5 ore. Ma quando ho eseguito la richiesta, il completamento ha richiesto più o meno lo stesso tempo e un po' meno denaro. Il fatto è che Spark, cercando di ottimizzare l'attività, ha semplicemente decompresso un pezzo TSV e inserirlo nel proprio pezzo Parquet. E poiché ogni blocco era abbastanza grande da contenere l’intero record di molte persone, ogni file conteneva tutti gli SNP, quindi Spark ha dovuto aprire tutti i file per estrarre le informazioni di cui aveva bisogno.

È interessante notare che il tipo di compressione predefinito (e consigliato) di Parquet, scattante, non è divisibile. Pertanto, ogni esecutore era costretto a decomprimere e scaricare l'intero set di dati da 3,5 GB.

Analisi di 25TB utilizzando AWK e R

Capiamo il problema

Cosa ho imparato: L'ordinamento è difficile, soprattutto se i dati sono distribuiti.

Mi è sembrato che ora avessi capito l'essenza del problema. Avevo solo bisogno di ordinare i dati per colonna SNP, non per persone. Quindi diversi SNP verranno archiviati in un blocco di dati separato, quindi la funzione "intelligente" di Parquet "apri solo se il valore è compreso nell'intervallo" si mostrerà in tutto il suo splendore. Sfortunatamente, ordinare miliardi di righe sparse in un cluster si è rivelato un compito difficile.

AWS sicuramente non vuole emettere un rimborso a causa del motivo "Sono uno studente distratto". Dopo aver eseguito l'ordinamento su Amazon Glue, è rimasto in funzione per 2 giorni e si è bloccato.

E il partizionamento?

Cosa ho imparato: Le partizioni in Spark devono essere bilanciate.

Poi mi è venuta l'idea di suddividere i dati nei cromosomi. Ce ne sono 23 (e molti di più se si tiene conto del DNA mitocondriale e delle regioni non mappate).
Ciò ti consentirà di dividere i dati in blocchi più piccoli. Se aggiungi solo una riga alla funzione di esportazione Spark nello script Glue partition_by = "chr", i dati dovrebbero essere divisi in bucket.

Analisi di 25TB utilizzando AWK e R
Il genoma è costituito da numerosi frammenti chiamati cromosomi.

Sfortunatamente, non ha funzionato. I cromosomi hanno dimensioni diverse, il che significa quantità diverse di informazioni. Ciò significa che le attività inviate da Spark ai lavoratori non sono state bilanciate e sono state completate lentamente perché alcuni nodi hanno terminato in anticipo ed erano inattivi. Tuttavia, i compiti sono stati completati. Ma quando si è chiesto un SNP, lo squilibrio ha nuovamente causato problemi. Il costo dell’elaborazione degli SNP sui cromosomi più grandi (cioè dove vogliamo ottenere i dati) è diminuito solo di un fattore 10 circa. Molto, ma non abbastanza.

E se lo dividessimo in parti ancora più piccole?

Cosa ho imparato: Non tentare mai di eseguire 2,5 milioni di partizioni.

Ho deciso di fare il possibile e partizionare ogni SNP. Ciò garantiva che le partizioni fossero di uguali dimensioni. ERA UNA CATTIVA IDEA. Ho usato la colla e ho aggiunto una linea innocente partition_by = 'snp'. L'attività è stata avviata e l'esecuzione è iniziata. Il giorno dopo ho controllato e ho visto che non c'era ancora nulla di scritto su S3, quindi ho interrotto l'attività. Sembra che Glue stesse scrivendo file intermedi in una posizione nascosta in S3, molti file, forse un paio di milioni. Di conseguenza, il mio errore è costato più di mille dollari e non ha soddisfatto il mio mentore.

Partizionamento + ordinamento

Cosa ho imparato: L'ordinamento è ancora difficile, così come l'ottimizzazione di Spark.

Il mio ultimo tentativo di partizionamento prevedeva la suddivisione dei cromosomi e l'ordinamento di ciascuna partizione. In teoria, ciò accelererebbe ogni query perché i dati SNP desiderati dovevano trovarsi entro pochi blocchi Parquet all'interno di un determinato intervallo. Sfortunatamente, anche l'ordinamento dei dati partizionati si è rivelato un compito difficile. Di conseguenza, sono passato a EMR per un cluster personalizzato e ho utilizzato otto potenti istanze (C5.4xl) e Sparklyr per creare un flusso di lavoro più flessibile...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...tuttavia, l'attività non è stata ancora completata. L'ho configurato in diversi modi: ho aumentato l'allocazione di memoria per ciascun esecutore di query, ho utilizzato nodi con una grande quantità di memoria, ho utilizzato variabili di trasmissione (variabili di trasmissione), ma ogni volta queste si sono rivelate mezze misure e gradualmente gli esecutori hanno iniziato fallire fino a quando tutto si è fermato.

Sto diventando più creativo

Cosa ho imparato: A volte i dati speciali richiedono soluzioni speciali.

Ogni SNP ha un valore di posizione. Questo è un numero corrispondente al numero di basi lungo il suo cromosoma. Questo è un modo carino e naturale per organizzare i nostri dati. All'inizio volevo suddividere ciascun cromosoma in regioni. Ad esempio, le posizioni 1 - 2000, 2001 - 4000, ecc. Ma il problema è che gli SNP non sono distribuiti uniformemente sui cromosomi, quindi le dimensioni dei gruppi varieranno notevolmente.

Analisi di 25TB utilizzando AWK e R

Di conseguenza, sono arrivato a una suddivisione delle posizioni in categorie (rango). Utilizzando i dati già scaricati, ho eseguito una richiesta per ottenere un elenco di SNP unici, le loro posizioni e cromosomi. Quindi ho ordinato i dati all'interno di ciascun cromosoma e ho raccolto gli SNP in gruppi (bin) di una determinata dimensione. Diciamo 1000 SNP ciascuno. Questo mi ha dato la relazione SNP-gruppo-per-cromosoma.

Alla fine ho realizzato dei gruppi (bin) da 75 SNP, il motivo verrà spiegato di seguito.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Prova prima con Spark

Cosa ho imparato: l'aggregazione di Spark è rapida, ma il partizionamento è comunque costoso.

Volevo leggere questo piccolo frame di dati (2,5 milioni di righe) in Spark, combinarlo con i dati grezzi e quindi partizionarlo in base alla colonna appena aggiunta bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

ero solito sdf_broadcast(), quindi Spark sa che dovrebbe inviare il frame di dati a tutti i nodi. Ciò è utile se i dati sono di piccole dimensioni e necessari per tutte le attività. Altrimenti, Spark cerca di essere intelligente e distribuisce i dati secondo necessità, il che può causare rallentamenti.

E ancora, la mia idea non ha funzionato: i compiti hanno funzionato per qualche tempo, hanno completato l'unione e poi, come gli esecutori lanciati dal partizionamento, hanno cominciato a fallire.

Aggiunta di AWK

Cosa ho imparato: Non dormire quando ti vengono insegnate le basi. Sicuramente qualcuno ha già risolto il tuo problema negli anni '1980.

Fino a questo punto, il motivo di tutti i miei insuccessi con Spark era la confusione di dati nel cluster. Forse la situazione può essere migliorata con il pretrattamento. Ho deciso di provare a dividere i dati di testo grezzi in colonne di cromosomi, quindi speravo di fornire a Spark dati "pre-partizionati".

Ho cercato su StackOverflow come dividere per valori di colonna e ho trovato una risposta così fantastica. Con AWK puoi dividere un file di testo in base ai valori delle colonne scrivendolo in uno script anziché inviare i risultati a stdout.

Ho scritto uno script Bash per provarlo. Ho scaricato uno dei TSV nel pacchetto, quindi lo ho decompresso utilizzando gzip e inviato a awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Ha funzionato!

Riempimento dei nuclei

Cosa ho imparato: gnu parallel - è una cosa magica, tutti dovrebbero usarla.

La separazione è stata piuttosto lenta e quando ho iniziato htopper verificare l'utilizzo di una potente (e costosa) istanza EC2, si è scoperto che stavo utilizzando un solo core e circa 200 MB di memoria. Per risolvere il problema e non perdere molti soldi, dovevamo capire come parallelizzare il lavoro. Fortunatamente, in un libro assolutamente straordinario Data Science alla riga di comando Ho trovato un capitolo di Jeron Janssens sulla parallelizzazione. Da esso ho imparato a conoscere gnu parallel, un metodo molto flessibile per implementare il multithreading in Unix.

Analisi di 25TB utilizzando AWK e R
Quando ho iniziato il partizionamento utilizzando il nuovo processo, tutto andava bene, ma c'era ancora un collo di bottiglia: il download degli oggetti S3 sul disco non era molto veloce e non era completamente parallelizzato. Per risolvere questo problema, ho fatto questo:

  1. Ho scoperto che è possibile implementare la fase di download di S3 direttamente in pipeline, eliminando completamente lo storage intermedio su disco. Ciò significa che posso evitare di scrivere dati grezzi su disco e utilizzare uno spazio di archiviazione ancora più piccolo, e quindi più economico, su AWS.
  2. squadra aws configure set default.s3.max_concurrent_requests 50 aumentato notevolmente il numero di thread utilizzati da AWS CLI (per impostazione predefinita ce ne sono 10).
  3. Sono passato a un'istanza EC2 ottimizzata per la velocità della rete, con la lettera n nel nome. Ho scoperto che la perdita di potenza di elaborazione quando si utilizzano n istanze è più che compensata dall'aumento della velocità di caricamento. Per la maggior parte delle attività ho utilizzato c5n.4xl.
  4. Cambiato gzip su pigz, questo è uno strumento gzip che può fare cose interessanti per parallelizzare il compito inizialmente non parallelo di decomprimere i file (questo è stato il minimo di aiuto).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Questi passaggi sono combinati tra loro per far funzionare tutto molto rapidamente. Aumentando la velocità di download ed eliminando le scritture su disco, ora potrei elaborare un pacchetto da 5 terabyte in poche ore.

Questo tweet avrebbe dovuto menzionare "TSV". Ahimè.

Utilizzando i dati appena analizzati

Cosa ho imparato: A Spark piacciono i dati non compressi e non piace combinare le partizioni.

Ora i dati erano in S3 in un formato non imballato (leggi: condiviso) e semi-ordinato e potevo tornare nuovamente a Spark. Mi aspettava una sorpresa: ancora una volta non sono riuscito a ottenere ciò che volevo! È stato molto difficile dire a Spark esattamente come fossero partizionati i dati. E anche quando l'ho fatto, si è scoperto che c'erano troppe partizioni (95mila), e quando le ho usate coalesce ridotto il loro numero a limiti ragionevoli, questo ha distrutto il mio partizionamento. Sono sicuro che il problema possa essere risolto, ma dopo un paio di giorni di ricerca non sono riuscito a trovare una soluzione. Alla fine ho completato tutte le attività in Spark, anche se ci è voluto un po' di tempo e i miei file Parquet divisi non erano molto piccoli (~200 KB). Tuttavia, i dati erano dove erano necessari.

Analisi di 25TB utilizzando AWK e R
Troppo piccolo e irregolare, meraviglioso!

Test delle query Spark locali

Cosa ho imparato: Spark ha un sovraccarico eccessivo durante la risoluzione di problemi semplici.

Scaricando i dati in un formato intelligente, ho potuto testare la velocità. Configurare uno script R per eseguire un server Spark locale, quindi caricare un frame di dati Spark dall'archivio del gruppo Parquet specificato (bin). Ho provato a caricare tutti i dati ma non sono riuscito a far riconoscere a Sparklyr il partizionamento.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

L'esecuzione durò 29,415 secondi. Molto meglio, ma non troppo buono per testare in massa qualsiasi cosa. Inoltre, non sono riuscito a velocizzare le cose con la memorizzazione nella cache perché quando provavo a memorizzare nella cache un frame di dati in memoria, Spark si bloccava sempre, anche quando allocavo più di 50 GB di memoria a un set di dati che pesava meno di 15.

Ritorna all'AWK

Cosa ho imparato: Gli array associativi in ​​AWK sono molto efficienti.

Mi sono reso conto che avrei potuto raggiungere velocità più elevate. Me lo sono ricordato in modo meraviglioso Tutorial AWK di Bruce Barnett Ho letto di una funzionalità interessante chiamata "array associativi" Essenzialmente, si tratta di coppie chiave-valore, che per qualche motivo sono state chiamate diversamente in AWK, e quindi in qualche modo non ci ho pensato molto. Chepliaka romana ha ricordato che il termine “array associativi” è molto più antico del termine “coppia chiave-valore”. Anche se tu cerca il valore-chiave in Google Ngram, non vedrai questo termine lì, ma troverai array associativi! Inoltre, la “coppia chiave-valore” è spesso associata ai database, quindi ha molto più senso confrontarla con una hashmap. Mi sono reso conto che avrei potuto utilizzare questi array associativi per associare i miei SNP a una tabella bin e a dati grezzi senza utilizzare Spark.

Per fare questo, nello script AWK ho utilizzato il blocco BEGIN. Si tratta di una parte di codice che viene eseguita prima che la prima riga di dati venga passata al corpo principale dello script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Squadra while(getline...) caricato tutte le righe dal gruppo CSV (bin), imposta la prima colonna (nome SNP) come chiave per l'array associativo bin e il secondo valore (gruppo) come valore. Poi nel blocco { }, che viene eseguito su tutte le righe del file principale, ogni riga viene inviata al file di output, che riceve un nome univoco a seconda del suo gruppo (bin): ..._bin_"bin[$1]"_....

variabili batch_num и chunk_id corrispondeva ai dati forniti dalla pipeline, evitando una condizione di competizione e ogni thread di esecuzione in esecuzione parallel, ha scritto nel proprio file univoco.

Dato che ho sparso tutti i dati grezzi in cartelle sui cromosomi rimasti dal mio precedente esperimento con AWK, ora potrei scrivere un altro script Bash per elaborare un cromosoma alla volta e inviare dati partizionati più in profondità a S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

La sceneggiatura ha due sezioni parallel.

Nella prima sezione, i dati vengono letti da tutti i file contenenti informazioni sul cromosoma desiderato, quindi questi dati vengono distribuiti tra i thread, che distribuiscono i file nei gruppi appropriati (bin). Per evitare condizioni di competizione quando più thread scrivono sullo stesso file, AWK passa i nomi dei file per scrivere i dati in posti diversi, ad es. chr_10_bin_52_batch_2_aa.csv. Di conseguenza, sul disco vengono creati molti piccoli file (per questo ho utilizzato volumi EBS da terabyte).

Trasportatore dalla seconda sezione parallel passa attraverso i gruppi (bin) e combina i singoli file in CSV comune c cate poi li invia per l'esportazione.

Trasmettere in R?

Cosa ho imparato: Puoi contattare stdin и stdout da uno script R e quindi utilizzarlo nella pipeline.

Potresti aver notato questa riga nel tuo script Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Traduce tutti i file di gruppo concatenati (bin) nello script R riportato di seguito. {} è una tecnica speciale parallel, che inserisce tutti i dati inviati al flusso specificato direttamente nel comando stesso. Opzione {#} fornisce un ID thread univoco e {%} rappresenta il numero dello slot di lavoro (ripetuto, ma mai simultaneamente). Un elenco di tutte le opzioni è disponibile in documentazione.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Quando una variabile file("stdin") trasmesso a readr::read_csv, i dati tradotti nello script R vengono caricati in un frame, che si trova quindi nel form .rds-file utilizzando aws.s3 scritto direttamente su S3.

RDS è qualcosa come una versione junior di Parquet, senza i fronzoli dello stoccaggio degli altoparlanti.

Dopo aver terminato lo script Bash ho ricevuto un pacchetto .rds-file situati in S3, che mi hanno permesso di utilizzare una compressione efficiente e tipi integrati.

Nonostante l'uso del freno R, tutto ha funzionato molto velocemente. Non sorprende che le parti di R che leggono e scrivono i dati siano altamente ottimizzate. Dopo il test su un cromosoma di medie dimensioni, il lavoro è stato completato su un'istanza C5n.4xl in circa due ore.

Limitazioni S3

Cosa ho imparato: Grazie all'implementazione del percorso intelligente, S3 può gestire molti file.

Ero preoccupato se S3 sarebbe stato in grado di gestire i numerosi file che vi erano stati trasferiti. Potrei dare un senso ai nomi dei file, ma come li cercherebbe S3?

Analisi di 25TB utilizzando AWK e R
Le cartelle in S3 sono solo per mostrare, infatti il ​​sistema non è interessato al simbolo /. Dalla pagina delle domande frequenti su S3.

Sembra che S3 rappresenti il ​​percorso di un particolare file come una semplice chiave in una sorta di tabella hash o database basato su documenti. Un bucket può essere pensato come una tabella e i file possono essere considerati record in quella tabella.

Poiché la velocità e l'efficienza sono importanti per realizzare un profitto su Amazon, non sorprende che questo sistema di percorso chiave come file sia dannatamente ottimizzato. Ho cercato di trovare un equilibrio: in modo da non dover fare molte richieste di get, ma che le richieste venissero eseguite rapidamente. Si è scoperto che è meglio creare circa 20mila file bin. Penso che se continuiamo a ottimizzare, possiamo ottenere un aumento della velocità (ad esempio, creando un bucket speciale solo per i dati, riducendo così la dimensione della tabella di ricerca). Ma non c'erano né tempo né denaro per ulteriori esperimenti.

E la compatibilità incrociata?

Cosa ho imparato: la causa principale della perdita di tempo è l'ottimizzazione prematura del metodo di archiviazione.

A questo punto è molto importante chiedersi: “Perché utilizzare un formato file proprietario?” Il motivo risiede nella velocità di caricamento (il caricamento dei file CSV compressi con gzip richiedeva 7 volte più tempo) e nella compatibilità con i nostri flussi di lavoro. Potrei riconsiderare se R può caricare facilmente file Parquet (o Arrow) senza il caricamento di Spark. Tutti nel nostro laboratorio utilizzano R e, se devo convertire i dati in un altro formato, ho ancora i dati di testo originali, quindi posso semplicemente eseguire nuovamente la pipeline.

Divisione del lavoro

Cosa ho imparato: Non cercare di ottimizzare i lavori manualmente, lascia che sia il computer a farlo.

Ho eseguito il debug del flusso di lavoro su un cromosoma, ora devo elaborare tutti gli altri dati.
Volevo generare diverse istanze EC2 per la conversione, ma allo stesso tempo temevo di ottenere un carico molto sbilanciato tra diversi lavori di elaborazione (proprio come Spark soffriva di partizioni sbilanciate). Inoltre, non ero interessato a generare un'istanza per cromosoma, perché per gli account AWS esiste un limite predefinito di 10 istanze.

Quindi ho deciso di scrivere uno script in R per ottimizzare i lavori di elaborazione.

Innanzitutto, ho chiesto a S3 di calcolare quanto spazio di archiviazione occupava ciascun cromosoma.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Poi ho scritto una funzione che prende la dimensione totale, mescola l'ordine dei cromosomi, li divide in gruppi num_jobs e indica quanto sono diverse le dimensioni di tutti i lavori di elaborazione.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Poi ho eseguito mille mescolate usando purrr e ho scelto il migliore.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Quindi mi sono ritrovato con una serie di attività di dimensioni molto simili. Quindi tutto ciò che restava era avvolgere il mio precedente script Bash in un grande loop for. La scrittura di questa ottimizzazione ha richiesto circa 10 minuti. E questo è molto meno di quanto spenderei per creare manualmente attività se fossero sbilanciate. Pertanto, penso di aver avuto ragione con questa ottimizzazione preliminare.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Alla fine aggiungo il comando shutdown:

sudo shutdown -h now

...e tutto ha funzionato! Utilizzando l'AWS CLI, ho generato istanze utilizzando l'opzione user_data ha dato loro gli script Bash dei loro compiti da elaborare. Funzionavano e si spegnevano automaticamente, quindi non dovevo pagare per una potenza di elaborazione aggiuntiva.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Facciamo le valigie!

Cosa ho imparato: L'API dovrebbe essere semplice per motivi di facilità e flessibilità di utilizzo.

Alla fine ho ottenuto i dati nel posto e nella forma giusti. Non restava che semplificare il più possibile il processo di utilizzo dei dati per renderlo più semplice per i miei colleghi. Volevo creare una semplice API per creare richieste. Se in futuro decidessi di passare da .rds alle lime Parquet, questo dovrebbe essere un problema per me, non per i miei colleghi. Per questo ho deciso di creare un pacchetto R interno.

Costruisci e documenta un pacchetto molto semplice contenente solo poche funzioni di accesso ai dati organizzate attorno a una funzione get_snp. Ho anche realizzato un sito web per i miei colleghi pkgdown, in modo che possano vedere facilmente esempi e documentazione.

Analisi di 25TB utilizzando AWK e R

Cache intelligente

Cosa ho imparato: Se i tuoi dati sono ben preparati, la memorizzazione nella cache sarà facile!

Poiché uno dei flussi di lavoro principali applicava lo stesso modello di analisi al pacchetto SNP, ho deciso di utilizzare il binning a mio vantaggio. Quando si trasmettono dati tramite SNP, tutte le informazioni del gruppo (bin) vengono allegate all'oggetto restituito. Cioè, le vecchie query possono (in teoria) accelerare l'elaborazione di nuove query.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Durante la creazione del pacchetto, ho eseguito molti benchmark per confrontare la velocità quando si utilizzavano metodi diversi. Raccomando di non trascurare questo aspetto, perché a volte i risultati sono inaspettati. Per esempio, dplyr::filter era molto più veloce rispetto all'acquisizione di righe utilizzando il filtro basato sull'indicizzazione e il recupero di una singola colonna da un frame di dati filtrato era molto più rapido rispetto all'utilizzo della sintassi di indicizzazione.

Si prega di notare che l'oggetto prev_snp_results contiene la chiave snps_in_bin. Si tratta di un array di tutti gli SNP univoci in un gruppo (bin), che ti consente di verificare rapidamente se disponi già di dati da una query precedente. Inoltre semplifica il ciclo di tutti gli SNP in un gruppo (bin) con questo codice:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Giudizio

Ora possiamo (e abbiamo iniziato a farlo seriamente) eseguire modelli e scenari che prima ci erano inaccessibili. La cosa migliore è che i miei colleghi di laboratorio non devono pensare ad alcuna complicazione. Hanno solo una funzione che funziona.

E anche se il pacchetto risparmia loro i dettagli, ho cercato di rendere il formato dei dati abbastanza semplice da poterlo capire se domani fossi improvvisamente scomparso...

La velocità è aumentata notevolmente. Di solito eseguiamo la scansione di frammenti di genoma funzionalmente significativi. In precedenza, non potevamo farlo (era troppo costoso), ma ora, grazie alla struttura del gruppo (bin) e alla memorizzazione nella cache, una richiesta per un SNP richiede in media meno di 0,1 secondi e l'utilizzo dei dati è così basso che il costo di S3 è una nocciolina.

conclusione

Questo articolo non è affatto una guida. La soluzione si è rivelata individuale e quasi certamente non ottimale. Piuttosto, è un diario di viaggio. Voglio che gli altri capiscano che tali decisioni non appaiono completamente formate nella testa, sono il risultato di tentativi ed errori. Inoltre, se stai cercando un data scientist, tieni presente che l’utilizzo di questi strumenti in modo efficace richiede esperienza e l’esperienza costa denaro. Sono felice di aver avuto i mezzi per pagare, ma molti altri che sanno fare lo stesso lavoro meglio di me non avranno mai l'opportunità per mancanza di soldi nemmeno di provarci.

Gli strumenti per i Big Data sono versatili. Se hai tempo, puoi quasi sicuramente scrivere una soluzione più rapida utilizzando tecniche intelligenti di pulizia, archiviazione ed estrazione dei dati. Alla fine si tratta di un’analisi costi-benefici.

Quello che ho imparato:

  • non esiste un modo economico per analizzare 25 TB alla volta;
  • fai attenzione alle dimensioni dei tuoi file Parquet e alla loro organizzazione;
  • Le partizioni in Spark devono essere bilanciate;
  • In generale, non provare mai a creare 2,5 milioni di partizioni;
  • L'ordinamento è ancora difficile, così come la configurazione di Spark;
  • a volte dati speciali richiedono soluzioni speciali;
  • L'aggregazione di Spark è rapida, ma il partizionamento è comunque costoso;
  • non dormire quando ti insegnano le basi, probabilmente qualcuno ha già risolto il tuo problema negli anni '1980;
  • gnu parallel - questa è una cosa magica, tutti dovrebbero usarla;
  • A Spark piacciono i dati non compressi e non gli piace combinare le partizioni;
  • Spark ha un sovraccarico eccessivo quando risolve problemi semplici;
  • Gli array associativi di AWK sono molto efficienti;
  • Puoi contattare stdin и stdout da uno script R, e quindi utilizzarlo in pipeline;
  • Grazie all'implementazione del percorso intelligente, S3 può elaborare molti file;
  • Il motivo principale della perdita di tempo è l'ottimizzazione prematura del metodo di archiviazione;
  • non cercare di ottimizzare le attività manualmente, lascia che lo faccia il computer;
  • L'API dovrebbe essere semplice per ragioni di facilità e flessibilità di utilizzo;
  • Se i tuoi dati sono ben preparati, la memorizzazione nella cache sarà facile!

Fonte: habr.com

Aggiungi un commento