Analisi di 25 TB cù AWK è R

Analisi di 25 TB cù AWK è R
Cumu leghje stu articulu: Scusate chì u testu hè cusì longu è caòticu. Per risparmià u tempu, aghju principiatu ogni capitulu cù una introduzione "Ciò chì aghju amparatu", chì riassume l'essenza di u capitulu in una o duie frasi.

"Mostrami solu a suluzione!" Se vulete vede da induve sò vinutu, saltate à u capitulu "Diventà più inventiva", ma pensu chì hè più interessante è utile per leghje nantu à u fallimentu.

Aghju statu recentemente incaricatu di stabilisce un prucessu per processà un grande volume di sequenze di DNA crudu (tecnicamente un chip SNP). U bisognu era di ottene rapidamente dati nantu à un locu geneticu determinatu (chjamatu SNP) per a mudeli sussegwente è altre attività. Utilizendu R è AWK, aghju pussutu pulizziari è urganizà e dati in modu naturali, accelendu assai l'elaborazione di e dumande. Questu ùn era micca faciule per mè è necessitava numerosi iterazioni. Questu articulu vi aiuterà à evità alcuni di i mo sbaglii è vi mostrarà ciò chì aghju finitu.

Prima, alcune spiegazioni introduttive.

dati

U nostru centru di trattamentu di l'infurmazione genetica universitaria ci hà furnitu dati in forma di un TSV 25 TB. L'aghju ricivutu divisu in 5 pacchetti compressi in Gzip, ognunu cuntene circa 240 schedarii di quattru gigabyte. Ogni fila cuntene dati per un SNP da un individuu. In totale, i dati nantu à ~ 2,5 milioni di SNP è ~ 60 mila persone sò stati trasmessi. In più di l'infurmazioni SNP, i schedari cuntenenu numerosi culonni cù numeri chì riflettenu diverse caratteristiche, cum'è l'intensità di lettura, a frequenza di diversi alleli, etc. In totale ci era circa 30 colonne cù valori unichi.

Goal

Cum'è cù qualsiasi prughjettu di gestione di dati, u più impurtante era di determinà cumu si usanu e dati. In stu casu selezziunemu soprattuttu mudelli è flussi di travagliu per SNP basatu annantu à SNP. Vale à dì, avemu bisognu di dati solu nantu à un SNP à tempu. Aviu avutu à amparà à ricuperà tutti i registri assuciati cù unu di i 2,5 milioni di SNP cum'è facilmente, rapidamente è ecunomicu pussibule.

Cumu ùn fà micca questu

Per cita un cliché adattatu:

Ùn aghju micca fallutu mille volte, aghju solu scupertu mille manere per evità di analizà una mansa di dati in un formatu amichevule à query.

Prima pruvà

Chì aghju amparatu: Ùn ci hè micca un modu economicu per analizà 25 TB à tempu.

Dopu avè pigliatu u corsu "Metudi Avanzati per u Trattamentu di Big Data" à l'Università Vanderbilt, era sicuru chì u truccu era in u saccu. Ci hà da piglià prubabilmente una ora o duie per stallà u servitore Hive per eseguisce tutte e dati è rappurtate u risultatu. Siccomu i nostri dati sò almacenati in AWS S3, aghju utilizatu u serviziu Athena, chì vi permette di applicà e dumande SQL Hive à i dati S3. Ùn avete bisognu di stallà / crià un cluster Hive, è ancu pagà solu per i dati chì vo circate.

Dopu avè dimustratu à Athena i mo dati è u so furmatu, aghju eseguitu qualchi teste cù dumande cum'è questu:

select * from intensityData limit 10;

È hà ricevutu rapidamente risultati ben strutturati. Pronta.

Finu à pruvà à aduprà e dati in u nostru travagliu ...

Hè statu dumandatu à tirà tutte l'infurmazioni SNP per pruvà u mudellu. Aghju fattu a dumanda:


select * from intensityData 
where snp = 'rs123456';

... è cuminciò à aspittà. Dopu ottu minuti è più di 4 TB di dati dumandati, aghju ricevutu u risultatu. Athena carica per u voluminu di dati truvati, $ 5 per terabyte. Allora sta dumanda unica costa $ 20 è ottu minuti d'attesa. Per eseguisce u mudellu nantu à tutte e dati, duvemu aspittà 38 anni è pagà $ 50. Ovviamente, questu ùn era micca adattatu per noi.

Era necessariu aduprà Parquet...

Chì aghju amparatu: Attenti cù a dimensione di i vostri schedari Parquet è a so urganizazione.

Prima aghju pruvatu à riparà a situazione cunvertisce tutti i TSV in File di parquet. Sò cunvenuti per travaglià cù grande setti di dati, perchè l'infurmazioni in elli sò almacenati in forma di colonna: ogni colonna si trova in u so propiu segmentu di memoria / discu, in cuntrastu à i schedarii di testu, in quale fila cuntene elementi di ogni colonna. È s'ellu ci vole à truvà qualcosa, allora basta à leghje a colonna necessaria. Inoltre, ogni fugliale guarda un intervallu di valori in una colonna, dunque se u valore chì cercate ùn hè micca in a gamma di a colonna, Spark ùn perde micca u tempu à scansà tuttu u schedariu.

Aghju fattu un compitu simplice Cola AWS per cunvertisce i nostri TSV in Parquet è abbandunò i novi schedari in Athena. Pigliò circa 5 ore. Ma quandu aghju eseguitu a dumanda, hà pigliatu circa a listessa quantità di tempu è un pocu menu di soldi per compie. U fattu hè chì Spark, circannu à ottimisà u compitu, simpricimenti unpacked un chunk TSV è mette lu in u so propriu chunk Parquet. È perchè ogni pezzu era abbastanza grande per cuntene i registri sanu di parechje persone, ogni schedariu cuntene tutti i SNP, cusì Spark hà avutu à apre tutti i schedari per estrae l'infurmazioni necessarii.

Curiosamente, u tipu di compressione predeterminatu (è cunsigliatu) di Parquet, snappy, ùn hè micca splittable. Per quessa, ogni esecutore era chjappu nantu à u compitu di unpacking è di scaricamentu di u cumpletu di 3,5 GB di dataset.

Analisi di 25 TB cù AWK è R

Capemu u prublema

Chì aghju amparatu: Sorting hè difficiule, soprattuttu se i dati sò distribuiti.

Mi paria chì avà aghju capitu l'essenza di u prublema. Aviu solu bisognu di sorte e dati per colonna SNP, micca da persone. Allora parechji SNP seranu guardati in un pezzu di dati separatu, è dopu a funzione "intelligente" di Parquet "apre solu s'ellu u valore hè in u range" si mostrarà in tutta a so gloria. Sfurtunatamente, a classificazione di miliardi di fila spargugliati in un cluster hè statu un compitu difficiule.

AWS definitivamente ùn vole micca emette un rimborsu per via di a ragione "Sò un studiente distrattu". Dopu chì aghju rializatu a classificazione nantu à Amazon Glue, hà currettu per 2 ghjorni è s'hè lampatu.

E partizioni ?

Chì aghju amparatu: Partizioni in Spark deve esse equilibratu.

Allora aghju avutu l'idea di particionà e dati in cromusomi. Ci sò 23 d'elli (è parechji più si pigliate in contu l'ADN mitocondriale è e regioni senza mappa).
Questu permetterà di sparte i dati in pezzi più chjuchi. Se aghjunghje una sola linea à a funzione d'esportazione Spark in u script Glue partition_by = "chr", allura i dati deve esse divisu in buckets.

Analisi di 25 TB cù AWK è R
U genoma hè custituitu da numerosi frammenti chjamati cromusomi.

Sfurtunatamente, ùn hà micca travagliatu. I cromosomi anu diverse dimensioni, chì significheghja diverse quantità di informazioni. Questu significa chì i travaglii chì Spark hà mandatu à i travagliadori ùn sò micca stati equilibrati è cumpleti lentamente perchè certi nodi anu finitu prima è eranu inattivi. Tuttavia, i travaglii sò stati cumpletati. Ma quandu dumandava un SNP, u sbilanciamentu hà causatu di novu prublemi. U costu di trasfurmà SNP nantu à cromusomi più grande (vale à dì, induve vulemu ottene dati) hè solu diminuitu da circa un fattore di 10. Assai, ma micca abbastanza.

Et si l'on divise en parties encore plus petites ?

Chì aghju amparatu: Ùn pruvate mai di fà 2,5 milioni di partizioni in tuttu.

Aghju decisu di andà in tuttu è sparte ogni SNP. Questu hà assicuratu chì e partizioni eranu di uguali dimensione. ERA UNA GATTIVA IDEA. Aghju utilizatu Glue è aghjunse una linea innocente partition_by = 'snp'. U compitu principia è cuminciò à eseguisce. Un ghjornu dopu aghju verificatu è vistu chì ùn ci era ancu nunda di scrittu à S3, cusì aghju tombu u compitu. Sembra chì Glue scriveva i schedarii intermedi in un locu oculatu in S3, assai schedari, forsi un paru di milioni. In u risultatu, u mo sbagliu costava più di mille dollari è ùn hà micca piace à u mo mentore.

Spartizione + ordinamentu

Chì aghju amparatu: Sorting hè sempre difficiule, cum'è tuning Spark.

U mo ultimu tentativu di spartizione m'hà implicatu à particionà i cromusomi è dopu à sorte ogni partizione. In tiurìa, chistu accellerà ogni dumanda perchè i dati SNP desiderati duvia esse in uni pochi di pezzi di Parquet in un intervallu datu. Sfurtunatamente, l'ordine ancu di dati partizionati hè statu un compitu difficiule. In u risultatu, aghju cambiatu à EMR per un cluster persunalizatu è utilizatu ottu istanze putenti (C5.4xl) è Sparklyr per creà un flussu di travagliu più flexible...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

... in ogni modu, u compitu ùn era micca finitu. L'aghju cunfiguratu in diverse manere: hà aumentatu l'allocazione di memoria per ogni esecutore di query, utilizatu nodi cù una grande quantità di memoria, utilizatu variabili di broadcast (variabili di trasmissione), ma ogni volta questi sò diventati a mità di misure, è gradualmente l'esecutori cuminciaru. per fallu finu à chì tuttu si ferma.

Sò diventatu più creativa

Chì aghju amparatu: Calchì volta dati spiciali richiede suluzioni spiciali.

Ogni SNP hà un valore di pusizioni. Questu hè un numeru chì currisponde à u numeru di basi longu u so cromusomu. Questu hè un modu bellu è naturali per urganizà i nostri dati. À u principiu, vulia sparte per regioni di ogni cromusomu. Per esempiu, pusizioni 1 - 2000, 2001 - 4000, etc. Ma u prublema hè chì i SNP ùn sò micca distribuiti uniformemente in i cromusomi, per quessa, e dimensioni di u gruppu varieranu assai.

Analisi di 25 TB cù AWK è R

In u risultatu, sò ghjuntu à una ripartizione di pusizioni in categurie (rank). Utilizendu e dati scaricati, aghju fattu una dumanda per ottene una lista di SNP unichi, e so pusizioni è i cromusomi. Allora aghju ordinatu i dati in ogni cromusomu è cullucatu SNP in gruppi (bin) di una certa dimensione. Diciamu 1000 SNP ognunu. Questu m'hà datu a relazione SNP-à-gruppu-per-cromusomu.

In fine, aghju fattu gruppi (bin) di 75 SNP, u mutivu serà spiegatu quì sottu.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Prima pruvate cù Spark

Chì aghju amparatu: L'aggregazione di Spark hè rapida, ma a partizione hè sempre caru.

Vuliu leghje stu picculu quadru di dati (2,5 milioni di file) in Spark, combina cù i dati crudi, è poi particionà da a colonna appena aghjuntu. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Aghju utilizatu sdf_broadcast(), cusì Spark sapi chì deve mandà u quadru di dati à tutti i nodi. Questu hè utile se i dati sò chjuchi in grandezza è necessariu per tutti i travaglii. Altrimenti, Spark prova à esse intelligente è distribuisce e dati cum'è necessariu, chì pò causà rallentamenti.

E dinò, a mo idea ùn hà micca travagliatu: i travaglii anu travagliatu per qualchì tempu, cumpletu l'unione, è dopu, cum'è l'esecutori lanciati da partizioni, cuminciaru à fallu.

Aghjunghjendu AWK

Chì aghju amparatu: Ùn dorme micca quandu vi sò insignati i principii. Di sicuru, qualchissia hà digià risoltu u vostru prublema in l'anni 1980.

Finu à questu puntu, u mutivu di tutti i mo fallimenti cù Spark era a cunfusione di dati in u cluster. Forsi a situazione pò esse migliurata cù pre-trattamentu. Aghju decisu di pruvà à sparte i dati di testu crudu in culonni di cromusomi, cusì sperava di furnisce Spark cù dati "pre-partizione".

Aghju cercatu in StackOverflow cumu si sparte per valori di colonna è trovu una risposta cusì grande. Cù AWK pudete sparte un schedariu di testu per valori di colonna scrivendulu in un script invece di mandà i risultati à stdout.

Aghju scrittu un script Bash per pruvà. Scaricatu unu di i TSV imballati, dopu sbulicà utilizendu gzip è mandatu à awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Hà travagliatu!

Riempite i core

Chì aghju amparatu: gnu parallel - hè una cosa magica, ognunu deve aduprà.

A separazione era abbastanza lenta è quandu aghju cuminciatu htopper verificà l'usu di un putente (è caru) istanza EC2, hè stata chì aghju utilizatu solu un core è circa 200 MB di memoria. Per risolviri u prublema è ùn perde micca assai soldi, avemu avutu à capisce cumu si paralleli u travagliu. Fortunatamente, in un libru assolutamente maravigghiusu Data Science à a Linea di Command Aghju trovu un capitulu di Jeron Janssens nantu à a parallelizazione. Da ellu aghju amparatu gnu parallel, un metudu assai flexible per implementà multithreading in Unix.

Analisi di 25 TB cù AWK è R
Quandu aghju principiatu a particione cù u novu prucessu, tuttu era bè, ma ci era sempre un collu di buttiglia - a scaricamentu di l'uggetti S3 à u discu ùn era micca assai veloce è micca cumpletamente parallelizatu. Per riparà questu, aghju fattu questu:

  1. Aghju scupertu chì hè pussibule implementà u stadiu di scaricamentu S3 direttamente in u pipeline, eliminendu cumplettamente l'almacenamiento intermediu nantu à u discu. Questu significa chì possu evità di scrive dati crudi à u discu è aduprà un almacenamentu ancu più chjucu, è dunque più prezzu, in AWS.
  2. squadra aws configure set default.s3.max_concurrent_requests 50 hà aumentatu assai u numeru di fili chì AWS CLI usa (per difettu ci sò 10).
  3. Aghju cambiatu à una istanza EC2 ottimizzata per a velocità di a rete, cù a lettera n in u nome. Aghju trovu chì a perdita di putenza di trasfurmazioni quandu si usa n-istanze hè più cà compensata da l'aumentu di a velocità di carica. Per a maiò parte di i travaglii aghju utilizatu c5n.4xl.
  4. Cambiatu gzip nantu pigz, Questu hè un strumentu gzip chì pò fà cose cool per parallelizà u compitu inizialmente micca parallelizatu di decompressing files (questu hà aiutatu u minimu).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Sti passi sò cumminati cù l'altri per fà tuttu u travagliu assai rapidamente. Aumentendu a velocità di scaricamentu è eliminendu e scrittura di discu, puderia avà processà un pacchettu di 5 terabyte in pocu ore.

Stu tweet duveria esse citatu "TSV". Aiò.

Utilizà i dati appena analizati

Chì aghju amparatu: A Spark li piace i dati senza cumpressione è ùn piace micca cumminà partizioni.

Avà i dati eranu in S3 in un formatu unpacked (lettu: spartutu) è semi-urdinatu, è puderia turnà à Spark di novu. Una sorpresa m'aspittava : ùn aghju micca riesciutu à ghjunghje à ciò chì vulia ! Era assai difficiuli di dì à Spark esattamente cumu i dati sò stati partizionati. E ancu quandu aghju fattu questu, hè risultatu chì ci era troppu partizioni (95 mila), è quandu aghju utilizatu coalesce riduciutu u so numeru à limiti ragiunate, questu hà distruttu u mo particionamentu. Sò sicuru chì questu pò esse riparatu, ma dopu un paru di ghjorni di ricerca ùn aghju micca pussutu truvà una suluzione. Finalmente aghju finitu tutti i travaglii in Spark, ancu s'ellu hà pigliatu un pocu di tempu è i mo split Parquet files ùn eranu micca assai chjuchi (~ 200 KB). Tuttavia, i dati era induve era necessariu.

Analisi di 25 TB cù AWK è R
Troppu picculu è irregolare, maravigliosu!

Pruvate e dumande Spark lucali

Chì aghju amparatu: Spark hà troppu overhead quandu risolve i prublemi simplici.

Scaricatu i dati in un formatu intelligente, aghju pussutu pruvà a velocità. Configurate un script R per eseguisce un servitore Spark lucale, è poi caricate un quadru di dati Spark da l'almacenamiento di u gruppu Parquet specificatu (bin). Aghju pruvatu à carricà tutte e dati ma ùn pudia micca ottene Sparklyr per ricunnosce u particionamentu.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

L'esecuzione hà pigliatu 29,415 seconde. Moltu megliu, ma micca troppu bonu per a prova di massa di qualcosa. Inoltre, ùn aghju micca pussutu accelerà e cose cù a cache perchè quandu aghju pruvatu à cache un frame di dati in memoria, Spark sempre s'hè lampatu, ancu quandu aghju attribuitu più di 50 GB di memoria à un dataset chì pesava menu di 15.

Ritorna à AWK

Chì aghju amparatu: L'arrays associativi in ​​AWK sò assai efficaci.

Aghju capitu chì puderia ottene velocità più altu. Aghju ricurdatu chì in una maraviglia Tutorial AWK di Bruce Barnett Aghju lettu nantu à una funzione fantastica chjamata "arrays assuciativi" Essenzialmente, questi sò coppie chjave-valore, chì per una certa ragione sò stati chjamati in modu diversu in AWK, è per quessa ùn aghju micca pensatu assai à elli. Roman Cheplyaka ricurdò chì u terminu "arrays associative" hè assai più anticu cà u terminu "coppiu chjave-valore". Ancu s'è tù cercate a chjave-valore in Google Ngram, Ùn vi vede micca questu termu quì, ma truverete arrays associative ! Inoltre, u "coppiu chjave-valore" hè più spessu assuciatu cù basa di dati, cusì hè assai più sensu di paragunà cù un hashmap. Aghju realizatu chì puderia usà questi arrays associativi per associà i mo SNP cù una tavola bin è dati crudi senza aduprà Spark.

Per fà questu, in u script AWK aghju utilizatu u bloccu BEGIN. Questu hè un pezzu di codice chì hè eseguitu prima chì a prima linea di dati hè passata à u corpu principale di u script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

squadra while(getline...) caricatu tutte e fila da u gruppu CSV (bin), stabilisce a prima colonna (nome SNP) cum'è a chjave per l'array associative bin è u sicondu valore (gruppu) cum'è u valore. Allora in u bloccu { }, chì hè eseguitu in tutte e linee di u schedariu principale, ogni linea hè mandata à u schedariu di output, chì riceve un nome unicu secondu u so gruppu (bin): ..._bin_"bin[$1]"_....

Variabili batch_num и chunk_id currisponde à i dati furniti da u pipeline, evitendu una cundizione di razza, è ogni filu di esecuzione in esecuzione parallel, hà scrittu à u so propiu schedariu unicu.

Siccomu aghju spargugliatu tutte e dati crudi in cartulare nantu à i cromusomi lasciati da u mo esperimentu precedente cù AWK, avà puderia scrive un altru script Bash per processà un cromusomu à u tempu è mandà dati partizionati più profondi à S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

U script hà duie sezzioni parallel.

In a prima rùbbrica, i dati sò leghjite da tutti i schedari chì cuntenenu l'infurmazioni nantu à u cromusomu desideratu, allora sta dati hè distribuitu in i filamenti, chì distribuiscenu i schedari in i gruppi appropritati (bin). Per evità e cundizioni di razza quandu parechji fili scrivenu à u stessu schedariu, AWK passa i nomi di u schedariu per scrive dati in diversi posti, per esempiu. chr_10_bin_52_batch_2_aa.csv. In u risultatu, assai picculi schedari sò creati nantu à u discu (per questu aghju utilizatu volumi EBS terabyte).

Conveyor da a seconda seccione parallel passa per i gruppi (bin) è combina i so schedarii individuali in CSV cumuni c cate poi li manda per l'esportazione.

Trasmissione in R?

Chì aghju amparatu: Pudete cuntattate stdin и stdout da un script R, è dunque l'utilizanu in u pipeline.

Puderete avè nutatu sta linea in u vostru script Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Traduce tutti i schedarii di gruppu cuncatenati (bin) in u script R sottu. {} hè una tecnica speciale parallel, chì inserisce qualsiasi dati chì manda à u flussu specificatu direttamente in u cumandamentu stessu. Opzione {#} furnisce un ID di filu unicu, è {%} rapprisenta u numeru di slot di travagliu (ripetiu, ma mai simultaneamente). Una lista di tutte l'opzioni pò esse truvata in documentazione.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Quandu una variabile file("stdin") trasmessa à readr::read_csv, I dati tradutti in u script R hè carricu in un quadru, chì hè allora in a forma .rds-file usendu aws.s3 scrittu direttamente à S3.

RDS hè qualcosa cum'è una versione junior di Parquet, senza i fronzoli di u almacenamentu di parlanti.

Dopu avè finitu u script Bash aghju avutu un bundle .rds-files situatu in S3, chì m'hà permessu di utilizà cumpressione efficiente è tipi integrati.

Malgradu l'usu di frenu R, tuttu hà travagliatu assai rapidamente. Ùn hè micca surprisante, e parte di R chì leghje è scrive dati sò assai ottimizzati. Dopu avè pruvatu nantu à un cromusomu mediu, u travagliu hè finitu nantu à una istanza C5n.4xl in circa duie ore.

S3 Limitazioni

Chì aghju amparatu: Grazie à l'implementazione di u percorsu intelligente, S3 pò trattà parechji schedari.

Eru preoccupatu s'ellu S3 puderia trattà i numerosi schedari chì sò stati trasferiti. Puderia fà chì i nomi di i fugliali facenu sensu, ma cumu avaristi S3 à circà?

Analisi di 25 TB cù AWK è R
I cartulare in S3 sò solu per mostra, in fattu u sistema ùn hè micca interessatu à u simbulu /. Da a pagina FAQ S3.

Sembra chì S3 rapprisenta u percorsu à un schedariu particulari cum'è una chjave simplice in una sorta di table hash o basa di dati basata in documentu. Un bucket pò esse pensatu cum'è una tavula, è i schedari ponu esse cunsiderati registri in quella tavula.

Siccomu a rapidità è l'efficienza sò impurtanti per fà un prufittu in Amazon, ùn hè micca surprisa chì stu sistema di chjave-as-a-file-path hè freaking ottimizzatu. Aghju pruvatu à truvà un equilibriu: per ùn avè micca bisognu di fà assai richieste, ma chì e dumande sò state eseguite rapidamente. Si girò fora chì hè megliu à fà circa 20 mila bin files. Pensu chì se continuemu à ottimisà, pudemu ottene un aumentu di a veloce (per esempiu, facendu un bucket speciale solu per i dati, riducendu cusì a dimensione di a tavola di ricerca). Ma ùn ci era micca tempu o soldi per più esperimenti.

Chì ci hè a cumpatibilità croce?

Ciò chì aghju amparatu: A prima causa di u tempu perdu hè ottimisà u vostru metudu di almacenamiento prematuremente.

À questu puntu, hè assai impurtante di dumandà sè stessu: "Perchè aduprà un furmatu di schedariu propiu?" U mutivu si trova in a velocità di carica (i fugliali CSV gzipped anu pigliatu 7 volte più di carica) è a cumpatibilità cù i nostri flussi di travagliu. Puderaghju ricunsiderà s'ellu R pò facirmenti carricà i schedari Parquet (o Arrow) senza a carica Spark. Tutti in u nostru labburatoriu usa R, è s'è aghju bisognu di cunvertisce i dati in un altru furmatu, aghju sempre i dati di u testu originale, perchè possu solu currerà u pipeline di novu.

Divisione di u travagliu

Chì aghju amparatu: Ùn pruvate d'ottimisà i travaglii manualmente, lasciate l'urdinatore.

Aghju debugged u flussu di travagliu nantu à un cromusomu, avà aghju bisognu di processà tutte l'altri dati.
Vuliu crià parechje istanze EC2 per a cunversione, ma à u listessu tempu aghju avutu a paura di ottene una carica assai sbilanciata in diversi travaglii di trasfurmazioni (cum'è Spark hà patitu partizioni sbilanciate). Inoltre, ùn era micca interessatu à elevà una istanza per cromusomu, perchè per i cunti AWS ci hè un limitu predeterminatu di 10 casi.

Allora aghju decisu di scrive un script in R per ottimisà i travaglii di trasfurmazioni.

Prima, aghju dumandatu à S3 di calculà quantu spaziu di almacenamentu ogni cromusomu occupatu.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Allora aghju scrittu una funzione chì piglia a dimensione tutale, mescola l'ordine di i cromusomi, li divide in gruppi. num_jobs è vi dici quantu sò diffirenti e dimensioni di tutti i travaglii di trasfurmazioni.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Allora aghju passatu mille shuffles cù purrr è sceltu u megliu.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Allora aghju finitu cun un settore di tarei chì eranu assai simili in grandezza. Allora tuttu ciò chì restava era di imballà u mo script Bash precedente in un grande ciclu for. Questa ottimisazione hà pigliatu circa 10 minuti per scrive. È questu hè assai menu di ciò chì spenderebbe per creà manualmente e attività s'ellu eranu sbilanciati. Dunque, pensu chì eru ghjustu cù questa ottimisazione preliminare.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

À a fine aghju aghjunghje l'ordine di spegnimentu:

sudo shutdown -h now

... è tuttu hà travagliatu ! Utilizendu l'AWS CLI, aghju risuscitatu istanze cù l'opzione user_data li detti scripts Bash di i so compiti per u prucessu. Corru è chjusu automaticamente, cusì ùn aghju micca pagatu per una putenza di trasfurmazioni extra.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Imballemu !

Chì aghju amparatu: L'API deve esse simplice per a facilità è a flessibilità di usu.

Finalmente aghju avutu i dati in u locu ghjustu è a forma. Tuttu ciò chì restava era di simplificà u prucessu di utilizà e dati quant'è pussibule per fà più faciule per i mo culleghi. Vuliu fà una API simplice per creà richieste. Se in u futuru decide di cambià da .rds à i schedari Parquet, allura stu deve esse un prublema per mè, micca per i mio culleghi. Per questu aghju decisu di fà un pacchettu R internu.

Custruisce è documenta un pacchettu assai simplice chì cuntene solu uni pochi di funzioni di accessu à dati urganizati intornu à una funzione get_snp. Aghju fattu ancu un situ web per i mo culleghi pkgdown, cusì ponu vede facilmente esempi è documentazione.

Analisi di 25 TB cù AWK è R

Caching intelligente

Chì aghju amparatu: Se i vostri dati sò ben preparati, a caching serà faciule!

Siccomu unu di i flussi di travagliu principali applicà u listessu mudellu di analisi à u pacchettu SNP, decisu di utilizà binning à u mo vantaghju. Quandu trasmette dati via SNP, tutte l'infurmazioni da u gruppu (bin) sò attaccati à l'ughjettu tornatu. Questu hè, e dumande vechje ponu (in teoria) accelerà u processu di e dumande novi.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Quandu custruì u pacchettu, aghju eseguitu assai benchmarks per paragunà a velocità quandu anu utilizatu metudi diffirenti. I ricumandemu micca di trascuratà questu, perchè qualchì volta i risultati sò inespettati. Per esempiu, dplyr::filter era assai più veloce di catturà fila cù u filtru basatu in l'indexazione, è ricuperà una sola colonna da un quadru di dati filtratu era assai più veloce di l'usu di a sintassi di l'indexazione.

Per piacè nutate chì l'ughjettu prev_snp_results cuntene a chjave snps_in_bin. Questu hè un array di tutti i SNP unichi in un gruppu (bin), chì vi permette di verificà rapidamente se avete digià dati da una dumanda precedente. Hè ancu facilitu per passà in tutti i SNP in un gruppu (bin) cù stu codice:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Risultati

Avà pudemu (è avemu principiatu à seriamente) eseguisce mudelli è scenarii chì prima eranu inaccessibili per noi. U megliu hè chì i mo culleghi di labburatoriu ùn anu micca da pensà à alcuna cumplicazione. Hanu solu una funzione chì travaglia.

E ancu s'ellu u pacchettu li risparmia i dettagli, aghju pruvatu à fà u furmatu di dati abbastanza simplice chì puderianu capisce s'ellu sparissi di colpu dumane...

A velocità hè aumentata notevolmente. Di solitu scannemu frammenti di genoma funziunale significativu. Nanzu, ùn pudemu micca fà questu (hè risultatu troppu caru), ma avà, grazia à a struttura di u gruppu (bin) è u caching, una dumanda per un SNP dura in media menu di 0,1 seconde, è l'usu di dati hè cusì bassu. chì u costu di S3 hè cacahuè.

cunchiusioni

Questu articulu ùn hè micca una guida in tuttu. A suluzione hè stata individuale, è quasi certamenti micca ottimali. Piuttostu, hè un libru di viaghju. Vogliu chì l'altri capiscenu chì tali decisioni ùn pare micca cumplettamente furmatu in a testa, sò u risultatu di prucessu è errore. Inoltre, s'è vo circate un scientist di dati, tenite in mente chì l'usu di sti arnesi in modu efficau richiede sperienza, è l'esperienza costa soldi. Sò cuntentu chì aghju avutu i mezi di pagà, ma parechji altri chì ponu fà u stessu travagliu megliu cà mè ùn anu mai l'uppurtunità per mancanza di soldi per pruvà ancu.

I strumenti di big data sò versatili. Sè vo avete u tempu, pudete guasi certamente scrive una suluzione più veloce utilizendu tecniche intelligenti di pulizia, almacenamiento è estrazione di dati. Infine, si tratta di una analisi di u costu-benefiziu.

Ciò chì aghju amparatu:

  • ùn ci hè micca un modu economicu per analizà 25 TB à tempu;
  • esse attentu à a dimensione di i vostri schedari Parquet è a so urganizazione;
  • Partizioni in Spark deve esse equilibratu;
  • In generale, ùn pruvate mai di fà 2,5 milioni di partizioni;
  • A classificazione hè sempre difficiule, cum'è a stallazione di Spark;
  • qualchì volta dati spiciali richiede suluzioni spiciali;
  • L'aggregazione di Spark hè rapida, ma a partizione hè sempre caru;
  • ùn dorme micca quandu vi insegnanu i principii, qualchissia probabilmente hà digià risoltu u vostru prublema in l'anni 1980;
  • gnu parallel - questu hè una cosa magica, ognunu deve aduprà;
  • Spark li piace a dati senza cumpressione è ùn piace micca cumminà partizioni;
  • Spark hà troppu overhead quandu risolve i prublemi simplici;
  • L'arrays associativi di AWK sò assai efficaci;
  • pudete cuntattate stdin и stdout da un script R, è dunque aduprà in u pipeline;
  • Grazie à l'implementazione di u percorsu intelligente, S3 pò processà parechji schedari;
  • U mutivu principalu di perde u tempu hè l'optimizazione prematura di u vostru metudu di almacenamiento;
  • ùn pruvate micca di ottimisà i travaglii manualmente, lasciate l'urdinatore fà;
  • L'API deve esse simplice per a facilità è a flessibilità di usu;
  • Se i vostri dati sò ben preparati, u caching serà faciule!

Source: www.habr.com

Add a comment