Analýza 25 TB pomocou AWK a R

Analýza 25 TB pomocou AWK a R
Ako čítať tento článok: Ospravedlňujem sa, že text je taký dlhý a chaotický. Aby som vám ušetril čas, každú kapitolu začínam úvodom „Čo som sa naučil“, ktorý zhŕňa podstatu kapitoly v jednej alebo dvoch vetách.

"Len mi ukáž riešenie!" Ak chcete len vidieť, odkiaľ som prišiel, preskočte na kapitolu „Staneme sa vynaliezavejšou“, ale myslím si, že zaujímavejšie a užitočnejšie je čítať o neúspechu.

Nedávno som dostal za úlohu nastaviť proces spracovania veľkého objemu surových sekvencií DNA (technicky ide o SNP čip). Potreba bola rýchlo získať údaje o danej genetickej polohe (nazývanej SNP) pre následné modelovanie a ďalšie úlohy. Pomocou R a AWK som mohol čistiť a organizovať dáta prirodzeným spôsobom, čo výrazne urýchlilo spracovanie dotazov. Nebolo to pre mňa ľahké a vyžadovalo si to množstvo opakovaní. Tento článok vám pomôže vyhnúť sa niektorým mojim chybám a ukáže vám, s čím som skončil.

Najprv niekoľko vysvetlení na úvod.

Dáta

Naše univerzitné centrum spracovania genetických informácií nám poskytlo údaje vo forme 25 TB TSV. Dostal som ich rozdelené do 5 balíčkov, komprimovaných Gzipom, z ktorých každý obsahoval asi 240 štvorgigabajtových súborov. Každý riadok obsahoval údaje pre jeden SNP od jedného jedinca. Celkovo boli prenesené údaje o ~2,5 miliónoch SNP a ~60 tisícoch ľudí. Okrem informácií o SNP súbory obsahovali početné stĺpce s číslami odrážajúcimi rôzne charakteristiky, ako je intenzita čítania, frekvencia rôznych alel atď. Celkovo tam bolo asi 30 stĺpcov s jedinečnými hodnotami.

Cieľ

Ako pri každom projekte správy údajov, najdôležitejšou vecou bolo určiť, ako sa budú údaje používať. V tomto prípade budeme väčšinou vyberať modely a pracovné postupy pre SNP založené na SNP. To znamená, že naraz budeme potrebovať údaje len o jednom SNP. Musel som sa naučiť, ako čo najjednoduchšie, najrýchlejšie a najlacnejšie získať všetky záznamy spojené s jedným z 2,5 milióna SNP.

Ako to nerobiť

Aby som citoval vhodné klišé:

Nezlyhal som tisíckrát, len som objavil tisíc spôsobov, ako sa vyhnúť analýze množstva údajov vo formáte, ktorý je vhodný pre dopyty.

Prvý pokus

Čo som sa naučil: Neexistuje lacný spôsob, ako analyzovať 25 TB naraz.

Po absolvovaní kurzu „Pokročilé metódy spracovania veľkých dát“ na univerzite Vanderbilt som si bol istý, že trik je vo vrecku. Nastavenie servera Hive na spustenie všetkých údajov a nahlásenie výsledku bude pravdepodobne trvať hodinu alebo dve. Keďže naše dáta sú uložené v AWS S3, službu som využil Athena, ktorý vám umožňuje aplikovať dotazy Hive SQL na údaje S3. Nemusíte zakladať/zakladať klaster Hive a tiež platíte len za dáta, ktoré hľadáte.

Potom, čo som Athene ukázal svoje údaje a ich formát, spustil som niekoľko testov s dotazmi, ako je tento:

select * from intensityData limit 10;

A rýchlo získali dobre štruktúrované výsledky. Pripravený.

Kým sme sa nepokúsili použiť dáta v našej práci...

Bol som požiadaný, aby som vytiahol všetky informácie o SNP na testovanie modelu. Spustil som dotaz:


select * from intensityData 
where snp = 'rs123456';

...a začal čakať. Po ôsmich minútach a viac ako 4 TB požadovaných dát som dostal výsledok. Athena si účtuje podľa objemu nájdených dát 5 USD za terabajt. Takže táto jediná žiadosť stála 20 dolárov a osem minút čakania. Na spustenie modelu na všetkých údajoch sme museli čakať 38 rokov a zaplatiť 50 miliónov dolárov, čo nám zjavne nevyhovovalo.

Bolo potrebné použiť parkety...

Čo som sa naučil: Dávajte pozor na veľkosť vašich parketových súborov a ich usporiadanie.

Najprv som sa pokúsil napraviť situáciu konverziou všetkých TSV na Pilníky na parkety. Sú vhodné na prácu s veľkými súbormi údajov, pretože informácie v nich sú uložené v stĺpcovom tvare: každý stĺpec leží vo vlastnom segmente pamäte/disku, na rozdiel od textových súborov, v ktorých riadky obsahujú prvky každého stĺpca. A ak potrebujete niečo nájsť, stačí si prečítať požadovaný stĺpec. Každý súbor navyše ukladá rozsah hodnôt v stĺpci, takže ak hodnota, ktorú hľadáte, nie je v rozsahu stĺpca, Spark nebude strácať čas skenovaním celého súboru.

Zvládol som jednoduchú úlohu AWS lepidlo na konverziu našich TSV na parkety a vložili nové súbory do Atheny. Trvalo to asi 5 hodín. Keď som však žiadosť spustil, dokončenie trvalo približne rovnaký čas a o niečo menej peňazí. Faktom je, že Spark, snažiac sa optimalizovať úlohu, jednoducho rozbalil jeden kus TSV a vložil ho do vlastného kusu parkiet. A pretože každý kus bol dostatočne veľký na to, aby obsahoval celé záznamy mnohých ľudí, každý súbor obsahoval všetky SNP, takže Spark musel otvoriť všetky súbory, aby získal potrebné informácie.

Zaujímavé je, že predvolený (a odporúčaný) typ kompresie Parquet, snappy, nie je možné rozdeliť. Každý exekútor sa preto zasekol pri úlohe rozbaliť a stiahnuť celý 3,5 GB dataset.

Analýza 25 TB pomocou AWK a R

Poďme pochopiť problém

Čo som sa naučil: Triedenie je náročné, najmä ak sú dáta distribuované.

Zdalo sa mi, že teraz som pochopil podstatu problému. Potreboval som len zoradiť údaje podľa stĺpca SNP, nie podľa ľudí. Potom bude niekoľko SNP uložených v samostatnom dátovom bloku a potom sa Parquetova „inteligentná“ funkcia „otvorí iba vtedy, ak je hodnota v rozsahu“ ukáže v celej svojej kráse. Bohužiaľ, triedenie medzi miliardami riadkov roztrúsených po klastri sa ukázalo ako náročná úloha.

AWS rozhodne nechce vrátiť peniaze z dôvodu „som roztržitý študent“. Keď som spustil triedenie na Amazon Glue, bežalo to 2 dni a havarovalo.

A čo rozdelenie?

Čo som sa naučil: Priečky v Sparku musia byť vyvážené.

Potom som prišiel s myšlienkou rozdeliť údaje na chromozómy. Je ich 23 (a niekoľko ďalších, ak vezmete do úvahy mitochondriálnu DNA a nezmapované oblasti).
To vám umožní rozdeliť údaje na menšie časti. Ak do funkcie exportu Spark v skripte Glue pridáte iba jeden riadok partition_by = "chr", potom by sa údaje mali rozdeliť do segmentov.

Analýza 25 TB pomocou AWK a R
Genóm pozostáva z mnohých fragmentov nazývaných chromozómy.

Žiaľ, nepodarilo sa. Chromozómy majú rôzne veľkosti, čo znamená rôzne množstvo informácií. To znamená, že úlohy, ktoré Spark posielal pracovníkom, neboli vyvážené a dokončené pomaly, pretože niektoré uzly skončili skôr a boli nečinné. Úlohy však boli splnené. No pri požiadavke na jedno SNP nerovnováha opäť spôsobila problémy. Náklady na spracovanie SNP na väčších chromozómoch (teda tam, kde chceme získať údaje) sa znížili len asi 10-krát. Veľa, ale málo.

Čo ak to rozdelíme na ešte menšie časti?

Čo som sa naučil: Nikdy sa nepokúšajte urobiť 2,5 milióna partícií.

Rozhodol som sa ísť naplno a rozdelil som každé SNP. Tým sa zabezpečilo, že oddiely budú mať rovnakú veľkosť. BOL TO ZLÝ NÁPAD. Použila som Lepidlo a pridala nevinnú linku partition_by = 'snp'. Úloha začala a začala sa vykonávať. O deň neskôr som to skontroloval a zistil som, že do S3 stále nie je nič zapísané, tak som úlohu zabil. Vyzerá to, že Glue zapisoval prechodné súbory na skryté miesto v S3, veľa súborov, možno niekoľko miliónov. V dôsledku toho moja chyba stála viac ako tisíc dolárov a nepotešila môjho mentora.

Rozdelenie + triedenie

Čo som sa naučil: Radenie je stále ťažké, rovnako ako ladenie Sparku.

Môj posledný pokus o rozdelenie zahŕňal rozdelenie chromozómov a následné triedenie každého oddielu. Teoreticky by to urýchlilo každý dopyt, pretože požadované údaje SNP museli byť v rámci niekoľkých častí Parquet v danom rozsahu. Žiaľ, triedenie aj rozdelených údajov sa ukázalo ako náročná úloha. V dôsledku toho som prešiel na EMR pre vlastný klaster a použil som osem výkonných inštancií (C5.4xl) a Sparklyr na vytvorenie flexibilnejšieho pracovného toku...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...úloha však stále nebola dokončená. Konfiguroval som to rôznymi spôsobmi: zvýšil alokáciu pamäte pre každého vykonávateľa dotazu, použil uzly s veľkým množstvom pamäte, použil vysielacie premenné (premenné vysielania), ale zakaždým sa ukázalo, že to boli polovičné opatrenia a postupne spúšťače začali zlyhať, kým sa všetko nezastaví.

Stávam sa kreatívnejším

Čo som sa naučil: Niekedy si špeciálne údaje vyžadujú špeciálne riešenia.

Každý SNP má hodnotu polohy. Toto je číslo zodpovedajúce počtu báz pozdĺž jeho chromozómu. Je to pekný a prirodzený spôsob, ako organizovať naše údaje. Najprv som chcel rozdeliť podľa oblastí každého chromozómu. Napríklad pozície 1 – 2000, 2001 – 4000 atď. Problém je však v tom, že SNP nie sú rovnomerne rozložené v chromozómoch, takže veľkosť skupín sa bude veľmi líšiť.

Analýza 25 TB pomocou AWK a R

V dôsledku toho som dospel k rozdeleniu pozícií do kategórií (rank). Pomocou už stiahnutých údajov som spustil požiadavku na získanie zoznamu jedinečných SNP, ich pozícií a chromozómov. Potom som zoradil údaje v rámci každého chromozómu a zhromaždil SNP do skupín (bin) danej veľkosti. Povedzme 1000 SNP každý. To mi poskytlo vzťah medzi SNP a skupinou na chromozóm.

Nakoniec som vytvoril skupiny (bin) 75 SNP, dôvod bude vysvetlený nižšie.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Najprv vyskúšajte Spark

Čo som sa naučil: Agregácia iskier je rýchla, ale rozdelenie je stále drahé.

Chcel som prečítať tento malý (2,5 milióna riadkov) dátový rámec do Sparku, skombinovať ho s nespracovanými údajmi a potom rozdeliť podľa novo pridaného stĺpca bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

použil som sdf_broadcast(), takže Spark vie, že by mal poslať dátový rámec do všetkých uzlov. Je to užitočné, ak majú údaje malú veľkosť a sú potrebné pre všetky úlohy. V opačnom prípade sa Spark snaží byť chytrý a distribuuje dáta podľa potreby, čo môže spôsobiť spomalenie.

A môj nápad opäť nefungoval: úlohy nejaký čas fungovali, dokončili zväzok a potom, ako exekútori spustení delením, začali zlyhávať.

Pridáva sa AWK

Čo som sa naučil: Nespi, keď ťa učia základy. Určite už niekto riešil váš problém v 1980. rokoch.

Až do tohto bodu bola príčinou všetkých mojich neúspechov so Sparkom spleť údajov v klastri. Možno sa situácia dá zlepšiť predbežnou liečbou. Rozhodol som sa, že skúsim rozdeliť nespracované textové údaje do stĺpcov chromozómov, takže som dúfal, že Sparkovi poskytnem „vopred rozdelené“ údaje.

Hľadal som na StackOverflow, ako rozdeliť podľa hodnôt stĺpcov a našiel som taká skvelá odpoveď. Pomocou AWK môžete rozdeliť textový súbor podľa hodnôt stĺpcov tak, že ho napíšete do skriptu a nie odošlete výsledky stdout.

Napísal som Bash skript, aby som to vyskúšal. Stiahol som jeden zo zabalených súborov TSV a potom ho rozbalil pomocou gzip a odoslaný na awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Fungovalo to!

Plnenie jadier

Čo som sa naučil: gnu parallel - je to čarovná vec, mal by ju používať každý.

Rozchod bol dosť pomalý a keď som začal htopna kontrolu použitia výkonnej (a drahej) inštancie EC2 sa ukázalo, že používam iba jedno jadro a asi 200 MB pamäte. Aby sme problém vyriešili a neprišli o veľa peňazí, museli sme prísť na to, ako prácu paralelizovať. Našťastie v úplne úžasnej knihe Dátová veda na príkazovom riadku Našiel som kapitolu od Jerona Janssena o paralelizácii. Z toho som sa dozvedel o gnu parallel, veľmi flexibilná metóda na implementáciu multithreadingu v Unixe.

Analýza 25 TB pomocou AWK a R
Keď som spustil rozdelenie pomocou nového procesu, všetko bolo v poriadku, ale stále tu bolo úzke miesto - sťahovanie objektov S3 na disk nebolo veľmi rýchle a nie úplne paralelizované. Aby som to napravil, urobil som toto:

  1. Zistil som, že je možné implementovať fázu sťahovania S3 priamo v pipeline, čím sa úplne eliminuje medziskladovanie na disku. To znamená, že sa môžem vyhnúť zápisu nespracovaných údajov na disk a používať ešte menšie, a teda lacnejšie úložisko na AWS.
  2. tím aws configure set default.s3.max_concurrent_requests 50 výrazne zvýšil počet vlákien, ktoré používa AWS CLI (v predvolenom nastavení je ich 10).
  3. Prešiel som na inštanciu EC2 optimalizovanú pre rýchlosť siete s písmenom n v názve. Zistil som, že strata výpočtového výkonu pri použití n-inštancií je viac než kompenzovaná zvýšením rýchlosti načítania. Pre väčšinu úloh som použil c5n.4xl.
  4. Zmenené gzip na pigzToto je nástroj gzip, ktorý dokáže urobiť skvelé veci na paralelizáciu pôvodne neparalelizovanej úlohy dekompresie súborov (toto pomohlo najmenej).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Tieto kroky sa navzájom kombinujú, aby všetko fungovalo veľmi rýchlo. Zvýšením rýchlosti sťahovania a odstránením zápisov na disk som teraz mohol spracovať 5 terabajtový balík len za pár hodín.

Tento tweet mal obsahovať „TSV“. žiaľ.

Použitie novo analyzovaných údajov

Čo som sa naučil: Spark má rád nekomprimované dáta a nemá rád kombinovanie oddielov.

Teraz boli dáta v S3 v rozbalenom (čítaj: zdieľanom) a poloobjednanom formáte a mohol som sa opäť vrátiť do Sparku. Čakalo ma prekvapenie: opäť sa mi nepodarilo dosiahnuť to, čo som chcel! Bolo veľmi ťažké povedať Sparkovi presne, ako boli dáta rozdelené. A aj keď som to urobil, ukázalo sa, že bolo príliš veľa oddielov (95 tisíc) a keď som použil coalesce znížil ich počet na rozumné hranice, zničilo to moje rozdelenie. Som si istý, že sa to dá opraviť, ale po pár dňoch hľadania som nenašiel riešenie. Nakoniec som dokončil všetky úlohy v Sparku, aj keď to chvíľu trvalo a moje rozdelené súbory Parquet neboli veľmi malé (~200 KB). Dáta však boli tam, kde ich bolo treba.

Analýza 25 TB pomocou AWK a R
Príliš malé a nerovnomerné, úžasné!

Testovanie miestnych dopytov Spark

Čo som sa naučil: Spark má príliš veľkú réžiu pri riešení jednoduchých problémov.

Stiahnutím dát v šikovnom formáte som mohol otestovať rýchlosť. Nastavte R skript na spustenie lokálneho servera Spark a potom načítajte dátový rámec Spark zo špecifikovaného úložiska skupiny Parquet (zásobníka). Pokúsil som sa načítať všetky údaje, ale nepodarilo sa mi, aby Sparklyr rozpoznal rozdelenie.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Poprava trvala 29,415 sekundy. Oveľa lepšie, ale nie príliš dobré na masové testovanie čohokoľvek. Okrem toho som nemohol veci urýchliť ukladaním do vyrovnávacej pamäte, pretože keď som sa pokúsil uložiť dátový rámec do vyrovnávacej pamäte, Spark vždy zlyhal, aj keď som pridelil viac ako 50 GB pamäte súboru údajov, ktorý vážil menej ako 15.

Návrat na AWK

Čo som sa naučil: Asociatívne polia v AWK sú veľmi efektívne.

Uvedomil som si, že môžem dosiahnuť vyššie rýchlosti. Spomenul som si na to úžasne AWK návod od Brucea Barnetta Čítal som o skvelej funkcii s názvom „asociatívne polia" V podstate ide o páry kľúč-hodnota, ktoré sa v AWK z nejakého dôvodu volali inak, a preto som o nich nejako moc nepremýšľal. Roman Cheplyaka pripomenul, že výraz „asociatívne polia“ je oveľa starší ako výraz „pár kľúč – hodnota“. Aj keď ty vyhľadajte pár kľúč – hodnota v Google Ngram, tento výraz tam neuvidíte, ale nájdete asociatívne polia! Okrem toho sa „pár kľúč-hodnota“ najčastejšie spája s databázami, takže je oveľa zmysluplnejšie porovnávať ho s hashmapou. Uvedomil som si, že tieto asociatívne polia môžem použiť na priradenie svojich SNP k tabuľke bin a nespracovaných údajov bez použitia Spark.

Na tento účel som v skripte AWK použil blok BEGIN. Toto je časť kódu, ktorá sa vykoná pred odoslaním prvého riadku údajov do hlavného tela skriptu.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Tím while(getline...) načítal všetky riadky zo skupiny CSV (bin), nastavte prvý stĺpec (názov SNP) ako kľúč pre asociatívne pole bin a druhá hodnota (skupina) ako hodnota. Potom v bloku { }, ktorý sa vykonáva na všetkých riadkoch hlavného súboru, každý riadok sa odošle do výstupného súboru, ktorý dostane jedinečný názov v závislosti od svojej skupiny (bin): ..._bin_"bin[$1]"_....

Premenné batch_num и chunk_id sa zhodovali s údajmi poskytnutými kanálom, vyhli sa tak konfliktnému stavu a spusteniu každého vykonávacieho vlákna parallel, zapísal do vlastného jedinečného súboru.

Keďže som rozhádzal všetky nespracované údaje do priečinkov na chromozómoch, ktoré zostali z môjho predchádzajúceho experimentu s AWK, teraz som mohol napísať ďalší Bash skript na spracovanie jedného chromozómu po druhom a odoslať hlbšie rozdelené údaje do S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Scenár má dve časti parallel.

V prvej časti sa načítajú údaje zo všetkých súborov obsahujúcich informácie o požadovanom chromozóme, potom sa tieto údaje rozdelia medzi vlákna, ktoré rozdelia súbory do príslušných skupín (bin). Aby sa predišlo konfliktným podmienkam, keď viaceré vlákna zapisujú do rovnakého súboru, AWK odovzdáva názvy súborov na zapisovanie údajov na rôzne miesta, napr. chr_10_bin_52_batch_2_aa.csv. V dôsledku toho sa na disku vytvára veľa malých súborov (na to som použil terabajtové zväzky EBS).

Dopravník z druhej sekcie parallel prechádza skupinami (bin) a spája ich jednotlivé súbory do spoločného CSV c cata potom ich odošle na export.

Vysielanie v R?

Čo som sa naučil: Môžete kontaktovať stdin и stdout z R skriptu, a preto ho použite v potrubí.

Možno ste si všimli tento riadok v skripte Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Preloží všetky zreťazené súbory skupín (bin) do R skriptu nižšie. {} je špeciálna technika parallel, ktorý vloží všetky údaje, ktoré odošle do určeného streamu, priamo do samotného príkazu. Možnosť {#} poskytuje jedinečné ID vlákna a {%} predstavuje číslo pracovného miesta (opakované, ale nikdy nie súčasne). Zoznam všetkých možností nájdete v dokumentáciu.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Keď premenná file("stdin") prenášané na readr::read_csv, dáta preložené do R skriptu sa načítajú do rámca, ktorý je potom vo forme .rds- použitie súboru aws.s3 zapísané priamo do S3.

RDS je niečo ako juniorská verzia Parquet, bez zbytočností v úložisku reproduktorov.

Po dokončení Bash skriptu som dostal balík .rds-súbory umiestnené v S3, čo mi umožnilo používať efektívnu kompresiu a vstavané typy.

Napriek použitiu brzdy R všetko fungovalo veľmi rýchlo. Nie je prekvapením, že časti R, ktoré čítajú a zapisujú dáta, sú vysoko optimalizované. Po testovaní na jednom stredne veľkom chromozóme bola úloha dokončená na inštancii C5n.4xl približne za dve hodiny.

Obmedzenia S3

Čo som sa naučil: Vďaka implementácii inteligentnej cesty dokáže S3 spracovať veľa súborov.

Obával som sa, či S3 zvládne to množstvo súborov, ktoré doň boli prenesené. Mohol by som dať menám súborov zmysel, ale ako by ich S3 hľadal?

Analýza 25 TB pomocou AWK a R
Priečinky v S3 sú len na ukážku, v skutočnosti systém symbol nezaujíma /. Zo stránky S3 FAQ.

Zdá sa, že S3 predstavuje cestu ku konkrétnemu súboru ako jednoduchý kľúč v akejsi hašovacej tabuľke alebo databáze založenej na dokumentoch. Vedro si možno predstaviť ako tabuľku a súbory možno považovať za záznamy v tejto tabuľke.

Keďže rýchlosť a efektívnosť sú dôležité pre dosahovanie zisku v Amazone, nie je prekvapením, že tento systém cesty kľúča ako súboru je neuveriteľne optimalizovaný. Snažil som sa nájsť rovnováhu: aby som nemusel zadávať veľa žiadostí, ale aby boli požiadavky rýchlo realizované. Ukázalo sa, že najlepšie je urobiť asi 20 tisíc bin súborov. Myslím si, že ak budeme pokračovať v optimalizácii, môžeme dosiahnuť zvýšenie rýchlosti (napríklad vytvorením špeciálneho vedra len pre dáta, čím sa zníži veľkosť vyhľadávacej tabuľky). Na ďalšie experimenty však nebol čas ani peniaze.

A čo krížová kompatibilita?

Čo som sa naučil: Hlavnou príčinou plytvania časom je predčasná optimalizácia spôsobu ukladania.

V tomto bode je veľmi dôležité položiť si otázku: „Prečo používať proprietárny formát súboru? Dôvodom je rýchlosť načítania (súbory CSV sa načítavali 7-krát dlhšie) a kompatibilita s našimi pracovnými postupmi. Môžem prehodnotiť, či R môže ľahko načítať súbory Parquet (alebo Arrow) bez načítania Spark. Každý v našom laboratóriu používa R a ak potrebujem previesť údaje do iného formátu, stále mám pôvodné textové údaje, takže môžem znova spustiť pipeline.

Rozdelenie práce

Čo som sa naučil: Nepokúšajte sa optimalizovať úlohy manuálne, nechajte to urobiť počítač.

Odladil som pracovný postup na jednom chromozóme, teraz potrebujem spracovať všetky ostatné údaje.
Chcel som zvýšiť niekoľko inštancií EC2 na konverziu, ale zároveň som sa obával veľmi nevyváženej záťaže medzi rôznymi úlohami spracovania (rovnako ako Spark trpel nevyváženými oddielmi). Okrem toho som nemal záujem zvýšiť jednu inštanciu na chromozóm, pretože pre účty AWS je predvolený limit 10 inštancií.

Potom som sa rozhodol napísať skript v R na optimalizáciu úloh spracovania.

Najprv som požiadal S3, aby vypočítal, koľko úložného priestoru zaberal každý chromozóm.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Potom som napísal funkciu, ktorá vezme celkovú veľkosť, zamieša poradie chromozómov a rozdelí ich do skupín num_jobs a povie vám, aké rozdielne sú veľkosti všetkých úloh spracovania.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Potom som prešiel tisíckami náhodných zmien pomocou purrr a vybral som to najlepšie.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Takže som skončil so súborom úloh, ktoré boli veľmi podobné veľkosti. Potom už len zostávalo zabaliť môj predchádzajúci Bash skript do veľkej slučky for. Napísanie tejto optimalizácie trvalo asi 10 minút. A to je oveľa menej, ako by som minul na manuálne vytváranie úloh, ak by boli nevyvážené. Preto si myslím, že s touto predbežnou optimalizáciou som mal pravdu.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Na koniec pridávam príkaz na vypnutie:

sudo shutdown -h now

...a všetko vyšlo! Pomocou AWS CLI som vyvolal inštancie pomocou možnosti user_data dal im Bash skripty ich úloh na spracovanie. Bežali a vypínali sa automaticky, takže som neplatil za extra výpočtový výkon.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Poďme sa zbaliť!

Čo som sa naučil: Rozhranie API by malo byť jednoduché kvôli jednoduchosti a flexibilite používania.

Konečne som dostal dáta na správne miesto a vo forme. Ostávalo už len čo najviac zjednodušiť proces využívania dát, aby to bolo pre kolegov jednoduchšie. Chcel som vytvoriť jednoduché API na vytváranie požiadaviek. Ak sa v budúcnosti rozhodnem prejsť z .rds na parkety, potom by to mal byť problém pre mňa, nie pre mojich kolegov. Na tento účel som sa rozhodol urobiť vnútorný balík R.

Zostavte a zdokumentujte veľmi jednoduchý balík obsahujúci len niekoľko funkcií prístupu k údajom usporiadaných okolo funkcie get_snp. Pre kolegov som urobil aj webovú stránku pkgdown, takže môžu jednoducho vidieť príklady a dokumentáciu.

Analýza 25 TB pomocou AWK a R

Inteligentné ukladanie do vyrovnávacej pamäte

Čo som sa naučil: Ak sú vaše údaje dobre pripravené, ukladanie do vyrovnávacej pamäte bude jednoduché!

Keďže jeden z hlavných pracovných postupov aplikoval rovnaký analytický model na balík SNP, rozhodol som sa využiť binning vo svoj prospech. Pri prenose dát cez SNP sú všetky informácie zo skupiny (bin) pripojené k vrátenému objektu. To znamená, že staré dopyty môžu (teoreticky) urýchliť spracovanie nových dopytov.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Pri zostavovaní balíka som spustil mnoho benchmarkov na porovnanie rýchlosti pri použití rôznych metód. Odporúčam to nezanedbávať, pretože niekedy sú výsledky neočakávané. Napríklad, dplyr::filter bolo oveľa rýchlejšie ako zachytávanie riadkov pomocou filtrovania založeného na indexovaní a načítanie jedného stĺpca z filtrovaného dátového rámca bolo oveľa rýchlejšie ako pomocou syntaxe indexovania.

Upozorňujeme, že objekt prev_snp_results obsahuje kľúč snps_in_bin. Toto je pole všetkých jedinečných SNP v skupine (bin), čo vám umožňuje rýchlo skontrolovať, či už máte údaje z predchádzajúceho dotazu. Tiež to uľahčuje prechádzať cez všetky SNP v skupine (zásobníku) s týmto kódom:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

výsledky

Teraz môžeme (a začali sme vážne) spúšťať modely a scenáre, ktoré nám boli predtým nedostupné. Najlepšie je, že moji kolegovia z laboratória nemusia myslieť na žiadne komplikácie. Majú len funkciu, ktorá funguje.

A hoci ich balík šetrí detailmi, snažil som sa, aby bol formát údajov dostatočne jednoduchý, aby na to prišli, keby som zajtra náhle zmizol...

Rýchlosť sa citeľne zvýšila. Zvyčajne skenujeme funkčne významné fragmenty genómu. Predtým sme to nemohli urobiť (ukázalo sa to príliš drahé), ale teraz vďaka skupinovej (bin) štruktúre a cachovaniu trvá požiadavka na jeden SNP v priemere menej ako 0,1 sekundy a spotreba dát je taká nízke, že náklady na S3 sú arašidy.

Záver

Tento článok vôbec nie je návodom. Riešenie sa ukázalo ako individuálne a takmer určite nie optimálne. Ide skôr o cestopis. Chcem, aby ostatní pochopili, že takéto rozhodnutia sa v hlave nezdajú úplne sformované, sú výsledkom pokusov a omylov. Tiež, ak hľadáte dátového vedca, majte na pamäti, že efektívne používanie týchto nástrojov vyžaduje skúsenosti a skúsenosti stoja peniaze. Som rád, že som mal prostriedky na zaplatenie, ale mnohí iní, ktorí vedia robiť tú istú prácu lepšie ako ja, nikdy nebudú mať príležitosť kvôli nedostatku peňazí to ani skúsiť.

Nástroje veľkých dát sú všestranné. Ak máte čas, takmer určite môžete napísať rýchlejšie riešenie pomocou inteligentných techník čistenia, ukladania a extrakcie údajov. Nakoniec ide o analýzu nákladov a výnosov.

Čo som sa naučil:

  • neexistuje lacný spôsob, ako analyzovať 25 TB naraz;
  • buďte opatrní s veľkosťou vašich parketových súborov a ich usporiadaním;
  • Priečky v Sparku musia byť vyvážené;
  • Vo všeobecnosti sa nikdy nepokúšajte vytvoriť 2,5 milióna oddielov;
  • Triedenie je stále ťažké, rovnako ako nastavenie Sparku;
  • niekedy si špeciálne údaje vyžadujú špeciálne riešenia;
  • Agregácia iskier je rýchla, ale rozdelenie je stále drahé;
  • nespi, keď ťa učia základy, tvoj problém už zrejme niekto riešil v 1980. rokoch;
  • gnu parallel - toto je čarovná vec, mal by ju používať každý;
  • Spark má rád nekomprimované dáta a nemá rád kombinovanie oddielov;
  • Spark má príliš veľa réžie pri riešení jednoduchých problémov;
  • Asociatívne polia AWK sú veľmi efektívne;
  • môžete kontaktovať stdin и stdout z R skriptu, a preto ho použiť v potrubí;
  • Vďaka implementácii inteligentnej cesty dokáže S3 spracovať veľa súborov;
  • Hlavným dôvodom plytvania časom je predčasná optimalizácia spôsobu skladovania;
  • nepokúšajte sa optimalizovať úlohy manuálne, nechajte to urobiť počítač;
  • API by malo byť jednoduché z dôvodu jednoduchosti a flexibility používania;
  • Ak sú vaše údaje dobre pripravené, ukladanie do vyrovnávacej pamäte bude jednoduché!

Zdroj: hab.com

Pridať komentár