Razčlenjevanje 25TB z uporabo AWK in R

Razčlenjevanje 25TB z uporabo AWK in R
Kako brati ta članek: Opravičujem se, ker je besedilo tako dolgo in kaotično. Da vam prihranim čas, vsako poglavje začnem z uvodom »Kaj sem se naučil«, ki v enem ali dveh stavkih povzema bistvo poglavja.

"Samo pokaži mi rešitev!" Če želite samo videti, od kod prihajam, potem preskočite na poglavje »Postati bolj iznajdljiv«, vendar mislim, da je bolj zanimivo in koristno brati o neuspehu.

Pred kratkim so mi zadolžili vzpostavitev postopka za obdelavo velike količine neobdelanih zaporedij DNK (tehnično SNP čip). Potreba je bila po hitrem pridobivanju podatkov o dani genetski lokaciji (imenovani SNP) za kasnejše modeliranje in druge naloge. Z uporabo R in AWK sem lahko očistil in organiziral podatke na naraven način, kar je močno pospešilo obdelavo poizvedb. To zame ni bilo lahko in zahtevalo je številne ponovitve. Ta članek vam bo pomagal preprečiti nekatere moje napake in vam pokazal, kaj sem končal.

Najprej nekaj uvodnih pojasnil.

Podatki

Naš univerzitetni center za obdelavo genetskih informacij nam je posredoval podatke v obliki 25 TB TSV. Prejel sem jih razdeljene v 5 paketov, stisnjenih z Gzip, od katerih je vsak vseboval približno 240 štirigigabajtnih datotek. Vsaka vrstica je vsebovala podatke za en SNP enega posameznika. Skupaj so bili posredovani podatki o ~ 2,5 milijona SNP in ~ 60 tisoč ljudeh. Poleg informacij SNP so datoteke vsebovale številne stolpce s številkami, ki so odražale različne značilnosti, kot so intenzivnost branja, pogostost različnih alelov itd. Skupaj je bilo približno 30 stolpcev z edinstvenimi vrednostmi.

Cilj

Kot pri vsakem projektu upravljanja podatkov je bilo najpomembnejše določiti, kako bodo podatki uporabljeni. V tem primeru večinoma bomo izbrali modele in poteke dela za SNP, ki temeljijo na SNP. To pomeni, da bomo potrebovali samo podatke o enem SNP naenkrat. Moral sem se naučiti čim lažje, hitreje in ceneje pridobiti vse zapise, povezane z enim od 2,5 milijona SNP.

Kako ne narediti tega

Če citiram primeren kliše:

Ni mi spodletelo tisočkrat, le odkril sem tisoč načinov, kako se izogniti razčlenjevanju množice podatkov v formatu, ki je prijazen do poizvedb.

Najprej poskusite

Kaj sem se naučil: ni poceni načina za razčlenjevanje 25 TB naenkrat.

Po tečaju »Napredne metode za obdelavo velikih podatkov« na univerzi Vanderbilt sem bil prepričan, da je trik v vreči. Verjetno bo trajalo uro ali dve, da nastavite strežnik Hive, da pregleda vse podatke in poroča o rezultatu. Ker so naši podatki shranjeni v AWS S3, sem uporabil storitev Athena, ki omogoča uporabo poizvedb Hive SQL za podatke S3. Ni vam treba nastaviti/vzgojiti gruče Hive in plačate samo za podatke, ki jih iščete.

Ko sem Atheni pokazal svoje podatke in njihovo obliko, sem opravil nekaj testov s takšnimi poizvedbami:

select * from intensityData limit 10;

In hitro prejeli dobro strukturirane rezultate. pripravljena

Dokler nismo poskusili uporabiti podatkov pri našem delu ...

Prosili so me, naj izvlečem vse informacije SNP, da bi preizkusil model. Izvedel sem poizvedbo:


select * from intensityData 
where snp = 'rs123456';

... in začel čakati. Po osmih minutah in več kot 4 TB zahtevanih podatkov sem prejel rezultat. Athena zaračuna glede na količino najdenih podatkov, 5 USD na terabajt. Tako je ta ena zahteva stala 20 dolarjev in osem minut čakanja. Da bi model zagnali na vseh podatkih, smo morali čakati 38 let in plačati 50 milijonov dolarjev.Očitno nam to ni ustrezalo.

Treba je bilo uporabiti parket...

Kaj sem se naučil: Bodite previdni pri velikosti svojih pilic za parket in njihovi organizaciji.

Najprej sem poskušal popraviti situacijo s pretvorbo vseh TSV-jev v Pile za parket. Primerne so za delo z velikimi nabori podatkov, ker so informacije v njih shranjene v stolpčni obliki: vsak stolpec leži v svojem segmentu pomnilnika/diska, v nasprotju z besedilnimi datotekami, v katerih vrstice vsebujejo elemente vsakega stolpca. In če morate nekaj najti, potem samo preberite zahtevani stolpec. Poleg tega vsaka datoteka shrani obseg vrednosti v stolpec, tako da če vrednost, ki jo iščete, ni v obsegu stolpca, Spark ne bo izgubljal časa s skeniranjem celotne datoteke.

Opravil sem preprosto nalogo AWS lepilo za pretvorbo naših TSV-jev v Parquet in spustil nove datoteke v Atheno. Trajalo je približno 5 ur. Toda ko sem zagnal zahtevo, je trajalo približno enako časa in malo manj denarja za dokončanje. Dejstvo je, da je Spark, ki je poskušal optimizirati nalogo, preprosto razpakiral en kos TSV in ga dal v svoj kos Parquet. In ker je bil vsak kos dovolj velik, da je vseboval celotne zapise številnih ljudi, je vsaka datoteka vsebovala vse SNP-je, zato je moral Spark odpreti vse datoteke, da je izluščil potrebne informacije.

Zanimivo je, da Parquetov privzeti (in priporočeni) tip stiskanja, snappy, ni razdelljiv. Zato je vsak izvajalec obstal pri nalogi razpakiranja in prenosa celotnega nabora podatkov v velikosti 3,5 GB.

Razčlenjevanje 25TB z uporabo AWK in R

Razumejmo problem

Kaj sem se naučil: Razvrščanje je težko, še posebej, če so podatki porazdeljeni.

Zdelo se mi je, da zdaj razumem bistvo problema. Podatke sem moral samo razvrstiti po stolpcu SNP, ne po ljudeh. Nato bo več SNP-jev shranjenih v ločenem podatkovnem delu in takrat se bo Parquetova "pametna" funkcija "odprto samo, če je vrednost v območju" pokazala v vsem svojem sijaju. Na žalost se je razvrščanje med milijardami vrstic, razpršenih po gruči, izkazalo za težko nalogo.

AWS vsekakor ne želi izdati vračila zaradi razloga "sem raztresen študent". Ko sem zagnal sortiranje na Amazon Glue, je delovalo 2 dni in se zrušilo.

Kaj pa particioniranje?

Kaj sem se naučil: Particije v Sparku morajo biti uravnotežene.

Potem sem prišel na idejo o razdelitvi podatkov v kromosome. Teh je 23 (in več, če upoštevamo mitohondrijsko DNK in nekartirane regije).
To vam bo omogočilo, da podatke razdelite na manjše dele. Če dodate samo eno vrstico izvozni funkciji Spark v skriptu Glue partition_by = "chr", potem je treba podatke razdeliti v vedra.

Razčlenjevanje 25TB z uporabo AWK in R
Genom je sestavljen iz številnih fragmentov, imenovanih kromosomi.

Na žalost ni šlo. Kromosomi so različnih velikosti, kar pomeni različne količine informacij. To pomeni, da naloge, ki jih je Spark poslal delavcem, niso bile uravnotežene in dokončane počasi, ker so se nekatera vozlišča končala predčasno in bila nedejavna. Vendar so bile naloge opravljene. Toda pri zahtevi za en SNP je neravnovesje spet povzročilo težave. Stroški obdelave SNP-jev na večjih kromosomih (to je tam, kjer želimo dobiti podatke) so se zmanjšali le za faktor 10. Veliko, a premalo.

Kaj pa, če ga razdelimo na še manjše dele?

Kaj sem se naučil: Nikoli ne poskušajte narediti 2,5 milijona particij.

Odločil sem se, da bom dal vse od sebe in razdelil vsak SNP. To je zagotovilo, da so particije enake velikosti. BILA JE SLABA IDEJA. Uporabil sem lepilo in dodal nedolžno črto partition_by = 'snp'. Naloga se je začela in začela izvajati. Dan kasneje sem preveril in videl, da na S3 še vedno ni nič zapisanega, zato sem nalogo uničil. Videti je, da je Glue pisal vmesne datoteke na skrito lokacijo v S3, veliko datotek, morda nekaj milijonov. Posledično je moja napaka stala več kot tisoč dolarjev in ni zadovoljila mojega mentorja.

Razdelitev + sortiranje

Kaj sem se naučil: Razvrščanje je še vedno težko, prav tako uglaševanje Spark.

Moj zadnji poskus delitve je vključeval razdelitev kromosomov in nato razvrščanje vsake particije. Teoretično bi to pospešilo vsako poizvedbo, ker so morali biti želeni podatki SNP znotraj nekaj kosov Parquet znotraj danega obsega. Na žalost se je razvrščanje celo particioniranih podatkov izkazalo za težko nalogo. Posledično sem preklopil na EMR za gručo po meri in uporabil osem zmogljivih primerkov (C5.4xl) in Sparklyr, da sem ustvaril bolj prilagodljiv potek dela ...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...vendar naloga še vedno ni bila opravljena. Konfiguriral sem ga na različne načine: povečal sem dodelitev pomnilnika za vsakega izvajalca poizvedbe, uporabil vozlišča z veliko količino pomnilnika, uporabil spremenljivke za oddajanje (spremenljivke za oddajanje), vendar se je vsakič izkazalo, da so to polovični ukrepi in postopoma so izvajalci začeli propadati, dokler se vse ne ustavi.

Postajam bolj ustvarjalna

Kaj sem se naučil: Včasih posebni podatki zahtevajo posebne rešitve.

Vsak SNP ima vrednost položaja. To je število, ki ustreza številu baz vzdolž njegovega kromosoma. To je lep in naraven način organiziranja naših podatkov. Sprva sem želel razdeliti po regijah vsakega kromosoma. Na primer položaji 1 - 2000, 2001 - 4000 itd. Toda težava je v tem, da SNP-ji niso enakomerno porazdeljeni po kromosomih, zato se bodo velikosti skupin zelo razlikovale.

Razčlenjevanje 25TB z uporabo AWK in R

Posledično sem prišel do razčlenitve pozicij v kategorije (rank). Z uporabo že prenesenih podatkov sem zagnal zahtevo za pridobitev seznama edinstvenih SNP-jev, njihovih položajev in kromosomov. Nato sem razvrstil podatke znotraj vsakega kromosoma in zbral SNP-je v skupine (bin) določene velikosti. Recimo po 1000 SNP-jev. To mi je dalo razmerje med SNP in skupino na kromosom.

Na koncu sem naredil skupine (bin) 75 SNP-jev, razlog bo razložen spodaj.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Najprej poskusite s Sparkom

Kaj sem se naučil: Združevanje Spark je hitro, vendar je particioniranje še vedno drago.

Želel sem prebrati ta majhen (2,5 milijona vrstic) podatkovni okvir v Spark, ga združiti z neobdelanimi podatki in nato razdeliti na novo dodane stolpce bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

uporabil sem sdf_broadcast(), tako da Spark ve, da bi moral poslati podatkovni okvir vsem vozliščem. To je uporabno, če so podatki majhni in potrebni za vse naloge. V nasprotnem primeru Spark poskuša biti pameten in distribuira podatke po potrebi, kar lahko povzroči upočasnitev.

In spet moja ideja ni delovala: naloge so delovale nekaj časa, dokončale unijo, nato pa so, tako kot izvajalci, ki jih je sprožila particija, začele odpovedovati.

Dodajanje AWK

Kaj sem se naučil: Ne spi, ko te učijo osnov. Zagotovo je vaš problem že kdo rešil v osemdesetih letih.

Do te točke je bil razlog za vse moje neuspehe s Sparkom zmešnjava podatkov v gruči. Morda se stanje lahko izboljša s predhodnim zdravljenjem. Odločil sem se, da bom neobdelane besedilne podatke poskusil razdeliti na stolpce kromosomov, zato sem upal, da bom Sparku zagotovil »vnaprej particionirane« podatke.

Na StackOverflow sem iskal, kako razdeliti po vrednostih stolpcev in našel tako odličen odgovor. Z AWK lahko besedilno datoteko razdelite po vrednostih stolpcev tako, da jo zapišete v skript, namesto da pošljete rezultate stdout.

Napisal sem skript Bash, da ga preizkusim. Prenesel enega od pakiranih TSV-jev, nato pa ga razpakiral z uporabo gzip in poslano na awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Delovalo je!

Polnjenje jeder

Kaj sem se naučil: gnu parallel - to je čarobna stvar, vsi bi jo morali uporabljati.

Ločevanje je potekalo precej počasi in ko sem začel htopza preverjanje uporabe zmogljivega (in dragega) primerka EC2 se je izkazalo, da uporabljam samo eno jedro in cca 200 MB pomnilnika. Da bi rešili problem in ne izgubili veliko denarja, smo morali ugotoviti, kako vzporediti delo. Na srečo v popolnoma neverjetni knjigi Podatkovna znanost v ukazni vrstici Našel sem poglavje Jerona Janssensa o paralelizaciji. Iz njega sem izvedel za gnu parallel, zelo prilagodljiva metoda za izvajanje večnitnosti v Unixu.

Razčlenjevanje 25TB z uporabo AWK in R
Ko sem začel particioniranje z novim postopkom, je bilo vse v redu, vendar je bilo še vedno ozko grlo - prenos objektov S3 na disk ni bil zelo hiter in ni bil popolnoma vzporeden. Da bi to popravil, sem naredil to:

  1. Ugotovil sem, da je možno implementirati stopnjo prenosa S3 neposredno v cevovodu, s čimer popolnoma odpravimo vmesno shranjevanje na disku. To pomeni, da se lahko izognem pisanju neobdelanih podatkov na disk in uporabim še manjši in zato cenejši prostor za shranjevanje na AWS.
  2. ekipa aws configure set default.s3.max_concurrent_requests 50 močno povečal število niti, ki jih uporablja AWS CLI (privzeto jih je 10).
  3. Preklopil sem na primerek EC2, optimiziran za hitrost omrežja, s črko n v imenu. Ugotovil sem, da se izguba procesorske moči pri uporabi n-primerkov več kot kompenzira s povečanjem hitrosti nalaganja. Za večino nalog sem uporabil c5n.4xl.
  4. Spremenjeno gzip o pigz, to je orodje gzip, ki lahko naredi kul stvari za paralelizacijo prvotno neparalelizirane naloge dekompresije datotek (to je najmanj pomagalo).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Ti koraki so med seboj kombinirani, da vse deluje zelo hitro. S povečanjem hitrosti prenosa in odpravo zapisovanja na disk bi zdaj lahko obdelal 5 terabajtni paket v samo nekaj urah.

Ta tvit bi moral omeniti 'TSV'. žal

Uporaba na novo razčlenjenih podatkov

Kaj sem se naučil: Spark ima rad nestisnjene podatke in ne mara združevanja particij.

Zdaj so bili podatki v S3 v nepakirani (beri: v skupni rabi) in pol urejeni obliki in lahko sem se znova vrnil v Spark. Čakalo me je presenečenje: spet mi ni uspelo doseči želenega! Sparku je bilo zelo težko natančno povedati, kako so bili podatki razdeljeni. In tudi ko sem to naredil, se je izkazalo, da je particij preveč (95 tisoč), in ko sem uporabil coalesce zmanjšal njihovo število na razumne meje, to je uničilo mojo particijo. Prepričan sem, da se to da popraviti, a po nekaj dneh iskanja nisem našel rešitve. Na koncu sem dokončal vse naloge v Sparku, čeprav je trajalo nekaj časa in moje razdeljene datoteke Parquet niso bile zelo majhne (~200 KB). Vendar so bili podatki tam, kjer so bili potrebni.

Razčlenjevanje 25TB z uporabo AWK in R
Premajhna in neenakomerna, čudovito!

Preizkušanje lokalnih poizvedb Spark

Kaj sem se naučil: Spark ima preveč stroškov pri reševanju preprostih problemov.

S prenosom podatkov v pametni obliki sem lahko preizkusil hitrost. Nastavite skript R za zagon lokalnega strežnika Spark in nato naložite podatkovni okvir Spark iz določenega pomnilnika skupine Parquet (smetnjak). Poskušal sem naložiti vse podatke, vendar Sparklyrja nisem mogel prepričati, da bi prepoznal particijo.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Izvedba je trajala 29,415 sekunde. Veliko boljši, vendar ne preveč dober za množično testiranje česar koli. Poleg tega nisem mogel pospešiti stvari s predpomnjenjem, ker ko sem poskušal predpomniti podatkovni okvir v pomnilnik, se je Spark vedno zrušil, tudi ko sem naboru podatkov, ki je tehtal manj kot 50, dodelil več kot 15 GB pomnilnika.

Nazaj na AWK

Kaj sem se naučil: Asociativni nizi v AWK so zelo učinkoviti.

Spoznal sem, da lahko dosežem višje hitrosti. Tega sem se spomnil v čudovitem Vadnica AWK Brucea Barnetta Prebral sem o kul funkciji, imenovani »asociativni nizi" V bistvu so to pari ključ-vrednost, ki so se v AWK iz nekega razloga imenovali drugače, zato o njih nekako nisem veliko razmišljal. Roman Čepljaka spomnil, da je izraz "asociativni nizi" veliko starejši od izraza "par ključ-vrednost". Tudi če ti poiščite ključ-vrednost v Google Ngram, tega izraza tam ne boste videli, našli pa boste asociativne nize! Poleg tega je "par ključ-vrednost" najpogosteje povezan z bazami podatkov, zato ga je veliko bolj smiselno primerjati s hashmapom. Spoznal sem, da bi lahko uporabil te asociativne nize za povezovanje svojih SNP-jev z bin tabelo in neobdelanimi podatki brez uporabe Spark.

Da bi to naredil, sem v skriptu AWK uporabil blok BEGIN. To je del kode, ki se izvede, preden se prva vrstica podatkov posreduje glavnemu delu skripta.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Ekipa while(getline...) naloži vse vrstice iz skupine CSV (bin), nastavite prvi stolpec (ime SNP) kot ključ za asociativno polje bin in drugo vrednost (skupino) kot vrednost. Potem v bloku { }, ki se izvaja v vseh vrsticah glavne datoteke, se vsaka vrstica pošlje v izhodno datoteko, ki prejme edinstveno ime glede na svojo skupino (bin): ..._bin_"bin[$1]"_....

Spremenljivke batch_num и chunk_id se ujemajo s podatki, ki jih zagotavlja cevovod, s čimer se izognejo pogojem tekmovanja in izvajajo vsako nit izvajanja parallel, je zapisal v svojo edinstveno datoteko.

Ker sem vse neobdelane podatke razpršil v mape na kromosomih, ki so ostali od mojega prejšnjega poskusa z AWK, bi zdaj lahko napisal še en skript Bash za obdelavo enega kromosoma naenkrat in poslal globlje particionirane podatke v S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Scenarij ima dva dela parallel.

V prvem razdelku se preberejo podatki iz vseh datotek, ki vsebujejo informacije o želenem kromosomu, nato se ti podatki porazdelijo po nitih, ki porazdelijo datoteke v ustrezne skupine (bin). Da bi se izognili tekmovalnim pogojem, ko več niti piše v isto datoteko, AWK posreduje imena datotek za pisanje podatkov na različna mesta, npr. chr_10_bin_52_batch_2_aa.csv. Posledično se na disku ustvari veliko majhnih datotek (za to sem uporabil terabajtne nosilce EBS).

Transporter iz drugega odseka parallel gre skozi skupine (bin) in združi njihove posamezne datoteke v skupni CSV c catin jih nato pošlje v izvoz.

Oddajanje v R?

Kaj sem se naučil: Lahko kontaktirate stdin и stdout iz skripta R in ga zato uporabite v cevovodu.

Morda ste opazili to vrstico v skriptu Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Prevede vse povezane datoteke skupine (bin) v skript R spodaj. {} je posebna tehnika parallel, ki vse podatke, ki jih pošlje podanemu toku, vstavi neposredno v sam ukaz. Možnost {#} zagotavlja edinstven ID niti in {%} predstavlja številko delovne reže (ponavlja se, vendar nikoli hkrati). Seznam vseh možnosti najdete v dokumentacijo.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Ko spremenljivka file("stdin") prenese na readr::read_csv, se podatki, prevedeni v skript R, naložijo v okvir, ki je nato v obliki .rds- uporaba datoteke aws.s3 napisano neposredno na S3.

RDS je nekaj podobnega mlajši različici Parquet, brez dodatkov za shranjevanje zvočnikov.

Ko sem dokončal skript Bash, sem dobil paket .rds-datoteke, ki se nahajajo v S3, kar mi je omogočilo uporabo učinkovitega stiskanja in vgrajenih vrst.

Kljub uporabi zavore R je vse delovalo zelo hitro. Ni presenetljivo, da so deli R, ki berejo in pišejo podatke, zelo optimizirani. Po testiranju na enem srednje velikem kromosomu je delo na primerku C5n.4xl opravljeno v približno dveh urah.

S3 Omejitve

Kaj sem se naučil: Zahvaljujoč implementaciji pametne poti lahko S3 obravnava veliko datotek.

Skrbelo me je, ali bo S3 zmogel obdelati številne datoteke, ki so bile prenesene vanj. Lahko bi naredil imena datotek smiselna, ampak kako bi jih S3 iskal?

Razčlenjevanje 25TB z uporabo AWK in R
Mape v S3 so samo za predstavo, v bistvu sistem ne zanima simbol /. Na strani s pogostimi vprašanji S3.

Zdi se, da S3 predstavlja pot do določene datoteke kot preprost ključ v nekakšni zgoščeni tabeli ali podatkovni zbirki, ki temelji na dokumentu. Vedro si lahko predstavljamo kot tabelo, datoteke pa lahko obravnavamo kot zapise v tej tabeli.

Ker sta hitrost in učinkovitost pomembni za ustvarjanje dobička pri Amazonu, ni presenetljivo, da je ta sistem ključ-kot-datotečna pot presneto optimiziran. Poskušal sem najti ravnotežje: da mi ni bilo treba narediti veliko get prošenj, ampak da so bile prošnje hitro izvedene. Izkazalo se je, da je najbolje narediti približno 20 tisoč bin datotek. Mislim, da če nadaljujemo z optimizacijo, lahko dosežemo povečanje hitrosti (na primer, da naredimo posebno vedro samo za podatke, s čimer zmanjšamo velikost iskalne tabele). Toda za nadaljnje poskuse ni bilo časa in denarja.

Kaj pa navzkrižna združljivost?

Kaj sem se naučil: Glavni vzrok izgubljenega časa je prezgodnja optimizacija načina shranjevanja.

Na tej točki je zelo pomembno, da se vprašate: "Zakaj uporabljati lastniški format datoteke?" Razlog je v hitrosti nalaganja (gzipirane datoteke CSV so se nalagale 7-krat dlje) in združljivost z našimi poteki dela. Morda bom premislil, ali lahko R zlahka naloži datoteke Parquet (ali Arrow) brez nalaganja Spark. Vsi v našem laboratoriju uporabljajo R in če moram podatke pretvoriti v drugo obliko, imam še vedno izvirne besedilne podatke, tako da lahko znova zaženem cevovod.

Delitev dela

Kaj sem se naučil: Ne poskušajte ročno optimizirati opravil, pustite, da to naredi računalnik.

Odpravil sem napake v poteku dela na enem kromosomu, zdaj pa moram obdelati vse druge podatke.
Želel sem dvigniti več primerkov EC2 za pretvorbo, vendar sem se hkrati bal, da bi dobil zelo neuravnoteženo obremenitev med različnimi opravili obdelave (tako kot je Spark trpel zaradi neuravnoteženih particij). Poleg tega me ni zanimalo dvigovanje enega primerka na kromosom, ker je za račune AWS privzeta omejitev 10 primerkov.

Potem sem se odločil napisati skript v R za optimizacijo opravil obdelave.

Najprej sem prosil S3, da izračuna, koliko prostora za shranjevanje zaseda vsak kromosom.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Nato sem napisal funkcijo, ki vzame skupno velikost, premeša vrstni red kromosomov in jih razdeli v skupine num_jobs in vam pove, kako različne so velikosti vseh opravil obdelave.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Nato sem s pomočjo purrr pregledal tisoč premešanj in izbral najboljše.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Tako sem končal z nizom nalog, ki so bile zelo podobne velikosti. Potem je preostalo samo to, da sem svoj prejšnji skript Bash zavil v veliko zanko for. Pisanje te optimizacije je trajalo približno 10 minut. In to je veliko manj, kot bi porabil za ročno ustvarjanje nalog, če bi bile neuravnotežene. Zato menim, da sem imel prav s to predhodno optimizacijo.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Na koncu dodam ukaz za zaustavitev:

sudo shutdown -h now

... in vse se je izšlo! Z uporabo AWS CLI sem dvignil primerke z možnostjo user_data jim je v obdelavo dal Bash skripte njihovih nalog. Zagnali so se in izklopili samodejno, tako da nisem plačeval dodatne procesorske moči.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Pakirajmo!

Kaj sem se naučil: API mora biti preprost zaradi enostavne in prilagodljive uporabe.

Končno sem dobil podatke na pravem mestu in v pravi obliki. Preostalo je le še, da čim bolj poenostavim proces uporabe podatkov, da ga sodelavcem olajšam. Želel sem narediti preprost API za ustvarjanje zahtev. Če se v prihodnosti odločim za zamenjavo .rds na Pile za parket, potem bi to moral biti problem zame, ne za moje sodelavce. Za to sem se odločil narediti interni paket R.

Zgradite in dokumentirajte zelo preprost paket, ki vsebuje le nekaj funkcij za dostop do podatkov, organiziranih okoli funkcije get_snp. Izdelal sem tudi spletno stran za svoje sodelavce pkgdown, tako da si lahko preprosto ogledajo primere in dokumentacijo.

Razčlenjevanje 25TB z uporabo AWK in R

Pametno predpomnjenje

Kaj sem se naučil: Če so vaši podatki dobro pripravljeni, bo predpomnjenje enostavno!

Ker je eden od glavnih delovnih tokov uporabil isti model analize za paket SNP, sem se odločil uporabiti združevanje v svojo korist. Pri prenosu podatkov prek SNP so vse informacije iz skupine (bin) pripete vrnjenemu objektu. To pomeni, da lahko stare poizvedbe (teoretično) pospešijo obdelavo novih poizvedb.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Pri gradnji paketa sem izvedel veliko primerjalnih testov, da sem primerjal hitrost pri uporabi različnih metod. Priporočam, da tega ne zanemarite, saj so včasih rezultati nepričakovani. na primer dplyr::filter je bil veliko hitrejši kot zajem vrstic s filtriranjem na podlagi indeksiranja, pridobivanje enega samega stolpca iz filtriranega podatkovnega okvira pa je bilo veliko hitrejše kot z uporabo sintakse indeksiranja.

Upoštevajte, da je predmet prev_snp_results vsebuje ključ snps_in_bin. To je niz vseh edinstvenih SNP-jev v skupini (bin), ki vam omogoča, da hitro preverite, ali že imate podatke iz prejšnje poizvedbe. Omogoča tudi enostavno zanko po vseh SNP-jih v skupini (bin) s to kodo:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Ugotovitve

Zdaj lahko (in smo začeli resno) izvajamo modele in scenarije, ki nam prej niso bili dostopni. Najboljše pa je, da kolegom iz laboratorija ni treba razmišljati o zapletih. Imajo le funkcijo, ki deluje.

In čeprav jim paket prihrani podrobnosti, sem poskušal narediti format podatkov dovolj preprost, da bi lahko ugotovili, če bi jutri nenadoma izginil ...

Hitrost se je opazno povečala. Običajno skeniramo funkcionalno pomembne fragmente genoma. Prej tega nismo mogli storiti (izkazalo se je predrago), zdaj pa zaradi skupine (bin) strukture in predpomnjenja zahteva za en SNP v povprečju traja manj kot 0,1 sekunde, poraba podatkov pa je tako nizki, da so stroški za S3 oreški.

Zaključek

Ta članek sploh ni vodnik. Rešitev se je izkazala za individualno in skoraj zagotovo ne optimalno. Prej je potopis. Želim, da drugi razumejo, da se takšne odločitve ne zdijo popolnoma oblikovane v glavi, ampak so rezultat poskusov in napak. Poleg tega, če iščete podatkovnega znanstvenika, ne pozabite, da učinkovita uporaba teh orodij zahteva izkušnje, izkušnje pa stanejo. Srečen sem, da sem imel sredstva za plačilo, a mnogi drugi, ki znajo isto delo opravljati bolje kot jaz, zaradi pomanjkanja denarja ne bodo nikoli imeli priložnosti niti poskusiti.

Orodja za velike podatke so vsestranska. Če imate čas, lahko skoraj zagotovo napišete hitrejšo rešitev z uporabo pametnih tehnik čiščenja, shranjevanja in ekstrakcije podatkov. Navsezadnje pride do analize stroškov in koristi.

Kaj sem se naučil:

  • ni poceni načina za razčlenjevanje 25 TB naenkrat;
  • bodite previdni pri velikosti pilic za parket in njihovi organizaciji;
  • Predelne stene v Sparku morajo biti uravnotežene;
  • Na splošno nikoli ne poskušajte narediti 2,5 milijona particij;
  • Razvrščanje je še vedno težavno, prav tako nastavitev Spark;
  • včasih posebni podatki zahtevajo posebne rešitve;
  • Spark združevanje je hitro, vendar je particioniranje še vedno drago;
  • ne spi, ko te učijo osnov, verjetno je nekdo že v osemdesetih rešil tvoj problem;
  • gnu parallel - to je čarobna stvar, vsi bi jo morali uporabljati;
  • Spark ima rad nestisnjene podatke in ne mara združevanja particij;
  • Spark ima preveč stroškov pri reševanju preprostih problemov;
  • Asociativni nizi AWK so zelo učinkoviti;
  • lahko kontaktirate stdin и stdout iz skripta R in ga zato uporabite v cevovodu;
  • Zahvaljujoč izvedbi pametne poti lahko S3 obdela veliko datotek;
  • Glavni razlog za izgubo časa je prezgodnja optimizacija načina shranjevanja;
  • ne poskušajte ročno optimizirati nalog, pustite, da to naredi računalnik;
  • API mora biti preprost zaradi enostavne in prilagodljive uporabe;
  • Če so vaši podatki dobro pripravljeni, bo predpomnjenje enostavno!

Vir: www.habr.com

Dodaj komentar