Cumu leghje stu articulu: Scusate chì u testu hè cusì longu è caòticu. Per risparmià u tempu, aghju principiatu ogni capitulu cù una introduzione "Ciò chì aghju amparatu", chì riassume l'essenza di u capitulu in una o duie frasi.
"Mostrami solu a suluzione!" Se vulete vede da induve sò vinutu, saltate à u capitulu "Diventà più inventiva", ma pensu chì hè più interessante è utile per leghje nantu à u fallimentu.
Aghju statu recentemente incaricatu di stabilisce un prucessu per processà un grande volume di sequenze di DNA crudu (tecnicamente un chip SNP). U bisognu era di ottene rapidamente dati nantu à un locu geneticu determinatu (chjamatu SNP) per a mudeli sussegwente è altre attività. Utilizendu R è AWK, aghju pussutu pulizziari è urganizà e dati in modu naturali, accelendu assai l'elaborazione di e dumande. Questu ùn era micca faciule per mè è necessitava numerosi iterazioni. Questu articulu vi aiuterà à evità alcuni di i mo sbaglii è vi mostrarà ciò chì aghju finitu.
Prima, alcune spiegazioni introduttive.
dati
U nostru centru di trattamentu di l'infurmazione genetica universitaria ci hà furnitu dati in forma di un TSV 25 TB. L'aghju ricivutu divisu in 5 pacchetti compressi in Gzip, ognunu cuntene circa 240 schedarii di quattru gigabyte. Ogni fila cuntene dati per un SNP da un individuu. In totale, i dati nantu à ~ 2,5 milioni di SNP è ~ 60 mila persone sò stati trasmessi. In più di l'infurmazioni SNP, i schedari cuntenenu numerosi culonni cù numeri chì riflettenu diverse caratteristiche, cum'è l'intensità di lettura, a frequenza di diversi alleli, etc. In totale ci era circa 30 colonne cù valori unichi.
Goal
Cum'è cù qualsiasi prughjettu di gestione di dati, u più impurtante era di determinà cumu si usanu e dati. In stu casu selezziunemu soprattuttu mudelli è flussi di travagliu per SNP basatu annantu à SNP. Vale à dì, avemu bisognu di dati solu nantu à un SNP à tempu. Aviu avutu à amparà à ricuperà tutti i registri assuciati cù unu di i 2,5 milioni di SNP cum'è facilmente, rapidamente è ecunomicu pussibule.
Cumu ùn fà micca questu
Per cita un cliché adattatu:
Ùn aghju micca fallutu mille volte, aghju solu scupertu mille manere per evità di analizà una mansa di dati in un formatu amichevule à query.
Prima pruvà
Chì aghju amparatu: Ùn ci hè micca un modu economicu per analizà 25 TB à tempu.
Dopu avè pigliatu u corsu "Metudi Avanzati per u Trattamentu di Big Data" à l'Università Vanderbilt, era sicuru chì u truccu era in u saccu. Ci hà da piglià prubabilmente una ora o duie per stallà u servitore Hive per eseguisce tutte e dati è rappurtate u risultatu. Siccomu i nostri dati sò almacenati in AWS S3, aghju utilizatu u serviziu Athena, chì vi permette di applicà e dumande SQL Hive à i dati S3. Ùn avete bisognu di stallà / crià un cluster Hive, è ancu pagà solu per i dati chì vo circate.
Dopu avè dimustratu à Athena i mo dati è u so furmatu, aghju eseguitu qualchi teste cù dumande cum'è questu:
select * from intensityData limit 10;
È hà ricevutu rapidamente risultati ben strutturati. Pronta.
Finu à pruvà à aduprà e dati in u nostru travagliu ...
Hè statu dumandatu à tirà tutte l'infurmazioni SNP per pruvà u mudellu. Aghju fattu a dumanda:
select * from intensityData
where snp = 'rs123456';
... è cuminciò à aspittà. Dopu ottu minuti è più di 4 TB di dati dumandati, aghju ricevutu u risultatu. Athena carica per u voluminu di dati truvati, $ 5 per terabyte. Allora sta dumanda unica costa $ 20 è ottu minuti d'attesa. Per eseguisce u mudellu nantu à tutte e dati, duvemu aspittà 38 anni è pagà $ 50. Ovviamente, questu ùn era micca adattatu per noi.
Era necessariu aduprà Parquet...
Chì aghju amparatu: Attenti cù a dimensione di i vostri schedari Parquet è a so urganizazione.
Prima aghju pruvatu à riparà a situazione cunvertisce tutti i TSV in File di parquet. Sò cunvenuti per travaglià cù grande setti di dati, perchè l'infurmazioni in elli sò almacenati in forma di colonna: ogni colonna si trova in u so propiu segmentu di memoria / discu, in cuntrastu à i schedarii di testu, in quale fila cuntene elementi di ogni colonna. È s'ellu ci vole à truvà qualcosa, allora basta à leghje a colonna necessaria. Inoltre, ogni fugliale guarda un intervallu di valori in una colonna, dunque se u valore chì cercate ùn hè micca in a gamma di a colonna, Spark ùn perde micca u tempu à scansà tuttu u schedariu.
Aghju fattu un compitu simplice Cola AWS per cunvertisce i nostri TSV in Parquet è abbandunò i novi schedari in Athena. Pigliò circa 5 ore. Ma quandu aghju eseguitu a dumanda, hà pigliatu circa a listessa quantità di tempu è un pocu menu di soldi per compie. U fattu hè chì Spark, circannu à ottimisà u compitu, simpricimenti unpacked un chunk TSV è mette lu in u so propriu chunk Parquet. È perchè ogni pezzu era abbastanza grande per cuntene i registri sanu di parechje persone, ogni schedariu cuntene tutti i SNP, cusì Spark hà avutu à apre tutti i schedari per estrae l'infurmazioni necessarii.
Curiosamente, u tipu di compressione predeterminatu (è cunsigliatu) di Parquet, snappy, ùn hè micca splittable. Per quessa, ogni esecutore era chjappu nantu à u compitu di unpacking è di scaricamentu di u cumpletu di 3,5 GB di dataset.
Capemu u prublema
Chì aghju amparatu: Sorting hè difficiule, soprattuttu se i dati sò distribuiti.
Mi paria chì avà aghju capitu l'essenza di u prublema. Aviu solu bisognu di sorte e dati per colonna SNP, micca da persone. Allora parechji SNP seranu guardati in un pezzu di dati separatu, è dopu a funzione "intelligente" di Parquet "apre solu s'ellu u valore hè in u range" si mostrarà in tutta a so gloria. Sfurtunatamente, a classificazione di miliardi di fila spargugliati in un cluster hè statu un compitu difficiule.
Mi pigliate classi di algoritmi in l'università: "Ugh, nimu ùn importa di a cumplessità computazionale di tutti questi algoritmi di classificazione"
Mi prova à sorte nantu à una colonna in un 20TB #scintilla tavula: "Perchè questu dura tantu tempu?" #DataScience lotte.
AWS definitivamente ùn vole micca emette un rimborsu per via di a ragione "Sò un studiente distrattu". Dopu chì aghju rializatu a classificazione nantu à Amazon Glue, hà currettu per 2 ghjorni è s'hè lampatu.
E partizioni ?
Chì aghju amparatu: Partizioni in Spark deve esse equilibratu.
Allora aghju avutu l'idea di particionà e dati in cromusomi. Ci sò 23 d'elli (è parechji più si pigliate in contu l'ADN mitocondriale è e regioni senza mappa).
Questu permetterà di sparte i dati in pezzi più chjuchi. Se aghjunghje una sola linea à a funzione d'esportazione Spark in u script Glue partition_by = "chr", allura i dati deve esse divisu in buckets.
U genoma hè custituitu da numerosi frammenti chjamati cromusomi.
Sfurtunatamente, ùn hà micca travagliatu. I cromosomi anu diverse dimensioni, chì significheghja diverse quantità di informazioni. Questu significa chì i travaglii chì Spark hà mandatu à i travagliadori ùn sò micca stati equilibrati è cumpleti lentamente perchè certi nodi anu finitu prima è eranu inattivi. Tuttavia, i travaglii sò stati cumpletati. Ma quandu dumandava un SNP, u sbilanciamentu hà causatu di novu prublemi. U costu di trasfurmà SNP nantu à cromusomi più grande (vale à dì, induve vulemu ottene dati) hè solu diminuitu da circa un fattore di 10. Assai, ma micca abbastanza.
Et si l'on divise en parties encore plus petites ?
Chì aghju amparatu: Ùn pruvate mai di fà 2,5 milioni di partizioni in tuttu.
Aghju decisu di andà in tuttu è sparte ogni SNP. Questu hà assicuratu chì e partizioni eranu di uguali dimensione. ERA UNA GATTIVA IDEA. Aghju utilizatu Glue è aghjunse una linea innocente partition_by = 'snp'. U compitu principia è cuminciò à eseguisce. Un ghjornu dopu aghju verificatu è vistu chì ùn ci era ancu nunda di scrittu à S3, cusì aghju tombu u compitu. Sembra chì Glue scriveva i schedarii intermedi in un locu oculatu in S3, assai schedari, forsi un paru di milioni. In u risultatu, u mo sbagliu costava più di mille dollari è ùn hà micca piace à u mo mentore.
Spartizione + ordinamentu
Chì aghju amparatu: Sorting hè sempre difficiule, cum'è tuning Spark.
U mo ultimu tentativu di spartizione m'hà implicatu à particionà i cromusomi è dopu à sorte ogni partizione. In tiurìa, chistu accellerà ogni dumanda perchè i dati SNP desiderati duvia esse in uni pochi di pezzi di Parquet in un intervallu datu. Sfurtunatamente, l'ordine ancu di dati partizionati hè statu un compitu difficiule. In u risultatu, aghju cambiatu à EMR per un cluster persunalizatu è utilizatu ottu istanze putenti (C5.4xl) è Sparklyr per creà un flussu di travagliu più flexible...
# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
group_by(chr) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr')
)
... in ogni modu, u compitu ùn era micca finitu. L'aghju cunfiguratu in diverse manere: hà aumentatu l'allocazione di memoria per ogni esecutore di query, utilizatu nodi cù una grande quantità di memoria, utilizatu variabili di broadcast (variabili di trasmissione), ma ogni volta questi sò diventati a mità di misure, è gradualmente l'esecutori cuminciaru. per fallu finu à chì tuttu si ferma.
Chì aghju amparatu: Calchì volta dati spiciali richiede suluzioni spiciali.
Ogni SNP hà un valore di pusizioni. Questu hè un numeru chì currisponde à u numeru di basi longu u so cromusomu. Questu hè un modu bellu è naturali per urganizà i nostri dati. À u principiu, vulia sparte per regioni di ogni cromusomu. Per esempiu, pusizioni 1 - 2000, 2001 - 4000, etc. Ma u prublema hè chì i SNP ùn sò micca distribuiti uniformemente in i cromusomi, per quessa, e dimensioni di u gruppu varieranu assai.
In u risultatu, sò ghjuntu à una ripartizione di pusizioni in categurie (rank). Utilizendu e dati scaricati, aghju fattu una dumanda per ottene una lista di SNP unichi, e so pusizioni è i cromusomi. Allora aghju ordinatu i dati in ogni cromusomu è cullucatu SNP in gruppi (bin) di una certa dimensione. Diciamu 1000 SNP ognunu. Questu m'hà datu a relazione SNP-à-gruppu-per-cromusomu.
In fine, aghju fattu gruppi (bin) di 75 SNP, u mutivu serà spiegatu quì sottu.
Chì aghju amparatu: L'aggregazione di Spark hè rapida, ma a partizione hè sempre caru.
Vuliu leghje stu picculu quadru di dati (2,5 milioni di file) in Spark, combina cù i dati crudi, è poi particionà da a colonna appena aghjuntu. bin.
# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
group_by(chr_bin) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr_bin')
)
Aghju utilizatu sdf_broadcast(), cusì Spark sapi chì deve mandà u quadru di dati à tutti i nodi. Questu hè utile se i dati sò chjuchi in grandezza è necessariu per tutti i travaglii. Altrimenti, Spark prova à esse intelligente è distribuisce e dati cum'è necessariu, chì pò causà rallentamenti.
E dinò, a mo idea ùn hà micca travagliatu: i travaglii anu travagliatu per qualchì tempu, cumpletu l'unione, è dopu, cum'è l'esecutori lanciati da partizioni, cuminciaru à fallu.
Aghjunghjendu AWK
Chì aghju amparatu: Ùn dorme micca quandu vi sò insignati i principii. Di sicuru, qualchissia hà digià risoltu u vostru prublema in l'anni 1980.
Finu à questu puntu, u mutivu di tutti i mo fallimenti cù Spark era a cunfusione di dati in u cluster. Forsi a situazione pò esse migliurata cù pre-trattamentu. Aghju decisu di pruvà à sparte i dati di testu crudu in culonni di cromusomi, cusì sperava di furnisce Spark cù dati "pre-partizione".
Aghju cercatu in StackOverflow cumu si sparte per valori di colonna è trovu una risposta cusì grande. Cù AWK pudete sparte un schedariu di testu per valori di colonna scrivendulu in un script invece di mandà i risultati à stdout.
Aghju scrittu un script Bash per pruvà. Scaricatu unu di i TSV imballati, dopu sbulicà utilizendu gzip è mandatu à awk.
Chì aghju amparatu: gnu parallel - hè una cosa magica, ognunu deve aduprà.
A separazione era abbastanza lenta è quandu aghju cuminciatu htopper verificà l'usu di un putente (è caru) istanza EC2, hè stata chì aghju utilizatu solu un core è circa 200 MB di memoria. Per risolviri u prublema è ùn perde micca assai soldi, avemu avutu à capisce cumu si paralleli u travagliu. Fortunatamente, in un libru assolutamente maravigghiusu Data Science à a Linea di Command Aghju trovu un capitulu di Jeron Janssens nantu à a parallelizazione. Da ellu aghju amparatu gnu parallel, un metudu assai flexible per implementà multithreading in Unix.
Quandu aghju principiatu a particione cù u novu prucessu, tuttu era bè, ma ci era sempre un collu di buttiglia - a scaricamentu di l'uggetti S3 à u discu ùn era micca assai veloce è micca cumpletamente parallelizatu. Per riparà questu, aghju fattu questu:
Aghju scupertu chì hè pussibule implementà u stadiu di scaricamentu S3 direttamente in u pipeline, eliminendu cumplettamente l'almacenamiento intermediu nantu à u discu. Questu significa chì possu evità di scrive dati crudi à u discu è aduprà un almacenamentu ancu più chjucu, è dunque più prezzu, in AWS.
squadra aws configure set default.s3.max_concurrent_requests 50 hà aumentatu assai u numeru di fili chì AWS CLI usa (per difettu ci sò 10).
Aghju cambiatu à una istanza EC2 ottimizzata per a velocità di a rete, cù a lettera n in u nome. Aghju trovu chì a perdita di putenza di trasfurmazioni quandu si usa n-istanze hè più cà compensata da l'aumentu di a velocità di carica. Per a maiò parte di i travaglii aghju utilizatu c5n.4xl.
Cambiatu gzip nantu pigz, Questu hè un strumentu gzip chì pò fà cose cool per parallelizà u compitu inizialmente micca parallelizatu di decompressing files (questu hà aiutatu u minimu).
# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50
for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do
aws s3 cp s3://$batch_loc$chunk_file - |
pigz -dc |
parallel --block 100M --pipe
"awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"
# Combine all the parallel process chunks to single files
ls chunked/ |
cut -d '_' -f 2 |
sort -u |
parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
# Clean up intermediate data
rm chunked/*
done
Sti passi sò cumminati cù l'altri per fà tuttu u travagliu assai rapidamente. Aumentendu a velocità di scaricamentu è eliminendu e scrittura di discu, puderia avà processà un pacchettu di 5 terabyte in pocu ore.
Ùn ci hè nunda di più dolce chè vede tutti i nuclei chì paghete nantu à AWS utilizatu. Grazie à gnu-parallel, possu unzip è split un csv 19gig appena cum'è possu scaricà. Ùn pudia ancu avè una scintilla per eseguisce questu. #DataScience#Linuxpic.twitter.com/Nqyba2zqEk
Chì aghju amparatu: A Spark li piace i dati senza cumpressione è ùn piace micca cumminà partizioni.
Avà i dati eranu in S3 in un formatu unpacked (lettu: spartutu) è semi-urdinatu, è puderia turnà à Spark di novu. Una sorpresa m'aspittava : ùn aghju micca riesciutu à ghjunghje à ciò chì vulia ! Era assai difficiuli di dì à Spark esattamente cumu i dati sò stati partizionati. E ancu quandu aghju fattu questu, hè risultatu chì ci era troppu partizioni (95 mila), è quandu aghju utilizatu coalesce riduciutu u so numeru à limiti ragiunate, questu hà distruttu u mo particionamentu. Sò sicuru chì questu pò esse riparatu, ma dopu un paru di ghjorni di ricerca ùn aghju micca pussutu truvà una suluzione. Finalmente aghju finitu tutti i travaglii in Spark, ancu s'ellu hà pigliatu un pocu di tempu è i mo split Parquet files ùn eranu micca assai chjuchi (~ 200 KB). Tuttavia, i dati era induve era necessariu.
Troppu picculu è irregolare, maravigliosu!
Pruvate e dumande Spark lucali
Chì aghju amparatu: Spark hà troppu overhead quandu risolve i prublemi simplici.
Scaricatu i dati in un formatu intelligente, aghju pussutu pruvà a velocità. Configurate un script R per eseguisce un servitore Spark lucale, è poi caricate un quadru di dati Spark da l'almacenamiento di u gruppu Parquet specificatu (bin). Aghju pruvatu à carricà tutte e dati ma ùn pudia micca ottene Sparklyr per ricunnosce u particionamentu.
sc <- Spark_connect(master = "local")
desired_snp <- 'rs34771739'
# Start a timer
start_time <- Sys.time()
# Load the desired bin into Spark
intensity_data <- sc %>%
Spark_read_Parquet(
name = 'intensity_data',
path = get_snp_location(desired_snp),
memory = FALSE )
# Subset bin to snp and then collect to local
test_subset <- intensity_data %>%
filter(SNP_Name == desired_snp) %>%
collect()
print(Sys.time() - start_time)
L'esecuzione hà pigliatu 29,415 seconde. Moltu megliu, ma micca troppu bonu per a prova di massa di qualcosa. Inoltre, ùn aghju micca pussutu accelerà e cose cù a cache perchè quandu aghju pruvatu à cache un frame di dati in memoria, Spark sempre s'hè lampatu, ancu quandu aghju attribuitu più di 50 GB di memoria à un dataset chì pesava menu di 15.
Ritorna à AWK
Chì aghju amparatu: L'arrays associativi in AWK sò assai efficaci.
Aghju capitu chì puderia ottene velocità più altu. Aghju ricurdatu chì in una maraviglia Tutorial AWK di Bruce Barnett Aghju lettu nantu à una funzione fantastica chjamata "arrays assuciativi" Essenzialmente, questi sò coppie chjave-valore, chì per una certa ragione sò stati chjamati in modu diversu in AWK, è per quessa ùn aghju micca pensatu assai à elli. Roman Cheplyaka ricurdò chì u terminu "arrays associative" hè assai più anticu cà u terminu "coppiu chjave-valore". Ancu s'è tù cercate a chjave-valore in Google Ngram, Ùn vi vede micca questu termu quì, ma truverete arrays associative ! Inoltre, u "coppiu chjave-valore" hè più spessu assuciatu cù basa di dati, cusì hè assai più sensu di paragunà cù un hashmap. Aghju realizatu chì puderia usà questi arrays associativi per associà i mo SNP cù una tavola bin è dati crudi senza aduprà Spark.
Per fà questu, in u script AWK aghju utilizatu u bloccu BEGIN. Questu hè un pezzu di codice chì hè eseguitu prima chì a prima linea di dati hè passata à u corpu principale di u script.
squadra while(getline...) caricatu tutte e fila da u gruppu CSV (bin), stabilisce a prima colonna (nome SNP) cum'è a chjave per l'array associative bin è u sicondu valore (gruppu) cum'è u valore. Allora in u bloccu {}, chì hè eseguitu in tutte e linee di u schedariu principale, ogni linea hè mandata à u schedariu di output, chì riceve un nome unicu secondu u so gruppu (bin): ..._bin_"bin[$1]"_....
Variabili batch_num и chunk_id currisponde à i dati furniti da u pipeline, evitendu una cundizione di razza, è ogni filu di esecuzione in esecuzione parallel, hà scrittu à u so propiu schedariu unicu.
Siccomu aghju spargugliatu tutte e dati crudi in cartulare nantu à i cromusomi lasciati da u mo esperimentu precedente cù AWK, avà puderia scrive un altru script Bash per processà un cromusomu à u tempu è mandà dati partizionati più profondi à S3.
DESIRED_CHR='13'
# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"
# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*
U script hà duie sezzioni parallel.
In a prima rùbbrica, i dati sò leghjite da tutti i schedari chì cuntenenu l'infurmazioni nantu à u cromusomu desideratu, allora sta dati hè distribuitu in i filamenti, chì distribuiscenu i schedari in i gruppi appropritati (bin). Per evità e cundizioni di razza quandu parechji fili scrivenu à u stessu schedariu, AWK passa i nomi di u schedariu per scrive dati in diversi posti, per esempiu. chr_10_bin_52_batch_2_aa.csv. In u risultatu, assai picculi schedari sò creati nantu à u discu (per questu aghju utilizatu volumi EBS terabyte).
Conveyor da a seconda seccione parallel passa per i gruppi (bin) è combina i so schedarii individuali in CSV cumuni c cate poi li manda per l'esportazione.
Trasmissione in R?
Chì aghju amparatu: Pudete cuntattate stdin и stdout da un script R, è dunque l'utilizanu in u pipeline.
Puderete avè nutatu sta linea in u vostru script Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Traduce tutti i schedarii di gruppu cuncatenati (bin) in u script R sottu. {} hè una tecnica speciale parallel, chì inserisce qualsiasi dati chì manda à u flussu specificatu direttamente in u cumandamentu stessu. Opzione {#} furnisce un ID di filu unicu, è {%} rapprisenta u numeru di slot di travagliu (ripetiu, ma mai simultaneamente). Una lista di tutte l'opzioni pò esse truvata in documentazione.
Quandu una variabile file("stdin") trasmessa à readr::read_csv, I dati tradutti in u script R hè carricu in un quadru, chì hè allora in a forma .rds-file usendu aws.s3 scrittu direttamente à S3.
RDS hè qualcosa cum'è una versione junior di Parquet, senza i fronzoli di u almacenamentu di parlanti.
Dopu avè finitu u script Bash aghju avutu un bundle .rds-files situatu in S3, chì m'hà permessu di utilizà cumpressione efficiente è tipi integrati.
Malgradu l'usu di frenu R, tuttu hà travagliatu assai rapidamente. Ùn hè micca surprisante, e parte di R chì leghje è scrive dati sò assai ottimizzati. Dopu avè pruvatu nantu à un cromusomu mediu, u travagliu hè finitu nantu à una istanza C5n.4xl in circa duie ore.
S3 Limitazioni
Chì aghju amparatu: Grazie à l'implementazione di u percorsu intelligente, S3 pò trattà parechji schedari.
Eru preoccupatu s'ellu S3 puderia trattà i numerosi schedari chì sò stati trasferiti. Puderia fà chì i nomi di i fugliali facenu sensu, ma cumu avaristi S3 à circà?
I cartulare in S3 sò solu per mostra, in fattu u sistema ùn hè micca interessatu à u simbulu /. Da a pagina FAQ S3.
Sembra chì S3 rapprisenta u percorsu à un schedariu particulari cum'è una chjave simplice in una sorta di table hash o basa di dati basata in documentu. Un bucket pò esse pensatu cum'è una tavula, è i schedari ponu esse cunsiderati registri in quella tavula.
Siccomu a rapidità è l'efficienza sò impurtanti per fà un prufittu in Amazon, ùn hè micca surprisa chì stu sistema di chjave-as-a-file-path hè freaking ottimizzatu. Aghju pruvatu à truvà un equilibriu: per ùn avè micca bisognu di fà assai richieste, ma chì e dumande sò state eseguite rapidamente. Si girò fora chì hè megliu à fà circa 20 mila bin files. Pensu chì se continuemu à ottimisà, pudemu ottene un aumentu di a veloce (per esempiu, facendu un bucket speciale solu per i dati, riducendu cusì a dimensione di a tavola di ricerca). Ma ùn ci era micca tempu o soldi per più esperimenti.
Chì ci hè a cumpatibilità croce?
Ciò chì aghju amparatu: A prima causa di u tempu perdu hè ottimisà u vostru metudu di almacenamiento prematuremente.
À questu puntu, hè assai impurtante di dumandà sè stessu: "Perchè aduprà un furmatu di schedariu propiu?" U mutivu si trova in a velocità di carica (i fugliali CSV gzipped anu pigliatu 7 volte più di carica) è a cumpatibilità cù i nostri flussi di travagliu. Puderaghju ricunsiderà s'ellu R pò facirmenti carricà i schedari Parquet (o Arrow) senza a carica Spark. Tutti in u nostru labburatoriu usa R, è s'è aghju bisognu di cunvertisce i dati in un altru furmatu, aghju sempre i dati di u testu originale, perchè possu solu currerà u pipeline di novu.
Aghju debugged u flussu di travagliu nantu à un cromusomu, avà aghju bisognu di processà tutte l'altri dati.
Vuliu crià parechje istanze EC2 per a cunversione, ma à u listessu tempu aghju avutu a paura di ottene una carica assai sbilanciata in diversi travaglii di trasfurmazioni (cum'è Spark hà patitu partizioni sbilanciate). Inoltre, ùn era micca interessatu à elevà una istanza per cromusomu, perchè per i cunti AWS ci hè un limitu predeterminatu di 10 casi.
Allora aghju decisu di scrive un script in R per ottimisà i travaglii di trasfurmazioni.
Prima, aghju dumandatu à S3 di calculà quantu spaziu di almacenamentu ogni cromusomu occupatu.
library(aws.s3)
library(tidyverse)
chr_sizes <- get_bucket_df(
bucket = '...', prefix = '...', max = Inf
) %>%
mutate(Size = as.numeric(Size)) %>%
filter(Size != 0) %>%
mutate(
# Extract chromosome from the file name
chr = str_extract(Key, 'chr.{1,4}.csv') %>%
str_remove_all('chr|.csv')
) %>%
group_by(chr) %>%
summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB
# A tibble: 27 x 2
chr total_size
<chr> <dbl>
1 0 163.
2 1 967.
3 10 541.
4 11 611.
5 12 542.
6 13 364.
7 14 375.
8 15 372.
9 16 434.
10 17 443.
# … with 17 more rows
Allora aghju scrittu una funzione chì piglia a dimensione tutale, mescola l'ordine di i cromusomi, li divide in gruppi. num_jobs è vi dici quantu sò diffirenti e dimensioni di tutti i travaglii di trasfurmazioni.
num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7
shuffle_job <- function(i){
chr_sizes %>%
sample_frac() %>%
mutate(
cum_size = cumsum(total_size),
job_num = ceiling(cum_size/job_size)
) %>%
group_by(job_num) %>%
summarise(
job_chrs = paste(chr, collapse = ','),
total_job_size = sum(total_size)
) %>%
mutate(sd = sd(total_job_size)) %>%
nest(-sd)
}
shuffle_job(1)
# A tibble: 1 x 2
sd data
<dbl> <list>
1 153. <tibble [7 × 3]>
Allora aghju passatu mille shuffles cù purrr è sceltu u megliu.
Allora aghju finitu cun un settore di tarei chì eranu assai simili in grandezza. Allora tuttu ciò chì restava era di imballà u mo script Bash precedente in un grande ciclu for. Questa ottimisazione hà pigliatu circa 10 minuti per scrive. È questu hè assai menu di ciò chì spenderebbe per creà manualmente e attività s'ellu eranu sbilanciati. Dunque, pensu chì eru ghjustu cù questa ottimisazione preliminare.
for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi
À a fine aghju aghjunghje l'ordine di spegnimentu:
sudo shutdown -h now
... è tuttu hà travagliatu ! Utilizendu l'AWS CLI, aghju risuscitatu istanze cù l'opzione user_data li detti scripts Bash di i so compiti per u prucessu. Corru è chjusu automaticamente, cusì ùn aghju micca pagatu per una putenza di trasfurmazioni extra.
Chì aghju amparatu: L'API deve esse simplice per a facilità è a flessibilità di usu.
Finalmente aghju avutu i dati in u locu ghjustu è a forma. Tuttu ciò chì restava era di simplificà u prucessu di utilizà e dati quant'è pussibule per fà più faciule per i mo culleghi. Vuliu fà una API simplice per creà richieste. Se in u futuru decide di cambià da .rds à i schedari Parquet, allura stu deve esse un prublema per mè, micca per i mio culleghi. Per questu aghju decisu di fà un pacchettu R internu.
Custruisce è documenta un pacchettu assai simplice chì cuntene solu uni pochi di funzioni di accessu à dati urganizati intornu à una funzione get_snp. Aghju fattu ancu un situ web per i mo culleghi pkgdown, cusì ponu vede facilmente esempi è documentazione.
Caching intelligente
Chì aghju amparatu: Se i vostri dati sò ben preparati, a caching serà faciule!
Siccomu unu di i flussi di travagliu principali applicà u listessu mudellu di analisi à u pacchettu SNP, decisu di utilizà binning à u mo vantaghju. Quandu trasmette dati via SNP, tutte l'infurmazioni da u gruppu (bin) sò attaccati à l'ughjettu tornatu. Questu hè, e dumande vechje ponu (in teoria) accelerà u processu di e dumande novi.
# Part of get_snp()
...
# Test if our current snp data has the desired snp.
already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin
if(!already_have_snp){
# Grab info on the bin of the desired snp
snp_results <- get_snp_bin(desired_snp)
# Download the snp's bin data
snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
} else {
# The previous snp data contained the right bin so just use it
snp_results <- prev_snp_results
}
...
Quandu custruì u pacchettu, aghju eseguitu assai benchmarks per paragunà a velocità quandu anu utilizatu metudi diffirenti. I ricumandemu micca di trascuratà questu, perchè qualchì volta i risultati sò inespettati. Per esempiu, dplyr::filter era assai più veloce di catturà fila cù u filtru basatu in l'indexazione, è ricuperà una sola colonna da un quadru di dati filtratu era assai più veloce di l'usu di a sintassi di l'indexazione.
Per piacè nutate chì l'ughjettu prev_snp_results cuntene a chjave snps_in_bin. Questu hè un array di tutti i SNP unichi in un gruppu (bin), chì vi permette di verificà rapidamente se avete digià dati da una dumanda precedente. Hè ancu facilitu per passà in tutti i SNP in un gruppu (bin) cù stu codice:
# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin
for(current_snp in snps_in_bin){
my_snp_results <- get_snp(current_snp, my_snp_results)
# Do something with results
}
Risultati
Avà pudemu (è avemu principiatu à seriamente) eseguisce mudelli è scenarii chì prima eranu inaccessibili per noi. U megliu hè chì i mo culleghi di labburatoriu ùn anu micca da pensà à alcuna cumplicazione. Hanu solu una funzione chì travaglia.
E ancu s'ellu u pacchettu li risparmia i dettagli, aghju pruvatu à fà u furmatu di dati abbastanza simplice chì puderianu capisce s'ellu sparissi di colpu dumane...
A velocità hè aumentata notevolmente. Di solitu scannemu frammenti di genoma funziunale significativu. Nanzu, ùn pudemu micca fà questu (hè risultatu troppu caru), ma avà, grazia à a struttura di u gruppu (bin) è u caching, una dumanda per un SNP dura in media menu di 0,1 seconde, è l'usu di dati hè cusì bassu. chì u costu di S3 hè cacahuè.
Recentemente aghju messu in un cambiamentu di 25+ TB di dati di genotipu crudu per u mo laboratoriu. Quandu aghju cuminciatu, aduprà spark hà pigliatu 8 minuti è costò $ 20 per dumandà un SNP. Dopu aduprà AWK + #rstats per processà, avà piglia menu di un 10th di un secondu è costa $ 0.00001. U mo persunale #BigData vince. pic.twitter.com/ANOXVGrmkk
Questu articulu ùn hè micca una guida in tuttu. A suluzione hè stata individuale, è quasi certamenti micca ottimali. Piuttostu, hè un libru di viaghju. Vogliu chì l'altri capiscenu chì tali decisioni ùn pare micca cumplettamente furmatu in a testa, sò u risultatu di prucessu è errore. Inoltre, s'è vo circate un scientist di dati, tenite in mente chì l'usu di sti arnesi in modu efficau richiede sperienza, è l'esperienza costa soldi. Sò cuntentu chì aghju avutu i mezi di pagà, ma parechji altri chì ponu fà u stessu travagliu megliu cà mè ùn anu mai l'uppurtunità per mancanza di soldi per pruvà ancu.
I strumenti di big data sò versatili. Sè vo avete u tempu, pudete guasi certamente scrive una suluzione più veloce utilizendu tecniche intelligenti di pulizia, almacenamiento è estrazione di dati. Infine, si tratta di una analisi di u costu-benefiziu.
Ciò chì aghju amparatu:
ùn ci hè micca un modu economicu per analizà 25 TB à tempu;
esse attentu à a dimensione di i vostri schedari Parquet è a so urganizazione;
Partizioni in Spark deve esse equilibratu;
In generale, ùn pruvate mai di fà 2,5 milioni di partizioni;
A classificazione hè sempre difficiule, cum'è a stallazione di Spark;
qualchì volta dati spiciali richiede suluzioni spiciali;
L'aggregazione di Spark hè rapida, ma a partizione hè sempre caru;
ùn dorme micca quandu vi insegnanu i principii, qualchissia probabilmente hà digià risoltu u vostru prublema in l'anni 1980;
gnu parallel - questu hè una cosa magica, ognunu deve aduprà;
Spark li piace a dati senza cumpressione è ùn piace micca cumminà partizioni;
Spark hà troppu overhead quandu risolve i prublemi simplici;
L'arrays associativi di AWK sò assai efficaci;
pudete cuntattate stdin и stdout da un script R, è dunque aduprà in u pipeline;
Grazie à l'implementazione di u percorsu intelligente, S3 pò processà parechji schedari;
U mutivu principalu di perde u tempu hè l'optimizazione prematura di u vostru metudu di almacenamiento;
ùn pruvate micca di ottimisà i travaglii manualmente, lasciate l'urdinatore fà;
L'API deve esse simplice per a facilità è a flessibilità di usu;
Se i vostri dati sò ben preparati, u caching serà faciule!