Raščlanjivanje 25TB pomoću AWK i R

Raščlanjivanje 25TB pomoću AWK i R
Kako čitati ovaj članak: Ispričavam se što je tekst tako dug i kaotičan. Kako bih vam uštedio vrijeme, svako poglavlje počinjem uvodom "Što sam naučio", koji sažima bit poglavlja u jednoj ili dvije rečenice.

"Samo mi pokaži rješenje!" Ako samo želite vidjeti odakle sam došao, prijeđite na poglavlje "Postati inventivniji", ali mislim da je zanimljivije i korisnije čitati o neuspjehu.

Nedavno sam dobio zadatak da postavim proces za obradu velike količine neobrađenih DNK sekvenci (tehnički SNP čip). Trebalo je brzo dobiti podatke o određenoj genetskoj lokaciji (zvanoj SNP) za kasnije modeliranje i druge zadatke. Koristeći R i AWK, uspio sam očistiti i organizirati podatke na prirodan način, uvelike ubrzavajući obradu upita. To mi nije bilo lako i zahtijevalo je brojne iteracije. Ovaj članak će vam pomoći da izbjegnete neke od mojih pogrešaka i pokazati vam što sam završio.

Prvo neka uvodna objašnjenja.

Podaci

Naš sveučilišni centar za obradu genetskih informacija dao nam je podatke u obliku TSV-a od 25 TB. Dobio sam ih podijeljene u 5 Gzip-komprimiranih paketa, od kojih je svaki sadržavao oko 240 datoteka od četiri gigabajta. Svaki red je sadržavao podatke za jedan SNP od jedne osobe. Ukupno su preneseni podaci o ~2,5 milijuna SNP-ova i ~60 tisuća ljudi. Osim informacija o SNP-u, datoteke su sadržavale brojne stupce s brojevima koji su odražavali različite karakteristike, poput intenziteta čitanja, učestalosti različitih alela itd. Ukupno je bilo oko 30 stupaca s jedinstvenim vrijednostima.

Cilj

Kao i kod svakog projekta upravljanja podacima, najvažnije je bilo odrediti kako će se podaci koristiti. U ovom slučaju uglavnom ćemo odabrati modele i tijekove rada za SNP temeljene na SNP-u. Odnosno, trebat će nam podaci samo o jednom SNP-u. Morao sam naučiti kako dohvatiti sve zapise povezane s jednim od 2,5 milijuna SNP-ova što je moguće lakše, brže i jeftinije.

Kako to ne učiniti

Da citiram prikladan klišej:

Nisam pogriješio tisuću puta, samo sam otkrio tisuću načina da izbjegnem raščlanjivanje hrpe podataka u formatu pogodnom za upite.

Prvi pokušaj

Što sam naučio: Ne postoji jeftin način za analizu 25 TB odjednom.

Nakon što sam pohađao tečaj “Napredne metode za obradu velikih podataka” na Sveučilištu Vanderbilt, bio sam siguran da je trik u torbi. Vjerojatno će trebati sat ili dva da se Hive poslužitelj postavi za prolazak kroz sve podatke i izvješće o rezultatu. Budući da su naši podaci pohranjeni u AWS S3, koristio sam uslugu Atina, koji vam omogućuje primjenu Hive SQL upita na S3 podatke. Ne morate postaviti/podići Hive klaster, a također plaćate samo podatke koje tražite.

Nakon što sam Atheni pokazao svoje podatke i njihov format, pokrenuo sam neke testove s ovakvim upitima:

select * from intensityData limit 10;

I brzo dobio dobro strukturirane rezultate. Spreman.

Dok nismo pokušali koristiti podatke u našem radu...

Zamoljen sam da izvučem sve podatke o SNP-u kako bih testirao model. Pokrenuo sam upit:


select * from intensityData 
where snp = 'rs123456';

...i počeo čekati. Nakon osam minuta i više od 4 TB traženih podataka, dobio sam rezultat. Athena naplaćuje prema količini pronađenih podataka, 5 dolara po terabajtu. Tako je ovaj zahtjev koštao 20 dolara i osam minuta čekanja. Da bismo pokrenuli model na svim podacima, morali smo čekati 38 godina i platiti 50 milijuna dolara. Očito nam to nije odgovaralo.

Bilo je potrebno koristiti parket...

Što sam naučio: Budite oprezni s veličinom svojih turpija za parket i njihovom organizacijom.

Prvo sam pokušao popraviti situaciju pretvaranjem svih TSV-ova u Turpije za parket. Pogodni su za rad s velikim skupovima podataka jer su informacije u njima pohranjene u obliku stupaca: svaki stupac leži u vlastitom segmentu memorije/diska, za razliku od tekstualnih datoteka u kojima redovi sadrže elemente svakog stupca. A ako trebate nešto pronaći, samo pročitajte traženi stupac. Osim toga, svaka datoteka pohranjuje raspon vrijednosti u stupac, pa ako vrijednost koju tražite nije u rasponu stupca, Spark neće gubiti vrijeme skenirajući cijelu datoteku.

Izvršio sam jednostavan zadatak AWS ljepilo pretvoriti naše TSV-ove u Parquet i ispustiti nove datoteke u Athenu. Trajalo je oko 5 sati. Ali kad sam pokrenuo zahtjev, trebalo je otprilike isto toliko vremena i nešto manje novca da se izvrši. Činjenica je da je Spark, pokušavajući optimizirati zadatak, jednostavno raspakirao jedan TSV chunk i stavio ga u vlastiti Parquet chunk. A budući da je svaki dio bio dovoljno velik da sadrži cijele zapise mnogih ljudi, svaka je datoteka sadržavala sve SNP-ove, pa je Spark morao otvoriti sve datoteke kako bi izvukao potrebne informacije.

Zanimljivo, Parquet-ov zadani (i preporučeni) tip kompresije, snappy, nije djeljiv. Stoga je svaki izvršitelj zapeo na zadatku raspakiranja i preuzimanja cijelog skupa podataka od 3,5 GB.

Raščlanjivanje 25TB pomoću AWK i R

Shvatimo problem

Što sam naučio: Razvrstavanje je teško, pogotovo ako su podaci distribuirani.

Činilo mi se da sam sada shvatio bit problema. Trebao sam sortirati podatke samo prema SNP stupcu, a ne prema osobama. Zatim će nekoliko SNP-ova biti pohranjeno u zasebnom bloku podataka, a zatim će se Parquetova "pametna" funkcija "otvoriti samo ako je vrijednost u rasponu" pokazati u punom sjaju. Nažalost, razvrstavanje kroz milijarde redaka razasutih po klasteru pokazalo se teškim zadatkom.

AWS definitivno ne želi izdati povrat novca zbog razloga "Ja sam rastreseni student". Nakon što sam pokrenuo sortiranje na Amazon Glue, radio je 2 dana i srušio se.

Što je s particioniranjem?

Što sam naučio: Particije u Sparku moraju biti uravnotežene.

Tada sam došao na ideju o particioniranju podataka u kromosomima. Ima ih 23 (i još nekoliko ako uzmete u obzir mitohondrijsku DNK i nemapirane regije).
To će vam omogućiti da podijelite podatke u manje dijelove. Ako dodate samo jedan redak funkciji izvoza Spark u Glue skripti partition_by = "chr", tada podatke treba podijeliti u segmente.

Raščlanjivanje 25TB pomoću AWK i R
Genom se sastoji od brojnih fragmenata koji se nazivaju kromosomi.

Nažalost, nije uspjelo. Kromosomi imaju različite veličine, što znači različite količine informacija. To znači da zadaci koje je Spark slao radnicima nisu bili uravnoteženi i sporo dovršavani jer su neki čvorovi završili ranije i bili u stanju mirovanja. Međutim, zadaće su izvršene. Ali kada se tražio jedan SNP, neravnoteža je ponovno uzrokovala probleme. Trošak obrade SNP-ova na većim kromosomima (to jest, gdje želimo dobiti podatke) smanjio se samo za faktor 10. Puno, ali nedovoljno.

Što ako ga podijelimo na još manje dijelove?

Što sam naučio: Nikad ne pokušavajte napraviti 2,5 milijuna particija.

Odlučio sam dati sve od sebe i podijelio sam svaki SNP. Time je osigurano da su pregrade jednake veličine. BILA JE TO LOŠA IDEJA. Koristio sam ljepilo i dodao nevinu liniju partition_by = 'snp'. Zadatak je krenuo i počeo se izvršavati. Dan kasnije provjerio sam i vidio da još uvijek ništa nije upisano na S3, pa sam ukinuo zadatak. Čini se da je Glue zapisivao posredne datoteke na skrivenu lokaciju u S3, puno datoteka, možda nekoliko milijuna. Kao rezultat toga, moja je pogreška koštala više od tisuću dolara i nije zadovoljila mog mentora.

Particioniranje + sortiranje

Što sam naučio: Razvrstavanje je još uvijek teško, kao i ugađanje Sparka.

Moj posljednji pokušaj particioniranja uključivao je razdvajanje kromosoma i zatim sortiranje svake particije. U teoriji, ovo bi ubrzalo svaki upit jer su željeni SNP podaci morali biti unutar nekoliko Parquet dijelova unutar zadanog raspona. Nažalost, razvrstavanje čak i particioniranih podataka pokazalo se teškim zadatkom. Kao rezultat toga, prebacio sam se na EMR za prilagođeni klaster i upotrijebio osam moćnih instanci (C5.4xl) i Sparklyr za stvaranje fleksibilnijeg tijeka rada...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...međutim, zadatak još uvijek nije izvršen. Konfigurirao sam ga na različite načine: povećao dodjelu memorije za svakog izvršitelja upita, koristio čvorove s velikom količinom memorije, koristio varijable emitiranja (broadcasting varijable), ali svaki put se pokazalo da su to bile polumjere i postupno su izvršitelji počeli propadati dok sve ne stane.

Postajem kreativniji

Što sam naučio: Ponekad posebni podaci zahtijevaju posebna rješenja.

Svaki SNP ima vrijednost pozicije. To je broj koji odgovara broju baza duž njegovog kromosoma. Ovo je lijep i prirodan način organiziranja naših podataka. Isprva sam želio podijeliti po regijama svakog kromosoma. Na primjer, pozicije 1 - 2000, 2001 - 4000 itd. Ali problem je u tome što SNP-ovi nisu ravnomjerno raspoređeni po kromosomima, pa će stoga veličine skupina uvelike varirati.

Raščlanjivanje 25TB pomoću AWK i R

Kao rezultat, došao sam do raščlambe pozicija u kategorije (rank). Koristeći već preuzete podatke, pokrenuo sam zahtjev za dobivanje popisa jedinstvenih SNP-ova, njihovih položaja i kromosoma. Zatim sam razvrstao podatke unutar svakog kromosoma i prikupio SNP-ove u skupine (bin) zadane veličine. Recimo po 1000 SNP-ova. Ovo mi je dalo odnos SNP-skupina-po-kromosomu.

Na kraju sam napravio grupe (bin) od 75 SNP-ova, razlog će biti objašnjen u nastavku.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Prvo pokušajte sa Sparkom

Što sam naučio: Spark agregacija je brza, ali je particioniranje još uvijek skupo.

Htio sam pročitati ovaj mali (2,5 milijuna redaka) podatkovni okvir u Spark, kombinirati ga s neobrađenim podacima, a zatim ga podijeliti prema novododanom stupcu bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Koristio sam sdf_broadcast(), tako da Spark zna da bi trebao poslati okvir podataka svim čvorovima. Ovo je korisno ako su podaci male veličine i potrebni za sve zadatke. Inače, Spark pokušava biti pametan i distribuira podatke prema potrebi, što može uzrokovati usporavanje.

I opet, moja ideja nije uspjela: zadaci su neko vrijeme radili, dovršavali uniju, a onda su, poput izvršitelja pokrenutih particioniranjem, počeli propadati.

Dodavanje AWK

Što sam naučio: Ne spavaj kad te uče osnove. Sigurno je netko već riješio vaš problem davnih 1980-ih.

Do ove točke, razlog svih mojih neuspjeha sa Sparkom bila je zbrka podataka u klasteru. Možda se situacija može popraviti prethodnim tretmanom. Odlučio sam pokušati podijeliti neobrađene tekstualne podatke u stupce kromosoma, pa sam se nadao da ću Sparku pružiti "unaprijed particionirane" podatke.

Tražio sam na StackOverflowu kako podijeliti prema vrijednostima stupaca i pronašao tako sjajan odgovor. S AWK-om možete podijeliti tekstualnu datoteku prema vrijednostima stupca tako da je zapišete u skriptu umjesto da rezultate šaljete stdout.

Napisao sam Bash skriptu da je isprobam. Preuzeo sam jedan od zapakiranih TSV-ova, a zatim ga raspakirao koristeći gzip i poslao u awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Upalilo je!

Punjenje jezgri

Što sam naučio: gnu parallel - to je čarobna stvar, svatko bi je trebao koristiti.

Razdvajanje je bilo dosta sporo i kad sam počela htopda provjerim korištenje moćne (i skupe) EC2 instance pokazalo se da koristim samo jednu jezgru i oko 200 MB memorije. Kako bismo riješili problem i ne izgubili puno novca, morali smo smisliti kako paralelizirati posao. Srećom, u apsolutno nevjerojatnoj knjizi Znanost o podacima u naredbenom retku Pronašao sam poglavlje Jerona Janssensa o paralelizaciji. Iz njega sam saznao za gnu parallel, vrlo fleksibilna metoda za implementaciju višenitnosti u Unixu.

Raščlanjivanje 25TB pomoću AWK i R
Kad sam pokrenuo particioniranje koristeći novi proces, sve je bilo u redu, ali je i dalje postojalo usko grlo - preuzimanje S3 objekata na disk nije bilo jako brzo i nije bilo potpuno paralelizirano. Da bih to popravio, napravio sam sljedeće:

  1. Saznao sam da je moguće implementirati stupanj preuzimanja S3 izravno u cjevovodu, potpuno eliminirajući međupohranu na disku. To znači da mogu izbjeći pisanje neobrađenih podataka na disk i koristiti čak i manju, a time i jeftiniju pohranu na AWS-u.
  2. tim aws configure set default.s3.max_concurrent_requests 50 uvelike povećao broj niti koje AWS CLI koristi (prema zadanim postavkama ima ih 10).
  3. Prešao sam na EC2 instancu optimiziranu za brzinu mreže, sa slovom n u nazivu. Otkrio sam da je gubitak procesorske snage pri korištenju n-instanci više nego nadoknađen povećanjem brzine učitavanja. Za većinu zadataka koristio sam c5n.4xl.
  4. Promijenjeno gzip na pigz, ovo je gzip alat koji može činiti super stvari za paraleliziranje inicijalno neparaleliziranog zadatka dekomprimiranja datoteka (ovo je najmanje pomoglo).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Ovi se koraci međusobno kombiniraju kako bi sve funkcioniralo vrlo brzo. Povećanjem brzine preuzimanja i eliminacijom pisanja na disk, sada mogu obraditi paket od 5 terabajta u samo nekoliko sati.

Ovaj tweet je trebao spominjati 'TSV'. Jao.

Korištenje novoraščlanjenih podataka

Što sam naučio: Spark voli nekomprimirane podatke i ne voli kombiniranje particija.

Sada su podaci bili u S3 u neraspakiranom (čitaj: podijeljenom) i polusređenom formatu, i mogao sam se ponovno vratiti u Spark. Čekalo me iznenađenje: opet nisam uspio postići ono što sam želio! Bilo je vrlo teško Sparku točno reći kako su podaci podijeljeni. A i kad sam ovo napravio, pokazalo se da ima previše particija (95 tisuća), a kad sam koristio coalesce smanjio njihov broj na razumne granice, to je uništilo moju particiju. Siguran sam da se to može popraviti, ali nakon nekoliko dana traženja nisam uspio pronaći rješenje. Na kraju sam završio sve zadatke u Sparku, iako je trebalo neko vrijeme i moje podijeljene Parquet datoteke nisu bile baš male (~200 KB). Međutim, podataka je bilo tamo gdje je trebalo.

Raščlanjivanje 25TB pomoću AWK i R
Premalo i neravnomjerno, divno!

Testiranje lokalnih Spark upita

Što sam naučio: Spark ima previše troškova kada rješava jednostavne probleme.

Preuzimanjem podataka u pametnom formatu, mogao sam testirati brzinu. Postavite R skriptu za pokretanje lokalnog Spark poslužitelja, a zatim učitajte Spark podatkovni okvir iz navedene grupe za pohranu Parquet (bin). Pokušao sam učitati sve podatke, ali nisam uspio natjerati Sparklyr da prepozna particioniranje.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Izvođenje je trajalo 29,415 sekundi. Puno bolje, ali ne previše dobro za masovno testiranje bilo čega. Osim toga, nisam mogao ubrzati stvari s predmemoriranjem jer kad sam pokušao predmemorirati podatkovni okvir u memoriji, Spark se uvijek srušio, čak i kad sam dodijelio više od 50 GB memorije skupu podataka koji je težio manje od 15.

Povratak na AWK

Što sam naučio: Asocijativni nizovi u AWK-u vrlo su učinkoviti.

Shvatio sam da mogu postići veće brzine. Sjetio sam se toga u divnom AWK vodič Brucea Barnetta Čitao sam o super značajci koja se zove "asocijativni nizovi" U suštini, radi se o parovima ključ-vrijednost, koji su se iz nekog razloga u AWK-u drugačije zvali, pa nekako nisam puno razmišljao o njima. Roman Čepljaka podsjetio da je pojam "asocijativni nizovi" puno stariji od pojma "par ključ-vrijednost". Čak i ako ti potražite ključ-vrijednost u Google Ngramu, tamo nećete vidjeti ovaj izraz, ali ćete pronaći asocijativne nizove! Osim toga, “par ključ-vrijednost” najčešće se povezuje s bazama podataka, pa ga ima mnogo smisla usporediti s hashmapom. Shvatio sam da mogu upotrijebiti te asocijativne nizove za povezivanje svojih SNP-ova s ​​bin tablicom i neobrađenim podacima bez korištenja Spark-a.

Da bih to učinio, u AWK skripti koristio sam blok BEGIN. Ovo je dio koda koji se izvršava prije nego što se prvi redak podataka proslijedi glavnom tijelu skripte.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Momčad while(getline...) učitao sve retke iz CSV grupe (bin), postavio prvi stupac (SNP naziv) kao ključ za asocijativni niz bin a druga vrijednost (grupa) kao vrijednost. Zatim u bloku { }, koji se izvršava na svim linijama glavne datoteke, svaka linija se šalje u izlaznu datoteku, koja dobiva jedinstveno ime ovisno o svojoj grupi (bin): ..._bin_"bin[$1]"_....

varijable batch_num и chunk_id odgovarao podacima koje pruža cjevovod, izbjegavajući stanje utrke i pokretanje svake izvršne niti parallel, napisao je u vlastitu jedinstvenu datoteku.

Budući da sam raspršio sve neobrađene podatke u mape na kromosomima preostalim od mog prethodnog eksperimenta s AWK-om, sada bih mogao napisati drugu Bash skriptu za obradu jednog po jednog kromosoma i slanje dublje particioniranih podataka na S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Scenarij ima dva dijela parallel.

U prvom odjeljku podaci se čitaju iz svih datoteka koje sadrže podatke o željenom kromosomu, zatim se ti podaci distribuiraju po nitima koje raspoređuju datoteke u odgovarajuće grupe (bin). Kako bi se izbjegli uvjeti utrke kada više niti piše u istu datoteku, AWK prosljeđuje nazive datoteka za pisanje podataka na različita mjesta, npr. chr_10_bin_52_batch_2_aa.csv. Kao rezultat toga, na disku se stvara mnogo malih datoteka (za ovo sam koristio terabajtne EBS jedinice).

Transporter iz druge sekcije parallel prolazi kroz grupe (bin) i kombinira njihove pojedinačne datoteke u zajednički CSV c cata zatim ih šalje u izvoz.

Emitiranje u R?

Što sam naučio: Možete kontaktirati stdin и stdout iz R skripte i stoga ga koristite u cjevovodu.

Možda ste primijetili ovaj redak u svojoj Bash skripti: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Prevodi sve spojene grupne datoteke (bin) u R skriptu u nastavku. {} je posebna tehnika parallel, koji umeće sve podatke koje šalje navedenom toku izravno u samu naredbu. Opcija {#} pruža jedinstveni ID niti, i {%} predstavlja broj utora za posao (ponovljeno, ali nikad istovremeno). Popis svih opcija možete pronaći u dokumentacija.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kada varijabla file("stdin") prenosi na readr::read_csv, podaci prevedeni u R skriptu učitavaju se u okvir, koji je zatim u obliku .rds- koristeći datoteku aws.s3 napisano izravno na S3.

RDS je nešto poput mlađe verzije Parquet-a, bez dodataka skladištenja zvučnika.

Nakon završetka Bash skripte dobio sam paket .rds-datoteke koje se nalaze u S3, što mi je omogućilo korištenje učinkovite kompresije i ugrađenih vrsta.

Unatoč korištenju kočnice R, sve je radilo vrlo brzo. Nije iznenađujuće da su dijelovi R-a koji čitaju i pišu podatke visoko optimizirani. Nakon testiranja na jednom kromosomu srednje veličine, posao je završen na instanci C5n.4xl za otprilike dva sata.

S3 Ograničenja

Što sam naučio: Zahvaljujući pametnoj implementaciji putanje, S3 može rukovati mnogim datotekama.

Brinuo sam se hoće li S3 moći obraditi mnoge datoteke koje su mu prenesene. Mogao bih učiniti da imena datoteka imaju smisla, ali kako bi ih S3 tražio?

Raščlanjivanje 25TB pomoću AWK i R
Mape u S3 samo su za pokazivanje, zapravo sustav ne zanima simbol /. Sa stranice S3 FAQ.

Čini se da S3 predstavlja put do određene datoteke kao jednostavan ključ u nekoj vrsti hash tablice ili baze podataka temeljene na dokumentu. Spremnik se može smatrati tablicom, a datoteke se mogu smatrati zapisima u toj tablici.

Budući da su brzina i učinkovitost važni za ostvarivanje profita u Amazonu, ne čudi da je ovaj sustav ključa kao putanje do datoteke jebeno optimiziran. Pokušao sam pronaći balans: da ne moram postavljati puno get zahtjeva, već da se zahtjevi brzo izvršavaju. Pokazalo se da je najbolje napraviti oko 20 tisuća bin datoteka. Mislim da ako nastavimo s optimizacijom, možemo postići povećanje brzine (na primjer, napraviti posebnu kantu samo za podatke, čime ćemo smanjiti veličinu tablice za pretraživanje). Ali nije bilo vremena ni novca za daljnje eksperimente.

Što je s unakrsnom kompatibilnošću?

Što sam naučio: Uzalud izgubljeno vrijeme broj jedan je prerano optimiziranje metode pohrane.

U ovom trenutku vrlo je važno zapitati se: "Zašto koristiti vlasnički format datoteke?" Razlog leži u brzini učitavanja (gzipanim CSV datotekama trebalo je 7 puta dulje za učitavanje) i kompatibilnosti s našim tijekovima rada. Možda ću ponovno razmotriti može li R lako učitati datoteke Parquet (ili Arrow) bez učitavanja Spark. Svi u našem laboratoriju koriste R, a ako trebam pretvoriti podatke u drugi format, još uvijek imam izvorne tekstualne podatke, tako da mogu ponovno pokrenuti cjevovod.

Podjela posla

Što sam naučio: Ne pokušavajte ručno optimizirati poslove, pustite da to učini računalo.

Otklonio sam pogreške u tijeku rada na jednom kromosomu, sada moram obraditi sve ostale podatke.
Htio sam podignuti nekoliko EC2 instanci za pretvorbu, ali u isto sam se vrijeme bojao dobiti vrlo neuravnoteženo opterećenje u različitim poslovima obrade (baš kao što je Spark patio od neuravnoteženih particija). Osim toga, nisam bio zainteresiran za podizanje jedne instance po kromosomu, jer za AWS račune postoji zadano ograničenje od 10 instanci.

Zatim sam odlučio napisati skriptu u R-u kako bih optimizirao poslove obrade.

Prvo sam zamolio S3 da izračuna koliko prostora za pohranu zauzima svaki kromosom.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Zatim sam napisao funkciju koja uzima ukupnu veličinu, miješa redoslijed kromosoma, dijeli ih u grupe num_jobs i govori koliko su različite veličine svih poslova obrade.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Zatim sam pregledao tisuću namještanja koristeći purrr i izabrao najbolje.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Tako sam završio sa skupom zadataka koji su bili vrlo slične veličine. Onda je sve što je preostalo bilo zamotati moju prethodnu Bash skriptu u veliku petlju for. Za pisanje ove optimizacije bilo je potrebno oko 10 minuta. A to je mnogo manje nego što bih potrošio na ručno kreiranje zadataka da su neuravnoteženi. Stoga mislim da sam bio u pravu s ovom preliminarnom optimizacijom.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Na kraju dodajem naredbu za isključivanje:

sudo shutdown -h now

... i sve je uspjelo! Koristeći AWS CLI, pokrenuo sam instance pomoću opcije user_data dao im Bash skripte njihovih zadataka na obradu. Automatski su radili i gasili se, tako da nisam plaćao dodatnu procesorsku snagu.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Pakirajmo se!

Što sam naučio: API bi trebao biti jednostavan radi jednostavnosti i fleksibilnosti korištenja.

Konačno sam dobio podatke na pravom mjestu iu pravom obliku. Ostalo je još samo maksimalno pojednostaviti proces korištenja podataka kako bi olakšao svojim kolegama. Htio sam napraviti jednostavan API za kreiranje zahtjeva. Ako u budućnosti odlučim prijeći s .rds na turpije za parket, onda bi to meni trebao biti problem, a ne mojim kolegama. Za ovo sam odlučio napraviti interni R paket.

Izradite i dokumentirajte vrlo jednostavan paket koji sadrži samo nekoliko funkcija pristupa podacima organiziranih oko funkcije get_snp. Napravio sam i web stranicu za svoje kolege pkgdown, tako da mogu lako vidjeti primjere i dokumentaciju.

Raščlanjivanje 25TB pomoću AWK i R

Pametno predmemoriranje

Što sam naučio: Ako su vaši podaci dobro pripremljeni, predmemoriranje će biti jednostavno!

Budući da je jedan od glavnih tokova rada primijenio isti model analize na SNP paket, odlučio sam upotrijebiti grupiranje u svoju korist. Prilikom prijenosa podataka putem SNP-a, sve informacije iz grupe (bin) pridružene su vraćenom objektu. To jest, stari upiti mogu (u teoriji) ubrzati obradu novih upita.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Prilikom izrade paketa, pokrenuo sam mnogo mjerila kako bih usporedio brzinu pri korištenju različitih metoda. Preporučam da to ne zanemarite, jer ponekad su rezultati neočekivani. Na primjer, dplyr::filter bilo je puno brže od hvatanja redaka pomoću filtriranja temeljenog na indeksiranju, a dohvaćanje jednog stupca iz okvira filtriranih podataka bilo je puno brže od korištenja sintakse indeksiranja.

Napominjemo da objekt prev_snp_results sadrži ključ snps_in_bin. Ovo je niz svih jedinstvenih SNP-ova u grupi (bin), koji vam omogućuje da brzo provjerite imate li već podatke iz prethodnog upita. Također olakšava prolazak kroz sve SNP-ove u grupi (bin) s ovim kodom:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Nalazi

Sada možemo (i počeli smo ozbiljno) pokretati modele i scenarije koji su nam prije bili nedostupni. Najbolje je što moji kolege iz laboratorija ne moraju razmišljati ni o kakvim komplikacijama. Oni samo imaju funkciju koja radi.

I premda ih paket pošteđuje detalja, pokušao sam načiniti format podataka dovoljno jednostavnim da mogu shvatiti ako sutra iznenada nestanem...

Brzina se osjetno povećala. Obično skeniramo funkcionalno značajne fragmente genoma. Ranije to nismo mogli (ispalo je preskupo), ali sada, zahvaljujući grupi (bin) strukturi i predmemoriranju, zahtjev za jedan SNP u prosjeku traje manje od 0,1 sekunde, a potrošnja podataka je toliko nisko da su troškovi za S3 mali.

Zaključak

Ovaj članak uopće nije vodič. Rješenje se pokazalo individualnim, a gotovo sigurno ne i optimalnim. Dapače, to je putopis. Želim da drugi shvate da se takve odluke ne čine potpuno oblikovane u glavi, one su rezultat pokušaja i pogrešaka. Također, ako tražite podatkovnog znanstvenika, imajte na umu da učinkovito korištenje ovih alata zahtijeva iskustvo, a iskustvo košta. Sretan sam što sam imao sredstava za platiti, ali mnogi drugi koji mogu raditi isti posao bolje od mene nikada neće imati priliku zbog nedostatka novca ni pokušati.

Big data alati su svestrani. Ako imate vremena, gotovo sigurno možete napisati brže rješenje koristeći pametne tehnike čišćenja, pohrane i ekstrakcije podataka. U konačnici se svodi na analizu troškova i koristi.

Što sam naučio:

  • ne postoji jeftin način za analizu 25 TB odjednom;
  • Budite oprezni s veličinom svojih turpija za parket i njihovom organizacijom;
  • Particije u Sparku moraju biti uravnotežene;
  • Općenito, nikada ne pokušavajte napraviti 2,5 milijuna particija;
  • Razvrstavanje je još uvijek teško, kao i postavljanje Spark-a;
  • ponekad posebni podaci zahtijevaju posebna rješenja;
  • Spark agregacija je brza, ali je particioniranje još uvijek skupo;
  • ne spavaj kad te uče osnovama, vjerojatno ti je netko već riješio problem davnih osamdesetih;
  • gnu parallel - ovo je čarobna stvar, svatko bi je trebao koristiti;
  • Spark voli nekomprimirane podatke i ne voli kombiniranje particija;
  • Spark ima previše režija pri rješavanju jednostavnih problema;
  • AWK-ovi asocijativni nizovi vrlo su učinkoviti;
  • možete kontaktirati stdin и stdout iz R skripte, i stoga ga koristite u cjevovodu;
  • Zahvaljujući implementaciji pametnog puta, S3 može obraditi mnogo datoteka;
  • Glavni razlog za gubljenje vremena je prerano optimiziranje vaše metode pohrane;
  • ne pokušavajte ručno optimizirati zadatke, pustite da to učini računalo;
  • API bi trebao biti jednostavan radi lakoće i fleksibilnosti korištenja;
  • Ako su vaši podaci dobro pripremljeni, predmemoriranje će biti jednostavno!

Izvor: www.habr.com

Kupite pouzdan hosting za stranice s DDoS zaštitom, VPS VDS poslužiteljima 🔥 Kupite pouzdan web hosting sa DDoS zaštitom, VPS VDS servere | ProHoster