Parsing 25TB koristeći AWK i R

Parsing 25TB koristeći AWK i R
Kako čitati ovaj članak: Izvinjavam se što je tekst tako dugačak i haotičan. Kako bih vam uštedio vrijeme, svako poglavlje počinjem uvodom „Ono što sam naučio“, koji suštinu poglavlja sažima u jednoj ili dvije rečenice.

“Samo mi pokaži rješenje!” Ako samo želite da vidite odakle sam došao, onda preskočite na poglavlje „Postati inventivniji“, ali mislim da je zanimljivije i korisnije čitati o neuspjehu.

Nedavno sam dobio zadatak da postavim proces za obradu velike količine sirovih DNK sekvenci (tehnički SNP čip). Potreba je bila da se brzo dobiju podaci o datoj genetskoj lokaciji (nazvanoj SNP) za naknadno modeliranje i druge zadatke. Koristeći R i AWK, uspio sam očistiti i organizirati podatke na prirodan način, uvelike ubrzavši obradu upita. Ovo mi nije bilo lako i zahtijevalo je brojne iteracije. Ovaj članak će vam pomoći da izbjegnete neke od mojih grešaka i pokazati vam s čime sam završio.

Prvo, neka uvodna objašnjenja.

podaci

Naš univerzitetski centar za genetičku obradu informacija pružio nam je podatke u obliku TSV-a od 25 TB. Dobio sam ih podijeljene u 5 paketa, komprimiranih Gzip-om, od kojih je svaki sadržavao oko 240 datoteka od četiri gigabajta. Svaki red je sadržavao podatke za jedan SNP od jedne osobe. Ukupno su preneseni podaci o ~2,5 miliona SNP-a i ~60 hiljada ljudi. Pored informacija o SNP-u, datoteke su sadržavale brojne kolone s brojevima koji odražavaju različite karakteristike, kao što su intenzitet čitanja, učestalost različitih alela, itd. Ukupno je bilo oko 30 kolona sa jedinstvenim vrijednostima.

Cilj

Kao i kod svakog projekta upravljanja podacima, najvažnije je bilo odrediti kako će se podaci koristiti. U ovom slučaju uglavnom ćemo birati modele i tokove rada za SNP zasnovane na SNP-u. Odnosno, biće nam potrebni podaci samo o jednom SNP-u. Morao sam naučiti kako da dođem do svih zapisa povezanih s jednim od 2,5 miliona SNP-ova što je lakše, brže i jeftinije.

Kako ovo ne raditi

Da citiram odgovarajući kliše:

Nisam uspio hiljadu puta, samo sam otkrio hiljadu načina da izbjegnem raščlanjivanje gomile podataka u formatu pogodnom za upite.

Prvi pokušaj

Šta sam naučio: Ne postoji jeftin način da se analizira 25 TB odjednom.

Nakon što sam pohađao kurs “Napredne metode za obradu velikih podataka” na Univerzitetu Vanderbilt, bio sam siguran da je trik u torbi. Vjerovatno će trebati sat ili dva da se Hive server postavi da prođe kroz sve podatke i prijavi rezultat. Pošto su naši podaci pohranjeni u AWS S3, koristio sam uslugu Atina, koji vam omogućava da primijenite Hive SQL upite na S3 podatke. Ne morate da postavljate/podižete klaster Hive, a takođe plaćate samo za podatke koje tražite.

Nakon što sam Ateni pokazao svoje podatke i njihov format, izvršio sam neke testove sa upitima poput ovog:

select * from intensityData limit 10;

I brzo dobio dobro strukturirane rezultate. Spreman.

Sve dok nismo pokušali da koristimo podatke u svom radu...

Od mene je zatraženo da izvučem sve SNP informacije da testiram model. Pokrenuo sam upit:


select * from intensityData 
where snp = 'rs123456';

...i počeo da čeka. Nakon osam minuta i više od 4 TB traženih podataka, dobio sam rezultat. Athena naplaćuje po količini pronađenih podataka, 5 dolara po terabajtu. Dakle, ovaj pojedinačni zahtjev košta 20 dolara i osam minuta čekanja. Da bismo pokrenuli model na svim podacima, morali smo čekati 38 godina i platiti 50 miliona dolara.Očigledno nam ovo nije odgovaralo.

Bilo je potrebno koristiti parket...

Šta sam naučio: Budite oprezni s veličinom vaših Parket datoteka i njihovom organizacijom.

Prvo sam pokušao popraviti situaciju pretvaranjem svih TSV-ova u Turpije za parket. Pogodni su za rad sa velikim skupovima podataka jer se informacije u njima pohranjuju u kolonskom obliku: svaka kolona leži u svom segmentu memorije/diska, za razliku od tekstualnih datoteka, u kojima redovi sadrže elemente svake kolone. A ako trebate nešto pronaći, onda samo pročitajte traženu kolumnu. Osim toga, svaka datoteka pohranjuje raspon vrijednosti u koloni, tako da ako vrijednost koju tražite nije u rasponu kolone, Spark neće gubiti vrijeme skenirajući cijelu datoteku.

Izvršio sam jednostavan zadatak AWS lepak da konvertujemo naše TSV-ove u parket i bacimo nove fajlove u Athenu. Trajalo je oko 5 sati. Ali kada sam pokrenuo zahtjev, trebalo mi je otprilike isto toliko vremena i malo manje novca da se izvrši. Činjenica je da je Spark, pokušavajući optimizirati zadatak, jednostavno raspakirao jedan TSV komad i stavio ga u vlastiti Parquet komad. I pošto je svaki komad bio dovoljno velik da sadrži čitave zapise mnogih ljudi, svaki fajl je sadržavao sve SNP-ove, tako da je Spark morao otvoriti sve datoteke kako bi izdvojio informacije koje su mu bile potrebne.

Zanimljivo je da se zadani (i preporučeni) tip kompresije Parketa, snappy, ne može podijeliti. Stoga je svaki izvršilac zaglavio na zadatku raspakivanja i preuzimanja kompletnog skupa podataka od 3,5 GB.

Parsing 25TB koristeći AWK i R

Hajde da razumemo problem

Šta sam naučio: Sortiranje je teško, posebno ako su podaci distribuirani.

Činilo mi se da sam sada shvatio suštinu problema. Trebao sam samo sortirati podatke po SNP koloni, a ne po ljudima. Tada će nekoliko SNP-ova biti pohranjeno u zasebnom bloku podataka, a onda će se Parquetova "pametna" funkcija "otvorena samo ako je vrijednost u rasponu" pokazati u svom svom sjaju. Nažalost, sortiranje kroz milijarde redova raštrkanih po klasteru pokazalo se teškim zadatkom.

AWS definitivno ne želi vratiti novac zbog razloga „Ja sam rastrojen student“. Nakon što sam pokrenuo sortiranje na Amazon Glueu, radio je 2 dana i srušio se.

Šta je sa particioniranjem?

Šta sam naučio: Particije u Spark-u moraju biti izbalansirane.

Tada sam došao na ideju da podelim podatke u hromozome. Ima ih 23 (i još nekoliko ako uzmete u obzir mitohondrijsku DNK i nemapirane regije).
Ovo će vam omogućiti da podijelite podatke na manje dijelove. Ako dodate samo jedan red u funkciju izvoza Spark u Glue skriptu partition_by = "chr", onda podatke treba podijeliti u segmente.

Parsing 25TB koristeći AWK i R
Genom se sastoji od brojnih fragmenata koji se nazivaju hromozomi.

Nažalost, nije uspjelo. Kromosomi imaju različite veličine, što znači različite količine informacija. To znači da zadaci koje je Spark slao radnicima nisu bili izbalansirani i dovršeni sporo jer su neki čvorovi završili rano i bili u mirovanju. Međutim, zadaci su obavljeni. Ali kada se tražio jedan SNP, disbalans je opet izazvao probleme. Troškovi obrade SNP-ova na većim hromozomima (odnosno tamo gdje želimo da dobijemo podatke) smanjili su se samo za oko 10 faktora. Mnogo, ali nedovoljno.

Šta ako ga podijelimo na još manje dijelove?

Šta sam naučio: Nikada ne pokušavajte napraviti 2,5 miliona particija.

Odlučio sam dati sve od sebe i podijelio svaki SNP. Time je osigurano da su pregrade jednake veličine. BILA JE LOŠA IDEJA. Koristio sam Glue i dodao nevinu liniju partition_by = 'snp'. Zadatak je započeo i počeo se izvršavati. Dan kasnije sam provjerio i vidio da još uvijek ništa nije napisano na S3, pa sam prekinuo zadatak. Izgleda da je Glue pisao međufajlove na skrivenu lokaciju u S3, puno fajlova, možda nekoliko miliona. Kao rezultat toga, moja greška koštala je više od hiljadu dolara i nije se svidjela mom mentoru.

Particioniranje + sortiranje

Šta sam naučio: Sortiranje je i dalje teško, kao i podešavanje Spark-a.

Moj posljednji pokušaj particioniranja uključivao je particioniranje hromozoma i zatim sortiranje svake particije. U teoriji, ovo bi ubrzalo svaki upit jer su željeni SNP podaci morali biti unutar nekoliko Parquet komada unutar zadanog raspona. Nažalost, pokazalo se da je sortiranje čak i particioniranih podataka težak zadatak. Kao rezultat toga, prešao sam na EMR za prilagođeni klaster i koristio osam moćnih instanci (C5.4xl) i Sparklyr da stvorim fleksibilniji radni tok...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...međutim, zadatak još uvijek nije završen. Konfigurirao sam ga na različite načine: povećao dodjelu memorije za svaki izvršilac upita, koristio čvorove s velikom količinom memorije, koristio varijable emitiranja (broadcasting varijable), ali svaki put se pokazalo da su to polumjere, i postepeno su izvršioci počeli propasti dok sve ne stane.

Postajem kreativniji

Šta sam naučio: Ponekad posebni podaci zahtijevaju posebna rješenja.

Svaki SNP ima vrijednost pozicije. Ovo je broj koji odgovara broju baza duž njegovog hromozoma. Ovo je lijep i prirodan način organiziranja naših podataka. U početku sam htio podijeliti po regijama svakog hromozoma. Na primjer, pozicije 1 - 2000, 2001 - 4000, itd. Ali problem je u tome što SNP-ovi nisu ravnomjerno raspoređeni po hromozomima, pa će stoga veličine grupa uvelike varirati.

Parsing 25TB koristeći AWK i R

Kao rezultat, došao sam do raščlanjivanja pozicija u kategorije (rang). Koristeći već preuzete podatke, pokrenuo sam zahtjev za dobivanje liste jedinstvenih SNP-ova, njihovih pozicija i hromozoma. Zatim sam sortirao podatke unutar svakog hromozoma i sakupio SNP-ove u grupe (bin) određene veličine. Recimo po 1000 SNP-ova. Ovo mi je dalo odnos SNP prema grupi po hromozomu.

Na kraju sam napravio grupe (bin) od 75 SNP-a, razlog će biti objašnjen u nastavku.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Prvi pokušaj sa Sparkom

Šta sam naučio: Spark agregacija je brza, ali je particioniranje i dalje skupo.

Htio sam pročitati ovaj mali (2,5 miliona reda) okvir podataka u Spark, kombinirati ga sa sirovim podacima, a zatim ga podijeliti po novododatoj koloni bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

koristio sam sdf_broadcast(), tako da Spark zna da treba poslati okvir podataka svim čvorovima. Ovo je korisno ako su podaci male veličine i potrebni za sve zadatke. Inače, Spark pokušava biti pametan i distribuira podatke po potrebi, što može uzrokovati usporavanje.

I opet, moja ideja nije uspjela: zadaci su radili neko vrijeme, dovršili uniju, a onda su, kao i izvršioci pokrenuti podjelom, počeli propadati.

Dodavanje AWK

Šta sam naučio: Nemojte spavati kada vas uče osnove. Sigurno je neko već 1980-ih riješio vaš problem.

Do ove tačke, razlog za sve moje neuspjehe sa Spark-om bila je zbrka podataka u klasteru. Možda se situacija može popraviti prethodnim tretmanom. Odlučio sam da pokušam da podelim neobrađene tekstualne podatke u kolone hromozoma, pa sam se nadao da ću Sparku obezbediti „pre-particionirane“ podatke.

Na StackOverflowu sam tražio kako podijeliti po vrijednostima stupaca i našao tako sjajan odgovor. Uz AWK možete podijeliti tekstualnu datoteku po vrijednostima stupaca tako što ćete je napisati u skriptu umjesto da šaljete rezultate na stdout.

Napisao sam Bash skriptu da ga isprobam. Preuzeo jedan od upakovanih TSV-ova, a zatim ga raspakovao koristeći gzip i poslato na awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Uspjelo je!

Punjenje jezgara

Šta sam naučio: gnu parallel - to je magična stvar, svako treba da je koristi.

Razdvajanje je bilo dosta sporo i kad sam krenuo htopda provjerim korištenje moćne (i skupe) EC2 instance, pokazalo se da koristim samo jednu jezgru i oko 200 MB memorije. Da riješimo problem i ne izgubimo puno novca, morali smo smisliti kako da paraleliziramo posao. Srećom, u apsolutno neverovatnoj knjizi Nauka o podacima na komandnoj liniji Našao sam poglavlje Jerona Janssensa o paralelizaciji. Iz toga sam saznao gnu parallel, vrlo fleksibilan metod za implementaciju višenitnog rada u Unixu.

Parsing 25TB koristeći AWK i R
Kada sam započeo particioniranje koristeći novi proces, sve je bilo u redu, ali je i dalje postojalo usko grlo - preuzimanje S3 objekata na disk nije bilo vrlo brzo i nije bilo potpuno paralelno. Da popravim ovo, uradio sam ovo:

  1. Saznao sam da je moguće implementirati fazu preuzimanja S3 direktno u pipelineu, potpuno eliminirajući međuskladište na disku. To znači da mogu izbjeći pisanje neobrađenih podataka na disk i koristiti još manju, a time i jeftiniju pohranu na AWS-u.
  2. tim aws configure set default.s3.max_concurrent_requests 50 znatno povećao broj niti koje AWS CLI koristi (podrazumevano ih je 10).
  3. Prebacio sam se na EC2 instancu optimiziranu za brzinu mreže, sa slovom n u imenu. Otkrio sam da je gubitak procesorske snage pri korištenju n-instanci više nego nadoknađen povećanjem brzine učitavanja. Za većinu zadataka koristio sam c5n.4xl.
  4. Promenjeno gzip na pigz, ovo je gzip alat koji može raditi sjajne stvari za paralelizaciju prvobitno neparaleliziranog zadatka dekompresije datoteka (ovo je najmanje pomoglo).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Ovi koraci su kombinovani jedni s drugima kako bi sve funkcioniralo vrlo brzo. Povećanjem brzine preuzimanja i eliminacijom upisivanja na disk, sada sam mogao obraditi paket od 5 terabajta za samo nekoliko sati.

Ovaj tvit je trebao spomenuti 'TSV'. Avaj.

Korištenje novo raščlanjenih podataka

Šta sam naučio: Spark voli nekomprimirane podatke i ne voli kombiniranje particija.

Sada su podaci bili u S3 u raspakovanom (čitaj: zajedničkom) i polu-uređenom formatu, i mogao sam se ponovo vratiti na Spark. Čekalo me iznenađenje: opet nisam uspeo da postignem ono što sam želeo! Bilo je vrlo teško reći Sparku kako su podaci podijeljeni. Čak i kada sam ovo uradio, pokazalo se da je bilo previše particija (95 hiljada), a kada sam koristio coalesce smanjio njihov broj na razumne granice, ovo je uništilo moju particiju. Siguran sam da se ovo može popraviti, ali nakon par dana traženja nisam mogao pronaći rješenje. Na kraju sam završio sve zadatke u Spark-u, iako je to potrajalo i moji podijeljeni Parket fajlovi nisu bili baš mali (~200 KB). Međutim, podaci su bili tamo gdje su bili potrebni.

Parsing 25TB koristeći AWK i R
Premalo i neujednačeno, divno!

Testiranje lokalnih Spark upita

Šta sam naučio: Spark ima previše troškova prilikom rješavanja jednostavnih problema.

Preuzimanjem podataka u pametnom formatu, mogao sam testirati brzinu. Postavite R skriptu za pokretanje lokalnog Spark servera, a zatim učitajte Spark okvir podataka iz specificirane memorije grupe Parket (kante). Pokušao sam učitati sve podatke, ali nisam mogao natjerati Sparklyr da prepozna particioniranje.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Izvršenje je trajalo 29,415 sekundi. Mnogo bolje, ali ne previše dobro za masovno testiranje bilo čega. Uz to, nisam mogao ubrzati stvari s keširanjem jer kada sam pokušao keširati okvir podataka u memoriju, Spark se uvijek srušio, čak i kada sam dodijelio više od 50 GB memorije skupu podataka koji je težio manje od 15.

Vratite se na AWK

Šta sam naučio: Asocijativni nizovi u AWK su veoma efikasni.

Shvatio sam da mogu postići veće brzine. Zapamtio sam to na divan način AWK tutorial od Brucea Barnetta Čitao sam o odličnoj funkciji koja se zove “asocijativni nizovi" U suštini, to su parovi ključ/vrijednost, koji su se iz nekog razloga u AWK-u drugačije nazivali, pa sam nekako o njima razmišljao. Roman Cheplyaka podsjetio da je termin “asocijativni nizovi” mnogo stariji od pojma “par ključ-vrijednost”. Čak i ako ti potražite ključ/vrijednost u Google Ngramu, nećete vidjeti ovaj termin tamo, ali ćete pronaći asocijativne nizove! Osim toga, „par ključ-vrijednost“ se najčešće povezuje sa bazama podataka, tako da je mnogo smislenije upoređivati ​​ga sa hashmapom. Shvatio sam da mogu koristiti ove asocijativne nizove da povežem svoje SNP-ove sa bin tablicom i sirovim podacima bez korištenja Spark-a.

Da bih to uradio, u AWK skripti sam koristio blok BEGIN. Ovo je dio koda koji se izvršava prije nego što se prvi red podataka prosljeđuje glavnom tijelu skripte.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

tim while(getline...) učitao sve redove iz CSV grupe (bin), postavi prvu kolonu (SNP ime) kao ključ za asocijativni niz bin a druga vrijednost (grupa) kao vrijednost. Onda u bloku { }, koji se izvršava na svim linijama glavne datoteke, svaki red se šalje u izlaznu datoteku, koja dobiva jedinstveno ime ovisno o svojoj grupi (bin): ..._bin_"bin[$1]"_....

Varijable batch_num и chunk_id poklapao se sa podacima koje je obezbedio cevovod, izbegavajući uslov trke, i svaku nit izvršavanja koja je pokrenuta parallel, zapisao u svoju jedinstvenu datoteku.

Pošto sam sve sirove podatke raspršio u fascikle na hromozomima preostale od mog prethodnog eksperimenta sa AWK-om, sada sam mogao da napišem još jednu Bash skriptu da obrađuje jedan po jedan hromozom i šalje dublje particionisane podatke na S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Skripta ima dva dijela parallel.

U prvom odeljku se čitaju podaci iz svih fajlova koji sadrže informacije o željenom hromozomu, a zatim se ti podaci distribuiraju kroz niti, koje distribuiraju datoteke u odgovarajuće grupe (bin). Da bi se izbjegli uvjeti utrke kada više niti upisuje u istu datoteku, AWK prosljeđuje imena datoteka za pisanje podataka na različita mjesta, npr. chr_10_bin_52_batch_2_aa.csv. Kao rezultat toga, na disku se kreira mnogo malih datoteka (za to sam koristio terabajtne EBS volumene).

Transporter iz druge sekcije parallel prolazi kroz grupe (bin) i kombinuje njihove pojedinačne datoteke u zajednički CSV c cata zatim ih šalje u izvoz.

Emitovanje u R?

Šta sam naučio: Možete kontaktirati stdin и stdout iz R skripte, i stoga je koristite u procesu.

Možda ste primijetili ovu liniju u vašoj Bash skripti: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... On prevodi sve povezane grupne datoteke (bin) u R skriptu ispod. {} je posebna tehnika parallel, koji umeće sve podatke koje šalje navedenom toku direktno u samu naredbu. Opcija {#} pruža jedinstveni ID niti, i {%} predstavlja broj slota posla (ponavlja se, ali nikada istovremeno). Spisak svih opcija se može naći u dokumentaciju.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kada je varijabla file("stdin") preneseno na readr::read_csv, podaci prevedeni u R skriptu se učitavaju u okvir, koji se zatim nalazi u obliku .rds-file koristeći aws.s3 napisano direktno u S3.

RDS je nešto poput junior verzije Parketa, bez nepotrebnih prostora za skladištenje zvučnika.

Nakon što sam završio Bash skriptu, dobio sam paket .rds-fajlovi koji se nalaze u S3, što mi je omogućilo da koristim efikasnu kompresiju i ugrađene tipove.

Uprkos upotrebi kočnice R, sve je radilo vrlo brzo. Nije iznenađujuće da su dijelovi R koji čitaju i pišu podatke visoko optimizirani. Nakon testiranja na jednom hromozomu srednje veličine, posao je završen na instanci C5n.4xl za oko dva sata.

S3 Ograničenja

Šta sam naučio: Zahvaljujući implementaciji pametne putanje, S3 može rukovati mnogim datotekama.

Brinuo sam se da li će S3 moći da obradi mnoge fajlove koji su mu prebačeni. Mogao bih učiniti da imena datoteka imaju smisla, ali kako bi ih S3 tražio?

Parsing 25TB koristeći AWK i R
Fascikle u S3 su samo za prikaz, u stvari sistem ne zanima simbol /. Sa stranice S3 FAQ.

Čini se da S3 predstavlja putanju do određene datoteke kao jednostavan ključ u nekoj vrsti hash tablice ili baze podataka zasnovanoj na dokumentu. Bucket se može posmatrati kao tabela, a datoteke se mogu smatrati zapisima u toj tabeli.

Budući da su brzina i efikasnost važni za ostvarivanje profita u Amazonu, ne čudi što je ovaj sistem ključ-kao-fajl-putanja jebeno optimizovan. Pokušao sam da nađem balans: tako da ne moram da pravim mnogo zahteva za dobijanje, već da se zahtevi izvršavaju brzo. Pokazalo se da je najbolje napraviti oko 20 hiljada bin fajlova. Mislim da ako nastavimo s optimizacijom, možemo postići povećanje brzine (na primjer, pravljenje posebne kante samo za podatke, čime se smanjuje veličina tabele pretraživanja). Ali nije bilo vremena ni novca za dalje eksperimente.

Šta je sa unakrsnom kompatibilnošću?

Ono što sam naučio: uzrok broj jedan izgubljenog vremena je prerano optimiziranje načina skladištenja.

U ovom trenutku, veoma je važno da se zapitate: „Zašto koristiti vlasnički format datoteke?“ Razlog leži u brzini učitavanja (gzipiranim CSV datotekama je trebalo 7 puta duže da se učitaju) i kompatibilnosti s našim tokovima rada. Možda ću ponovo razmisliti može li R lako učitati datoteke Parketa (ili Arrow) bez učitavanja Spark. Svi u našoj laboratoriji koriste R, i ako trebam pretvoriti podatke u drugi format, još uvijek imam originalne tekstualne podatke, tako da mogu jednostavno ponovo pokrenuti cevovod.

Podjela posla

Šta sam naučio: Ne pokušavajte ručno optimizirati poslove, pustite računar da to uradi.

Otklonio sam greške u toku rada na jednom hromozomu, sada moram obraditi sve ostale podatke.
Želio sam pokrenuti nekoliko EC2 instanci za konverziju, ali sam se u isto vrijeme bojao da ću dobiti vrlo neuravnoteženo opterećenje na različitim poslovima obrade (baš kao što je Spark patio od neuravnoteženih particija). Pored toga, nisam bio zainteresovan za podizanje jedne instance po hromozomu, jer za AWS naloge postoji podrazumevano ograničenje od 10 instanci.

Tada sam odlučio da napišem skriptu u R-u da optimizujem poslove obrade.

Prvo sam zamolio S3 da izračuna koliko prostora za skladištenje zauzima svaki hromozom.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Zatim sam napisao funkciju koja uzima ukupnu veličinu, miješa redoslijed kromosoma, dijeli ih u grupe num_jobs i govori vam koliko su različite veličine svih poslova obrade.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Onda sam prošao kroz hiljadu mešanja koristeći predenje i izabrao najbolje.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Tako sam završio sa skupom zadataka koji su bili vrlo slični po veličini. Onda je sve što je preostalo bilo da umotam svoju prethodnu Bash skriptu u veliku petlju for. Pisanje ove optimizacije trajalo je oko 10 minuta. A ovo je mnogo manje nego što bih potrošio na ručno kreiranje zadataka da su neuravnoteženi. Stoga mislim da sam bio u pravu sa ovom preliminarnom optimizacijom.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Na kraju dodajem naredbu za isključivanje:

sudo shutdown -h now

... i sve je ispalo! Koristeći AWS CLI, pokrenuo sam instance koristeći opciju user_data dao im je Bash skripte njihovih zadataka za obradu. Pokrenuli su se i automatski isključili, tako da nisam plaćao dodatnu procesorsku snagu.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Spakujmo se!

Šta sam naučio: API bi trebao biti jednostavan radi lakšeg i fleksibilnog korištenja.

Konačno sam dobio podatke na pravom mjestu iu pravom obliku. Ostalo je samo da pojednostavim proces korištenja podataka što je više moguće kako bih olakšao svojim kolegama. Htio sam napraviti jednostavan API za kreiranje zahtjeva. Ako u budućnosti odlučim da se prebacim .rds na parketne turpije, onda bi ovo trebao biti problem za mene, a ne za moje kolege. Za ovo sam odlučio napraviti interni R paket.

Izgradite i dokumentirajte vrlo jednostavan paket koji sadrži samo nekoliko funkcija pristupa podacima organiziranih oko funkcije get_snp. Napravio sam i web stranicu za svoje kolege pkgdown, tako da mogu lako vidjeti primjere i dokumentaciju.

Parsing 25TB koristeći AWK i R

Pametno keširanje

Šta sam naučio: Ako su vaši podaci dobro pripremljeni, keširanje će biti lako!

Pošto je jedan od glavnih tokova posla primenio isti model analize na SNP paket, odlučio sam da koristim binning u svoju korist. Prilikom prijenosa podataka preko SNP-a, sve informacije iz grupe (bin) su pridružene vraćenom objektu. To jest, stari upiti mogu (u teoriji) ubrzati obradu novih upita.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Prilikom izrade paketa, izvršio sam mnogo mjerila kako bih uporedio brzinu kada sam koristio različite metode. Preporučujem da ovo ne zanemarite, jer su ponekad rezultati neočekivani. Na primjer, dplyr::filter bio je mnogo brži od hvatanja redova korištenjem filtriranja zasnovanog na indeksiranju, a dohvaćanje jedne kolone iz filtriranog okvira podataka bilo je mnogo brže nego korištenjem sintakse indeksiranja.

Napominjemo da je objekt prev_snp_results sadrži ključ snps_in_bin. Ovo je niz svih jedinstvenih SNP-ova u grupi (bin), koji vam omogućava da brzo provjerite imate li već podatke iz prethodnog upita. Takođe olakšava petlju kroz sve SNP-ove u grupi (bin) sa ovim kodom:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Rezulʹtaty

Sada možemo (i počeli smo ozbiljno) da pokrećemo modele i scenarije koji su nam ranije bili nedostupni. Najbolje je to što moje kolege iz laboratorije ne moraju razmišljati o bilo kakvim komplikacijama. Oni jednostavno imaju funkciju koja radi.

I iako ih paket poštedi detalja, pokušao sam da format podataka učinim dovoljno jednostavnim da bi ga mogli shvatiti ako sutra iznenada nestanem...

Brzina je primjetno porasla. Obično skeniramo funkcionalno značajne fragmente genoma. Ranije to nismo mogli (ispostavilo se da je preskupo), ali sada, zahvaljujući strukturi grupe (bin) i keširanju, zahtjev za jedan SNP u prosjeku traje manje od 0,1 sekunde, a potrošnja podataka je toliko nisko da su troškovi za S3 kikiriki.

zaključak

Ovaj članak uopće nije vodič. Rješenje se pokazalo individualnim, a gotovo sigurno ne optimalnim. Tačnije, to je putopis. Želim da drugi shvate da takve odluke ne izgledaju potpuno formirane u glavi, već su rezultat pokušaja i grešaka. Takođe, ako tražite naučnika za podatke, imajte na umu da efikasno korišćenje ovih alata zahteva iskustvo, a iskustvo košta. Sretan sam što sam imao sredstva da platim, ali mnogi drugi koji mogu raditi isti posao bolje od mene nikada neće imati priliku zbog nedostatka novca ni pokušati.

Alati za velike podatke su raznovrsni. Ako imate vremena, gotovo sigurno možete napisati brže rješenje koristeći tehnike pametnog čišćenja, skladištenja i ekstrakcije podataka. Na kraju, sve se svodi na analizu troškova i koristi.

Šta sam naučio:

  • ne postoji jeftin način da se analizira 25 TB odjednom;
  • budite oprezni s veličinom vaših Parket fajlova i njihovom organizacijom;
  • Particije u Spark-u moraju biti izbalansirane;
  • Generalno, nikada ne pokušavajte da napravite 2,5 miliona particija;
  • Sortiranje je i dalje teško, kao i postavljanje Spark-a;
  • ponekad posebni podaci zahtijevaju posebna rješenja;
  • Spark agregacija je brza, ali je particioniranje i dalje skupo;
  • ne spavaj kad te uče osnovama, neko ti je vjerovatno već 1980-ih riješio problem;
  • gnu parallel - ovo je magična stvar, svako treba da je koristi;
  • Spark voli nekomprimirane podatke i ne voli kombiniranje particija;
  • Spark ima previše troškova prilikom rješavanja jednostavnih problema;
  • AWK-ovi asocijativni nizovi su veoma efikasni;
  • možete kontaktirati stdin и stdout iz R skripte, i stoga je koristite u procesu;
  • Zahvaljujući implementaciji pametne putanje, S3 može obraditi mnogo datoteka;
  • Glavni razlog gubljenja vremena je prerano optimizovanje vašeg načina skladištenja;
  • ne pokušavajte da optimizujete zadatke ručno, pustite računar da to uradi;
  • API bi trebao biti jednostavan radi lakšeg i fleksibilnog korištenja;
  • Ako su vaši podaci dobro pripremljeni, keširanje će biti lako!

izvor: www.habr.com

Dodajte komentar