Analizarea a 25 TB folosind AWK și R

Analizarea a 25 TB folosind AWK și R
Cum să citești acest articol: Îmi cer scuze că textul este atât de lung și haotic. Pentru a economisi timp, încep fiecare capitol cu ​​o introducere „Ce am învățat”, care rezumă esența capitolului în una sau două propoziții.

„Doar arată-mi soluția!” Dacă doriți doar să vedeți de unde am venit, atunci treceți la capitolul „Devin mai inventiv”, dar cred că este mai interesant și mai util să citiți despre eșec.

Am fost recent însărcinat cu configurarea unui proces pentru procesarea unui volum mare de secvențe brute de ADN (tehnic un cip SNP). Necesitatea a fost de a obține rapid date despre o anumită locație genetică (numită SNP) pentru modelarea ulterioară și alte sarcini. Folosind R și AWK, am reușit să curăț și să organizez datele într-un mod natural, accelerând foarte mult procesarea interogărilor. Acest lucru nu a fost ușor pentru mine și a necesitat numeroase iterații. Acest articol vă va ajuta să evitați unele dintre greșelile mele și vă va arăta cu ce am ajuns.

În primul rând, câteva explicații introductive.

De date

Centrul nostru universitar de procesare a informațiilor genetice ne-a furnizat date sub forma unui TSV de 25 TB. Le-am primit împărțite în 5 pachete, comprimate de Gzip, fiecare din care conținea aproximativ 240 de fișiere de patru gigabyte. Fiecare rând conținea date pentru un SNP de la un individ. În total, au fost transmise date despre ~ 2,5 milioane de SNP-uri și ~ 60 de mii de persoane. Pe lângă informațiile SNP, fișierele conțineau numeroase coloane cu numere care reflectă diferite caracteristici, cum ar fi intensitatea citirii, frecvența diferitelor alele etc. În total au fost aproximativ 30 de coloane cu valori unice.

Scop

Ca și în cazul oricărui proiect de gestionare a datelor, cel mai important lucru a fost să se determine cum vor fi utilizate datele. În acest caz vom selecta în mare parte modele și fluxuri de lucru pentru SNP bazate pe SNP. Adică, vom avea nevoie de date doar pe un SNP la un moment dat. A trebuit să învăț cum să recuperez toate înregistrările asociate cu unul dintre cele 2,5 milioane de SNP-uri cât mai ușor, rapid și ieftin posibil.

Cum să nu faci asta

Pentru a cita un clișeu potrivit:

Nu am eșuat de o mie de ori, doar am descoperit o mie de modalități de a evita analizarea a o grămadă de date într-un format de interogare.

Prima încercare

Ce am invatat: Nu există o modalitate ieftină de a analiza 25 TB la un moment dat.

După ce am urmat cursul „Metode avansate pentru procesarea datelor mari” de la Universitatea Vanderbilt, eram sigur că trucul era în geantă. Probabil că va dura o oră sau două pentru a configura serverul Hive pentru a rula toate datele și a raporta rezultatul. Deoarece datele noastre sunt stocate în AWS S3, am folosit serviciul Athena, care vă permite să aplicați interogări Hive SQL la datele S3. Nu trebuie să configurați/creșteți un cluster Hive și, de asemenea, plătiți numai pentru datele pe care le căutați.

După ce i-am arătat Athenei datele mele și formatul lor, am rulat câteva teste cu interogări ca aceasta:

select * from intensityData limit 10;

Și a primit rapid rezultate bine structurate. Gata.

Până când am încercat să folosim datele în munca noastră...

Mi s-a cerut să scot toate informațiile SNP pentru a testa modelul. Am rulat interogarea:


select * from intensityData 
where snp = 'rs123456';

...si a inceput sa astepte. După opt minute și peste 4 TB de date solicitate, am primit rezultatul. Athena taxează după volumul de date găsit, 5 USD pe terabyte. Deci, această cerere unică a costat 20 de dolari și opt minute de așteptare. Pentru a rula modelul pe toate datele, a trebuit să așteptăm 38 de ani și să plătim 50 de milioane de dolari, evident, acest lucru nu era potrivit pentru noi.

A fost necesar să folosim parchet...

Ce am invatat: Aveți grijă la dimensiunea fișierelor dvs. Parquet și la organizarea lor.

Am încercat mai întâi să remediez situația transformând toate TSV-urile în Pile de parchet. Sunt convenabile pentru lucrul cu seturi mari de date, deoarece informațiile din ele sunt stocate în formă de coloană: fiecare coloană se află în propriul segment de memorie/disc, spre deosebire de fișierele text, în care rândurile conțin elemente ale fiecărei coloane. Și dacă trebuie să găsiți ceva, atunci citiți doar coloana necesară. În plus, fiecare fișier stochează un interval de valori într-o coloană, așa că dacă valoarea pe care o căutați nu se află în intervalul coloanei, Spark nu va pierde timpul scanând întregul fișier.

Am executat o sarcină simplă AWS Adeziv pentru a converti TSV-urile noastre în Parquet și a aruncat noile fișiere în Athena. A durat aproximativ 5 ore. Dar când am executat cererea, a durat aproximativ aceeași cantitate de timp și ceva mai puțini bani pentru a finaliza. Faptul este că Spark, încercând să optimizeze sarcina, pur și simplu a despachetat o bucată TSV și a pus-o în propria bucată de parchet. Și pentru că fiecare bucată era suficient de mare pentru a conține înregistrările întregi ale multor persoane, fiecare fișier conținea toate SNP-urile, așa că Spark a trebuit să deschidă toate fișierele pentru a extrage informațiile de care avea nevoie.

Interesant este că tipul de compresie implicit (și recomandat) al Parquet, snappy, nu poate fi împărțit. Prin urmare, fiecare executant a fost blocat cu sarcina de a despacheta și descărca setul de date complet de 3,5 GB.

Analizarea a 25 TB folosind AWK și R

Să înțelegem problema

Ce am invatat: Sortarea este dificilă, mai ales dacă datele sunt distribuite.

Mi s-a părut că acum am înțeles esența problemei. Am avut nevoie doar să sortez datele după coloana SNP, nu după oameni. Apoi mai multe SNP-uri vor fi stocate într-o bucată de date separată, iar apoi funcția „inteligentă” a lui Parquet „se deschide numai dacă valoarea este în interval” se va arăta în toată gloria. Din păcate, sortarea miliardelor de rânduri împrăștiate într-un cluster s-a dovedit a fi o sarcină dificilă.

AWS cu siguranță nu dorește să emită o rambursare din cauza motivului „Sunt un student distras”. După ce am rulat sortarea pe Amazon Glue, a funcționat timp de 2 zile și s-a prăbușit.

Ce zici de partiţionare?

Ce am invatat: Partițiile din Spark trebuie să fie echilibrate.

Apoi mi-a venit ideea de a împărți datele în cromozomi. Există 23 dintre ele (și mai multe dacă luați în considerare ADN-ul mitocondrial și regiunile necartografiate).
Acest lucru vă va permite să împărțiți datele în bucăți mai mici. Dacă adăugați o singură linie la funcția de export Spark în scriptul Glue partition_by = "chr", apoi datele ar trebui împărțite în găleți.

Analizarea a 25 TB folosind AWK și R
Genomul este format din numeroase fragmente numite cromozomi.

Din păcate, nu a funcționat. Cromozomii au dimensiuni diferite, ceea ce înseamnă cantități diferite de informații. Aceasta înseamnă că sarcinile pe care Spark le-a trimis lucrătorilor nu au fost echilibrate și finalizate lent, deoarece unele noduri au terminat mai devreme și au fost inactive. Cu toate acestea, sarcinile au fost finalizate. Dar când a cerut un SNP, dezechilibrul a creat din nou probleme. Costul procesării SNP-urilor pe cromozomi mai mari (adică de unde vrem să obținem date) a scăzut doar cu aproximativ 10 factor. Mult, dar nu suficient.

Dacă îl împărțim în părți și mai mici?

Ce am invatat: Nu încercați niciodată să faceți 2,5 milioane de partiții.

Am decis să fac totul și să împărțim fiecare SNP. Acest lucru a asigurat că partițiile erau de dimensiuni egale. A fost o idee proasta. Am folosit Glue și am adăugat o linie inocentă partition_by = 'snp'. Sarcina a început și a început să se execute. O zi mai târziu, am verificat și am văzut că încă nu era nimic scris pe S3, așa că am oprit sarcina. Se pare că Glue scria fișiere intermediare într-o locație ascunsă în S3, o mulțime de fișiere, poate câteva milioane. Drept urmare, greșeala mea a costat mai mult de o mie de dolari și nu l-a mulțumit mentorului meu.

Partiționare + sortare

Ce am invatat: Sortarea este încă dificilă, la fel ca și reglarea Spark.

Ultima mea încercare de partiție a implicat să împărțim cromozomii și apoi să sortez fiecare partiție. În teorie, acest lucru ar accelera fiecare interogare, deoarece datele SNP dorite trebuiau să fie în câteva bucăți Parquet într-un interval dat. Din păcate, sortarea chiar și a datelor partiționate s-a dovedit a fi o sarcină dificilă. Ca rezultat, am trecut la EMR pentru un cluster personalizat și am folosit opt ​​instanțe puternice (C5.4xl) și Sparklyr pentru a crea un flux de lucru mai flexibil...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

... cu toate acestea, sarcina încă nu a fost finalizată. L-am configurat în diferite moduri: am crescut alocarea de memorie pentru fiecare executant de interogare, am folosit noduri cu o cantitate mare de memorie, am folosit variabile de difuzare (variabile de difuzare), dar de fiecare dată acestea s-au dovedit a fi jumătate de măsură și, treptat, executanții au început să eșueze până când totul s-a oprit.

Devin din ce în ce mai creativ

Ce am invatat: Uneori, datele speciale necesită soluții speciale.

Fiecare SNP are o valoare de poziție. Acesta este un număr corespunzător numărului de baze de-a lungul cromozomului său. Acesta este un mod frumos și natural de a ne organiza datele. La început am vrut să împărțim pe regiuni ale fiecărui cromozom. De exemplu, pozițiile 1 - 2000, 2001 - 4000 etc. Dar problema este că SNP-urile nu sunt distribuite uniform între cromozomi, astfel încât mărimile grupurilor vor varia foarte mult.

Analizarea a 25 TB folosind AWK și R

Ca urmare, am ajuns la o împărțire a pozițiilor pe categorii (rang). Folosind datele deja descărcate, am lansat o solicitare pentru a obține o listă de SNP-uri unice, pozițiile și cromozomii acestora. Apoi am sortat datele din fiecare cromozom și am colectat SNP-uri în grupuri (bină) de o anumită dimensiune. Să spunem 1000 de SNP fiecare. Acest lucru mi-a dat relația SNP-la-grup-pe-cromozom.

La final, am făcut grupuri (bin) de 75 de SNP, motivul va fi explicat mai jos.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Mai întâi încearcă cu Spark

Ce am invatat: agregarea Spark este rapidă, dar partiționarea este încă costisitoare.

Am vrut să citesc acest mic cadru de date (2,5 milioane de rânduri) în Spark, să-l combin cu datele brute și apoi să îl parționez după coloana nou adăugată bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

obisnuiam sdf_broadcast(), așa că Spark știe că ar trebui să trimită cadrul de date către toate nodurile. Acest lucru este util dacă datele sunt de dimensiuni mici și sunt necesare pentru toate sarcinile. În caz contrar, Spark încearcă să fie inteligent și distribuie datele după cum este necesar, ceea ce poate provoca încetiniri.

Și iarăși, ideea mea nu a funcționat: sarcinile au funcționat de ceva timp, au finalizat unirea și apoi, ca și executorii lansati prin partiționare, au început să eșueze.

Adăugarea AWK

Ce am invatat: Nu dormi când ești învățat elementele de bază. Cu siguranță cineva a rezolvat deja problema ta în anii 1980.

Până în acest moment, motivul pentru toate eșecurile mele cu Spark a fost amestecul de date din cluster. Poate că situația poate fi îmbunătățită cu un pre-tratament. Am decis să încerc să împart datele text brute în coloane de cromozomi, așa că am sperat să ofer Spark date „pre-partiționate”.

Am căutat pe StackOverflow cum să împart după valorile coloanei și am găsit un răspuns atât de grozav. Cu AWK puteți împărți un fișier text după valorile coloanei scriindu-l într-un script, mai degrabă decât trimițând rezultatele către stdout.

Am scris un script Bash pentru a-l încerca. Am descărcat unul dintre TSV-urile ambalate, apoi l-am despachetat folosind gzip și trimis la awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

A mers!

Umplerea miezurilor

Ce am invatat: gnu parallel - este un lucru magic, toată lumea ar trebui să-l folosească.

Despărțirea a fost destul de lentă și când am început htoppentru a verifica utilizarea unei instanțe EC2 puternice (și costisitoare), s-a dovedit că foloseam doar un nucleu și aproximativ 200 MB de memorie. Pentru a rezolva problema și a nu pierde mulți bani, a trebuit să ne dăm seama cum să paralelizăm munca. Din fericire, într-o carte absolut uimitoare Știința datelor la linia de comandă Am găsit un capitol al lui Jeron Janssens despre paralelizare. Din el am aflat despre gnu parallel, o metodă foarte flexibilă pentru implementarea multithreading-ului în Unix.

Analizarea a 25 TB folosind AWK și R
Când am început partiționarea folosind noul proces, totul a fost în regulă, dar a existat încă un blocaj - descărcarea obiectelor S3 pe disc nu a fost foarte rapidă și nu a fost complet paralelizată. Pentru a remedia asta, am făcut asta:

  1. Am aflat că este posibil să implementez etapa de descărcare S3 direct în pipeline, eliminând complet stocarea intermediară pe disc. Acest lucru înseamnă că pot evita să scriu date brute pe disc și să folosesc spațiu de stocare și mai mic și, prin urmare, mai ieftin pe AWS.
  2. echipă aws configure set default.s3.max_concurrent_requests 50 a crescut foarte mult numărul de fire pe care le utilizează AWS CLI (în mod implicit sunt 10).
  3. Am trecut la o instanță EC2 optimizată pentru viteza rețelei, cu litera n în nume. Am descoperit că pierderea puterii de procesare atunci când se utilizează n-instanțe este mai mult decât compensată de creșterea vitezei de încărcare. Pentru majoritatea sarcinilor am folosit c5n.4xl.
  4. Schimbat gzip pe pigz, acesta este un instrument gzip care poate face lucruri interesante pentru a paraleliza sarcina inițial neparalelizată de decomprimare a fișierelor (acest lucru a ajutat cel mai puțin).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Acești pași sunt combinați unul cu altul pentru ca totul să funcționeze foarte repede. Prin creșterea vitezei de descărcare și prin eliminarea scrierilor pe disc, acum am putut procesa un pachet de 5 terabyți în doar câteva ore.

Acest tweet ar fi trebuit să menționeze „TSV”. Vai.

Folosind date nou analizate

Ce am invatat: Lui Spark îi plac datele necomprimate și nu-i place să combine partițiile.

Acum datele erau în S3 într-un format despachetat (a se citi: partajat) și semi-ordonat și puteam reveni din nou la Spark. Mă aștepta o surpriză: din nou nu am reușit să realizez ceea ce mi-am dorit! A fost foarte dificil să-i spui lui Spark exact cum erau partiționate datele. Și chiar și când am făcut asta, sa dovedit că erau prea multe partiții (95 de mii) și când am folosit coalesce redus numărul lor la limite rezonabile, acest lucru mi-a distrus împărțirea. Sunt sigur că se poate rezolva, dar după câteva zile de căutare nu am găsit o soluție. În cele din urmă, am terminat toate sarcinile din Spark, deși a durat ceva timp și fișierele mele Parchet împărțite nu erau foarte mici (~200 KB). Cu toate acestea, datele erau acolo unde era nevoie.

Analizarea a 25 TB folosind AWK și R
Prea mic și neuniform, minunat!

Testarea interogărilor locale Spark

Ce am invatat: Spark are prea multă sarcină atunci când rezolvă probleme simple.

Descărcând datele într-un format inteligent, am putut testa viteza. Configurați un script R pentru a rula un server Spark local și apoi încărcați un cadru de date Spark din spațiul de stocare al grupului Parquet specificat (bină). Am încercat să încarc toate datele, dar nu am reușit să-l fac pe Sparklyr să recunoască partiționarea.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Execuția a durat 29,415 secunde. Mult mai bine, dar nu prea bun pentru testarea în masă a ceva. În plus, nu am putut accelera lucrurile cu memorarea în cache, deoarece atunci când am încercat să memorez un cadru de date în memorie, Spark se prăbușește întotdeauna, chiar și atunci când am alocat mai mult de 50 GB de memorie unui set de date care cântărea mai puțin de 15.

Reveniți la AWK

Ce am invatat: Matricele asociative în AWK sunt foarte eficiente.

Mi-am dat seama că pot atinge viteze mai mari. Mi-am amintit asta într-un mod minunat Tutorial AWK de Bruce Barnett Am citit despre o caracteristică grozavă numită „tablouri asociative" În esență, acestea sunt perechi cheie-valoare, care din anumite motive au fost numite diferit în AWK și, prin urmare, nu m-am gândit prea mult la ele. Roman Cheplyaka a reamintit că termenul „matrice asociative” este mult mai vechi decât termenul „pereche cheie-valoare”. Chiar daca tu căutați cheia-valoare în Google Ngram, nu veți vedea acest termen acolo, dar veți găsi matrice asociative! În plus, „perechea cheie-valoare” este cel mai adesea asociată cu bazele de date, așa că este mult mai logic să o comparăm cu o hartă hash. Mi-am dat seama că aș putea folosi aceste matrice asociative pentru a-mi asocia SNP-urile cu un tabel bin și date brute fără a folosi Spark.

Pentru a face acest lucru, în scriptul AWK am folosit blocul BEGIN. Aceasta este o bucată de cod care este executată înainte ca prima linie de date să fie transmisă în corpul principal al scriptului.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Echipă while(getline...) a încărcat toate rândurile din grupul CSV (bin), setați prima coloană (numele SNP) ca cheie pentru tabloul asociativ bin iar a doua valoare (grup) ca valoare. Apoi în bloc { }, care se execută pe toate liniile fișierului principal, fiecare linie este trimisă la fișierul de ieșire, care primește un nume unic în funcție de grupul său (bin): ..._bin_"bin[$1]"_....

variabile batch_num и chunk_id a potrivit datele furnizate de conductă, evitând o condiție de cursă și fiecare fir de execuție rulând parallel, a scris în propriul fișier unic.

Deoarece am împrăștiat toate datele brute în foldere de pe cromozomi rămase din experimentul meu anterior cu AWK, acum aș putea scrie un alt script Bash pentru a procesa câte un cromozom și a trimite date mai profunde partiționate la S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Scenariul are două secțiuni parallel.

În prima secțiune, datele sunt citite din toate fișierele care conțin informații despre cromozomul dorit, apoi aceste date sunt distribuite pe fire, care distribuie fișierele în grupurile corespunzătoare (bin). Pentru a evita condițiile de cursă atunci când mai multe fire de execuție scriu în același fișier, AWK transmite numele fișierelor pentru a scrie date în locuri diferite, de ex. chr_10_bin_52_batch_2_aa.csv. Ca rezultat, multe fișiere mici sunt create pe disc (pentru aceasta am folosit volume EBS de terabyte).

Transportor din a doua secțiune parallel trece prin grupuri (bin) și combină fișierele lor individuale în CSV comun c catși apoi le trimite la export.

Emite în R?

Ce am invatat: Tu poti contacta stdin и stdout dintr-un script R și, prin urmare, folosiți-l în pipeline.

S-ar putea să fi observat această linie în scriptul Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Traduce toate fișierele de grup concatenate (bin) în scriptul R de mai jos. {} este o tehnică specială parallel, care inserează toate datele pe care le trimite către fluxul specificat direct în comanda însăși. Opțiune {#} oferă un ID unic de fir și {%} reprezintă numărul locului de muncă (repetat, dar niciodată simultan). O listă cu toate opțiunile poate fi găsită în documentație.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Când o variabilă file("stdin") transmis la readr::read_csv, datele traduse în scriptul R sunt încărcate într-un cadru, care este apoi în formă .rds-fișier folosind aws.s3 scris direct pe S3.

RDS este ceva asemănător cu o versiune junior a parchetului, fără bibelourile depozitării difuzoarelor.

După ce am terminat scriptul Bash, am primit un pachet .rds-fișiere situate în S3, ceea ce mi-a permis să folosesc compresie eficientă și tipuri încorporate.

În ciuda utilizării frânei R, totul a funcționat foarte repede. Nu este surprinzător că părțile lui R care citesc și scriu date sunt foarte optimizate. După testarea pe un cromozom de dimensiuni medii, lucrarea a fost finalizată pe o instanță C5n.4xl în aproximativ două ore.

S3 Limitări

Ce am invatat: Datorită implementării căii inteligente, S3 poate gestiona multe fișiere.

Eram îngrijorat dacă S3 va fi capabil să gestioneze numeroasele fișiere care au fost transferate în el. Aș putea face ca numele fișierelor să aibă sens, dar cum le-ar căuta S3?

Analizarea a 25 TB folosind AWK și R
Folderele din S3 sunt doar pentru afișare, de fapt sistemul nu este interesat de simbol /. Din pagina Întrebări frecvente S3.

Se pare că S3 reprezintă calea către un anumit fișier ca o cheie simplă într-un fel de tabel hash sau bază de date bazată pe documente. O găleată poate fi considerată ca un tabel, iar fișierele pot fi considerate înregistrări în acel tabel.

Deoarece viteza și eficiența sunt importante pentru a obține un profit la Amazon, nu este surprinzător faptul că acest sistem de cheie ca cale de fișier este extrem de optimizat. Am încercat să găsesc un echilibru: astfel încât să nu fi fost nevoit să fac o mulțime de cereri get, dar să fie executate rapid cererile. S-a dovedit că cel mai bine este să faci aproximativ 20 de mii de fișiere bin. Cred că dacă continuăm să optimizăm, putem obține o creștere a vitezei (de exemplu, realizarea unui bucket special doar pentru date, reducând astfel dimensiunea tabelului de căutare). Dar nu a existat timp sau bani pentru experimente ulterioare.

Dar compatibilitatea încrucișată?

Ce am învățat: cauza numărul unu a pierderii de timp este optimizarea prematură a metodei de stocare.

În acest moment, este foarte important să vă întrebați: „De ce să folosiți un format de fișier proprietar?” Motivul constă în viteza de încărcare (încărcarea fișierelor CSV gzipped a durat de 7 ori mai mult) și compatibilitatea cu fluxurile noastre de lucru. S-ar putea să mă gândesc dacă R poate încărca cu ușurință fișierele Parquet (sau Arrow) fără încărcarea Spark. Toată lumea din laboratorul nostru folosește R și, dacă trebuie să convertesc datele într-un alt format, mai am datele text originale, așa că pot rula din nou pipeline.

Diviziunea muncii

Ce am invatat: Nu încercați să optimizați manual lucrările, lăsați computerul să o facă.

Am depanat fluxul de lucru pe un cromozom, acum trebuie să procesez toate celelalte date.
Am vrut să ridic mai multe instanțe EC2 pentru conversie, dar, în același timp, îmi era teamă să nu obțin o încărcare foarte dezechilibrată în diferite joburi de procesare (la fel cum Spark suferea din cauza partițiilor dezechilibrate). În plus, nu m-a interesat să ridic câte o instanță pe cromozom, deoarece pentru conturile AWS există o limită implicită de 10 instanțe.

Apoi am decis să scriu un script în R pentru a optimiza joburile de procesare.

Mai întâi, i-am cerut lui S3 să calculeze cât spațiu de stocare a ocupat fiecare cromozom.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Apoi am scris o funcție care ia dimensiunea totală, amestecă ordinea cromozomilor, îi împarte în grupuri num_jobs și vă spune cât de diferite sunt dimensiunile tuturor lucrărilor de procesare.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Apoi am trecut prin o mie de amestecări folosind torc și am ales cel mai bun.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Așa că am ajuns cu un set de sarcini care erau foarte asemănătoare ca dimensiune. Apoi tot ce a mai rămas a fost să închei scriptul meu Bash anterior într-o buclă mare for. Scrierea acestei optimizari a durat aproximativ 10 minute. Și aceasta este mult mai puțin decât aș cheltui pentru crearea manuală a sarcinilor dacă ar fi dezechilibrate. Prin urmare, cred că am avut dreptate cu această optimizare preliminară.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

La final adaug comanda de oprire:

sudo shutdown -h now

...si totul a iesit! Folosind AWS CLI, am generat instanțe folosind opțiunea user_data le-a dat scripturi Bash ale sarcinilor lor pentru procesare. Au rulat și s-au oprit automat, așa că nu plăteam pentru putere suplimentară de procesare.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Hai să împachetăm!

Ce am invatat: API-ul ar trebui să fie simplu de dragul ușurinței și flexibilității utilizării.

În cele din urmă, am primit datele în locul și forma potrivite. Tot ce a rămas a fost să simplific procesul de utilizare a datelor cât mai mult posibil pentru a le ușura colegilor mei. Am vrut să fac un API simplu pentru crearea solicitărilor. Daca in viitor ma decid sa trec de la .rds la fișierele Parquet, atunci aceasta ar trebui să fie o problemă pentru mine, nu pentru colegii mei. Pentru asta am decis să fac un pachet R intern.

Construiți și documentați un pachet foarte simplu care conține doar câteva funcții de acces la date organizate în jurul unei funcții get_snp. Am facut si un site pentru colegii mei pkgdown, astfel încât să poată vedea cu ușurință exemple și documentație.

Analizarea a 25 TB folosind AWK și R

Memorare inteligentă în cache

Ce am invatat: Dacă datele dumneavoastră sunt bine pregătite, stocarea în cache va fi ușoară!

Deoarece unul dintre fluxurile de lucru principale a aplicat același model de analiză pachetului SNP, am decis să folosesc binning-ul în avantajul meu. La transmiterea datelor prin SNP, toate informațiile din grup (bin) sunt atașate obiectului returnat. Adică, interogările vechi pot (teoretic) să accelereze procesarea interogărilor noi.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Când am construit pachetul, am rulat multe benchmark-uri pentru a compara viteza când am folosit diferite metode. Recomand să nu neglijăm acest lucru, pentru că uneori rezultatele sunt neașteptate. De exemplu, dplyr::filter a fost mult mai rapidă decât capturarea rândurilor folosind filtrarea bazată pe indexare, iar preluarea unei singure coloane dintr-un cadru de date filtrat a fost mult mai rapidă decât utilizarea sintaxei de indexare.

Vă rugăm să rețineți că obiectul prev_snp_results conţine cheia snps_in_bin. Aceasta este o serie de toate SNP-urile unice într-un grup (bină), permițându-vă să verificați rapid dacă aveți deja date dintr-o interogare anterioară. De asemenea, face ușoară trecerea în buclă prin toate SNP-urile dintr-un grup (bină) cu acest cod:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Constatări

Acum putem (și am început să rulăm serios) modele și scenarii care înainte ne erau inaccesibile. Cel mai bun lucru este că colegii mei de laborator nu trebuie să se gândească la nicio complicație. Au doar o funcție care funcționează.

Și, deși pachetul le scutește de detalii, am încercat să simplific formatul de date, încât să-și dea seama dacă aș dispărea brusc mâine...

Viteza a crescut considerabil. De obicei scanăm fragmente de genom semnificative din punct de vedere funcțional. Anterior, nu puteam face acest lucru (s-a dovedit a fi prea scump), dar acum, datorită structurii grupului (bin) și caching-ului, o solicitare pentru un SNP durează în medie mai puțin de 0,1 secunde, iar utilizarea datelor este atât de mare. scăzut că costurile pentru S3 sunt arahide.

Concluzie

Acest articol nu este deloc un ghid. Soluția s-a dovedit a fi individuală și aproape sigur nu optimă. Mai degrabă, este un jurnal de călătorie. Vreau ca alții să înțeleagă că astfel de decizii nu apar pe deplin formate în cap, sunt rezultatul încercării și erorii. De asemenea, dacă sunteți în căutarea unui cercetător de date, rețineți că utilizarea eficientă a acestor instrumente necesită experiență, iar experiența costă bani. Mă bucur că am avut mijloacele să plătesc, dar mulți alții care pot face aceeași treabă mai bine decât mine nu vor avea niciodată ocazia din lipsă de bani să încerce.

Instrumentele de date mari sunt versatile. Dacă aveți timp, aproape sigur că puteți scrie o soluție mai rapidă folosind tehnici inteligente de curățare, stocare și extracție a datelor. În cele din urmă, se reduce la o analiză cost-beneficiu.

Ce am învățat:

  • nu există o modalitate ieftină de a analiza 25 TB la un moment dat;
  • Aveți grijă la dimensiunea fișierelor dvs. Parquet și la organizarea acestora;
  • Partițiile din Spark trebuie să fie echilibrate;
  • În general, nu încercați niciodată să faceți 2,5 milioane de partiții;
  • Sortarea este încă dificilă, la fel ca și configurarea Spark;
  • uneori datele speciale necesită soluții speciale;
  • Agregarea Spark este rapidă, dar partiționarea este încă costisitoare;
  • nu dormi când te învață elementele de bază, probabil că cineva ți-a rezolvat deja problema în anii 1980;
  • gnu parallel - acesta este un lucru magic, toată lumea ar trebui să-l folosească;
  • Lui Spark îi plac datele necomprimate și nu-i place combinarea partițiilor;
  • Spark are prea multă sarcină atunci când rezolvă probleme simple;
  • Matricele asociative ale AWK sunt foarte eficiente;
  • tu poti contacta stdin и stdout dintr-un script R și, prin urmare, folosiți-l în conductă;
  • Datorită implementării căii inteligente, S3 poate procesa multe fișiere;
  • Motivul principal pentru pierderea timpului este optimizarea prematură a metodei de stocare;
  • nu încercați să optimizați manual sarcinile, lăsați computerul să o facă;
  • API-ul ar trebui să fie simplu de dragul ușurinței și flexibilității utilizării;
  • Dacă datele dumneavoastră sunt bine pregătite, stocarea în cache va fi ușoară!

Sursa: www.habr.com

Adauga un comentariu