Analýza 25 TB pomocí AWK a R

Analýza 25 TB pomocí AWK a R
Jak číst tento článek: Omlouvám se za tak dlouhý a chaotický text. Abych vám ušetřil čas, každou kapitolu začínám úvodem „Co jsem se naučil“, který shrnuje podstatu kapitoly v jedné nebo dvou větách.

"Prostě mi ukaž řešení!" Pokud se jen chcete podívat, odkud jsem přišel, pak přeskočte na kapitolu „Stát se vynalézavější“, ale myslím, že je zajímavější a užitečnější číst o neúspěchu.

Nedávno jsem dostal za úkol nastavit proces pro zpracování velkého objemu nezpracovaných sekvencí DNA (technicky jde o SNP čip). Bylo potřeba rychle získat data o dané genetické poloze (nazývané SNP) pro následné modelování a další úkoly. Pomocí R a AWK jsem mohl čistit a organizovat data přirozeným způsobem, což výrazně urychlilo zpracování dotazů. Nebylo to pro mě snadné a vyžadovalo to mnoho opakování. Tento článek vám pomůže vyhnout se některým mým chybám a ukáže vám, k čemu jsem nakonec dospěl.

Nejprve několik úvodních vysvětlení.

Data

Naše univerzitní centrum zpracování genetických informací nám poskytlo data ve formě 25TB TSV. Dostal jsem je rozdělené do 5 balíčků, komprimovaných Gzipem, z nichž každý obsahoval asi 240 čtyřgigabajtových souborů. Každý řádek obsahoval data pro jeden SNP od jednoho jedince. Celkem byly přeneseny údaje o ~2,5 milionu SNP a ~60 tisících osob. Kromě informací SNP obsahovaly soubory četné sloupce s čísly odrážejícími různé charakteristiky, jako je intenzita čtení, frekvence různých alel atd. Celkem tam bylo asi 30 sloupců s jedinečnými hodnotami.

terč

Jako u každého projektu správy dat bylo nejdůležitější určit, jak budou data použita. V tomto případě budeme většinou vybírat modely a pracovní postupy pro SNP založené na SNP. To znamená, že budeme potřebovat údaje pouze o jednom SNP najednou. Musel jsem se naučit, jak co nejsnáze, rychle a levně získat všechny záznamy spojené s jedním z 2,5 milionů SNP.

Jak to nedělat

Abych citoval vhodné klišé:

Neselhal jsem tisíckrát, jen jsem objevil tisíc způsobů, jak se vyhnout analýze hromady dat ve formátu vhodném pro dotazy.

První pokus

Co jsem se naučil: Neexistuje žádný levný způsob, jak analyzovat 25 TB najednou.

Po absolvování kurzu „Pokročilé metody pro zpracování velkých dat“ na Vanderbilt University jsem si byl jistý, že trik je v pytli. Pravděpodobně bude trvat hodinu nebo dvě, než nastavíte server Hive, aby prošel všechna data a oznámil výsledek. Vzhledem k tomu, že naše data jsou uložena v AWS S3, službu jsem využil Athena, který umožňuje aplikovat dotazy Hive SQL na data S3. Nemusíte zakládat/zakládat Hive cluster a také platíte pouze za data, která hledáte.

Poté, co jsem Atheně ukázal svá data a jejich formát, provedl jsem několik testů s dotazy, jako je tento:

select * from intensityData limit 10;

A rychle získal dobře strukturované výsledky. Připraven.

Dokud jsme se nepokusili použít data v naší práci...

Byl jsem požádán, abych vytáhl všechny informace o SNP, abych mohl model otestovat. Spustil jsem dotaz:


select * from intensityData 
where snp = 'rs123456';

...a začal čekat. Po osmi minutách a více než 4 TB požadovaných dat jsem obdržel výsledek. Athena si účtuje podle objemu nalezených dat 5 USD za terabajt. Takže tato jediná žádost stála 20 dolarů a osm minut čekání. Abychom mohli model spustit na všech datech, museli jsme čekat 38 let a zaplatit 50 milionů dolarů. Je zřejmé, že to pro nás nebylo vhodné.

Bylo nutné použít parkety...

Co jsem se naučil: Dávejte pozor na velikost souborů parket a jejich uspořádání.

Nejprve jsem se pokusil situaci napravit převedením všech TSV na Pilníky na parkety. Jsou vhodné pro práci s velkými datovými soubory, protože informace v nich jsou uloženy ve sloupcové formě: každý sloupec leží ve vlastním segmentu paměti/disku, na rozdíl od textových souborů, ve kterých řádky obsahují prvky každého sloupce. A pokud potřebujete něco najít, pak si stačí přečíst požadovanou rubriku. Každý soubor navíc ukládá rozsah hodnot ve sloupci, takže pokud hodnota, kterou hledáte, není v rozsahu sloupce, Spark nebude ztrácet čas skenováním celého souboru.

Provedl jsem jednoduchý úkol Lepidlo AWS převést naše TSV na parkety a uložit nové soubory do Atheny. Trvalo to asi 5 hodin. Ale když jsem požadavek spustil, jeho dokončení trvalo přibližně stejně dlouho a o něco méně peněz. Faktem je, že Spark ve snaze optimalizovat úlohu jednoduše rozbalil jeden kus TSV a vložil ho do vlastního kusu parket. A protože každý blok byl dostatečně velký, aby obsahoval celé záznamy mnoha lidí, každý soubor obsahoval všechny SNP, takže Spark musel otevřít všechny soubory, aby extrahoval informace, které potřeboval.

Zajímavé je, že výchozí (a doporučený) typ komprese Parquet, snappy, nelze rozdělit. Každý exekutor se proto zasekl u úkolu rozbalit a stáhnout celý datový soubor o velikosti 3,5 GB.

Analýza 25 TB pomocí AWK a R

Pojďme pochopit problém

Co jsem se naučil: Třídění je obtížné, zvláště pokud jsou data distribuována.

Zdálo se mi, že nyní chápu podstatu problému. Potřeboval jsem pouze seřadit data podle sloupce SNP, ne podle lidí. Poté bude několik SNP uloženo v samostatném datovém bloku a poté se Parquetova „chytrá“ funkce „otevře, pouze pokud je hodnota v rozsahu“ ukáže v celé své kráse. Bohužel se ukázalo, že třídění miliard řádků roztroušených po shluku je obtížný úkol.

AWS rozhodně nechce vrátit peníze z důvodu „Jsem roztržitý student“. Poté, co jsem spustil třídění na Amazon Glue, běželo to 2 dny a spadlo.

A co rozdělení?

Co jsem se naučil: Oddíly ve Sparku musí být vyvážené.

Pak jsem přišel s myšlenkou rozdělení dat v chromozomech. Je jich 23 (a několik dalších, pokud vezmete v úvahu mitochondriální DNA a nezmapované oblasti).
To vám umožní rozdělit data na menší části. Pokud přidáte pouze jeden řádek do funkce exportu Spark ve skriptu Glue partition_by = "chr", pak by měla být data rozdělena do segmentů.

Analýza 25 TB pomocí AWK a R
Genom se skládá z mnoha fragmentů nazývaných chromozomy.

Bohužel to nevyšlo. Chromozomy mají různé velikosti, což znamená různé množství informací. To znamená, že úkoly, které Spark poslal pracovníkům, nebyly vyvážené a dokončeny pomalu, protože některé uzly skončily brzy a byly nečinné. Úkoly však byly splněny. Ale při žádosti o jedno SNP nevyváženost opět způsobila problémy. Náklady na zpracování SNP na větších chromozomech (tedy tam, kde chceme získat data) se snížily jen asi 10krát. Hodně, ale málo.

Co když to rozdělíme na ještě menší části?

Co jsem se naučil: Nikdy se nepokoušejte udělat 2,5 milionu oddílů.

Rozhodl jsem se jít do toho a rozdělit každé SNP. Tím bylo zajištěno, že oddíly mají stejnou velikost. BYL TO ŠPATNÝ NÁPAD. Použil jsem Lepidlo a přidal nevinnou linku partition_by = 'snp'. Úkol začal a začal se plnit. O den později jsem zkontroloval a zjistil, že do S3 stále není nic zapsáno, tak jsem úkol zabil. Vypadá to, že Glue zapisoval přechodné soubory do skrytého umístění v S3, hodně souborů, možná několik milionů. Výsledkem bylo, že moje chyba stála více než tisíc dolarů a nepotěšila mého mentora.

Rozdělení + třídění

Co jsem se naučil: Řazení je stále obtížné, stejně jako ladění Sparku.

Můj poslední pokus o rozdělení zahrnoval rozdělení chromozomů a následné třídění každého oddílu. Teoreticky by to urychlilo každý dotaz, protože požadovaná data SNP musela být v rámci několika Parquetových kousků v daném rozsahu. Bohužel, třídění i rozdělených dat se ukázalo jako obtížný úkol. V důsledku toho jsem přešel na EMR pro vlastní cluster a použil osm výkonných instancí (C5.4xl) a Sparklyr k vytvoření flexibilnějšího pracovního postupu...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...úkol však stále nebyl splněn. Nakonfiguroval jsem to různými způsoby: zvýšil alokaci paměti pro každého vykonavatele dotazu, použil uzly s velkým množstvím paměti, použil proměnné vysílání (proměnné vysílání), ale pokaždé se ukázalo, že jde o poloviční opatření, a postupně začali vykonavatelé selhat, dokud se vše nezastaví.

Jsem stále kreativnější

Co jsem se naučil: Někdy speciální data vyžadují speciální řešení.

Každý SNP má hodnotu pozice. Toto je číslo odpovídající počtu bází podél jeho chromozomu. Je to pěkný a přirozený způsob, jak organizovat naše data. Nejprve jsem chtěl rozdělit podle oblastí každého chromozomu. Například pozice 1 – 2000, 2001 – 4000 atd. Problém je však v tom, že SNP nejsou rovnoměrně distribuovány napříč chromozomy, takže velikosti skupin se proto budou velmi lišit.

Analýza 25 TB pomocí AWK a R

Tím jsem došel k rozdělení pozic do kategorií (rank). Pomocí již stažených dat jsem spustil požadavek na získání seznamu unikátních SNP, jejich pozic a chromozomů. Poté jsem seřadil data v rámci každého chromozomu a shromáždil SNP do skupin (bin) dané velikosti. Řekněme 1000 SNP každý. To mi dalo vztah SNP-to-group-per-chromozom.

Nakonec jsem vytvořil skupiny (bin) po 75 SNP, důvod bude vysvětlen níže.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Nejprve zkuste se Sparkem

Co jsem se naučil: Agregace Spark je rychlá, ale rozdělení je stále drahé.

Chtěl jsem tento malý (2,5 milionu řádků) datový rámec načíst do Sparku, zkombinovat jej s nezpracovanými daty a poté rozdělit podle nově přidaného sloupce bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Použil jsem sdf_broadcast(), takže Spark ví, že by měl poslat datový rámec všem uzlům. To je užitečné, pokud jsou data malá a jsou vyžadována pro všechny úlohy. Jinak se Spark snaží být chytrý a distribuuje data podle potřeby, což může způsobit zpomalení.

A můj nápad opět nefungoval: úkoly nějakou dobu fungovaly, dokončily sjednocení a pak, stejně jako vykonavatelé spuštění rozdělením, začali selhávat.

Přidání AWK

Co jsem se naučil: Nespěte, když vás učí základy. Váš problém už jistě někdo řešil v 1980. letech.

Až do této chvíle byla důvodem všech mých selhání se Sparkem změť dat v clusteru. Možná se dá situace zlepšit předléčením. Rozhodl jsem se zkusit rozdělit nezpracovaná textová data do sloupců chromozomů, takže jsem doufal, že poskytnu Sparkovi „předrozdělená“ data.

Hledal jsem na StackOverflow, jak rozdělit podle hodnot sloupců a našel taková skvělá odpověď. Pomocí AWK můžete rozdělit textový soubor podle hodnot sloupců tak, že jej zapíšete do skriptu, místo abyste výsledky posílali do stdout.

Napsal jsem Bash skript, abych to vyzkoušel. Stáhli jsme jeden ze zabalených souborů TSV a poté jej rozbalili pomocí gzip a odeslána na awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Fungovalo to!

Plnění jader

Co jsem se naučil: gnu parallel - je to kouzelná věc, měl by ji používat každý.

Odloučení bylo docela pomalé a když jsem začal htoppro kontrolu použití výkonné (a drahé) instance EC2 se ukázalo, že používám pouze jedno jádro a asi 200 MB paměti. Abychom problém vyřešili a nepřišli o spoustu peněz, museli jsme vymyslet, jak práci paralelizovat. Naštěstí v naprosto úžasné knize Data Science na příkazovém řádku Našel jsem kapitolu od Jerona Janssena o paralelizaci. Z toho jsem se dozvěděl o gnu parallel, velmi flexibilní metoda pro implementaci multithreadingu v Unixu.

Analýza 25 TB pomocí AWK a R
Když jsem spustil dělení pomocí nového procesu, bylo vše v pořádku, ale stále tu bylo úzké hrdlo - stahování objektů S3 na disk nebylo příliš rychlé a nebylo plně paralelizované. Abych to napravil, udělal jsem toto:

  1. Zjistil jsem, že je možné implementovat fázi stahování S3 přímo v pipeline, přičemž zcela odpadá mezisklad na disku. To znamená, že se mohu vyhnout zápisu nezpracovaných dat na disk a používat ještě menší, a tedy levnější úložiště na AWS.
  2. tým aws configure set default.s3.max_concurrent_requests 50 výrazně zvýšil počet vláken, která AWS CLI používá (ve výchozím nastavení je jich 10).
  3. Přešel jsem na instanci EC2 optimalizovanou pro rychlost sítě s písmenem n v názvu. Zjistil jsem, že ztráta výpočetního výkonu při použití n-instancí je více než kompenzována zvýšením rychlosti načítání. Pro většinu úloh jsem použil c5n.4xl.
  4. Změněno gzip na pigz, jedná se o nástroj gzip, který dokáže skvělé věci k paralelizaci původně neparalelizovaného úkolu dekomprese souborů (toto pomohlo nejméně).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Tyto kroky se vzájemně kombinují, aby vše fungovalo velmi rychle. Díky zvýšení rychlosti stahování a eliminaci zápisů na disk jsem nyní mohl zpracovat balíček o velikosti 5 terabajtů během několika hodin.

Tento tweet měl zmínit „TSV“. Běda.

Použití nově analyzovaných dat

Co jsem se naučil: Spark má rád nekomprimovaná data a nemá rád kombinování oddílů.

Nyní byla data v S3 v rozbaleném (čti: sdíleném) a poloobjednaném formátu a mohl jsem se znovu vrátit do Sparku. Čekalo mě překvapení: opět se mi nepodařilo dosáhnout toho, co jsem chtěl! Bylo velmi obtížné říci Sparkovi přesně, jak byla data rozdělena. A i když jsem to udělal, ukázalo se, že bylo příliš mnoho oddílů (95 tisíc), a když jsem použil coalesce snížil jejich počet na rozumné limity, zničilo to moje rozdělení. Jsem si jistý, že to lze opravit, ale po několika dnech hledání jsem nenašel řešení. Nakonec jsem dokončil všechny úkoly ve Sparku, i když to chvíli trvalo a moje rozdělené soubory Parquet nebyly příliš malé (~200 KB). Data však byla tam, kde byla potřeba.

Analýza 25 TB pomocí AWK a R
Příliš malé a nerovnoměrné, úžasné!

Testování místních dotazů Spark

Co jsem se naučil: Spark má příliš velkou režii při řešení jednoduchých problémů.

Stažením dat v chytrém formátu jsem mohl otestovat rychlost. Nastavte skript R pro spuštění místního serveru Spark a poté načtěte datový rámec Spark ze zadaného úložiště skupiny Parquet (přihrádky). Pokusil jsem se načíst všechna data, ale nepodařilo se mi přimět Sparklyr, aby rozpoznal rozdělení.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Provedení trvalo 29,415 sekund. Mnohem lepší, ale ne příliš dobré pro hromadné testování čehokoli. Navíc jsem nemohl urychlit věci pomocí ukládání do mezipaměti, protože když jsem se pokusil uložit datový rámec do mezipaměti, Spark vždy selhal, i když jsem přidělil více než 50 GB paměti datové sadě, která váží méně než 15.

Návrat na AWK

Co jsem se naučil: Asociativní pole v AWK jsou velmi efektivní.

Uvědomil jsem si, že mohu dosáhnout vyšších rychlostí. Vzpomněl jsem si na to úžasně AWK tutoriál od Bruce Barnetta Četl jsem o skvělé funkci s názvem „asociativní pole" V podstatě se jedná o páry klíč-hodnota, kterým se v AWK z nějakého důvodu říkalo jinak, a proto jsem o nich nějak moc nepřemýšlel. Roman Cheplyaka připomněl, že výraz „asociativní pole“ je mnohem starší než výraz „pár klíč-hodnota“. Jen pokud ty vyhledejte pár klíč–hodnota v Google Ngram, tento termín tam neuvidíte, ale najdete asociativní pole! Navíc je „pár klíč-hodnota“ nejčastěji spojován s databázemi, takže je mnohem smysluplnější jej porovnávat s hashmapou. Uvědomil jsem si, že mohu použít tato asociativní pole ke spojení mých SNP s tabulkou bin a nezpracovanými daty bez použití Spark.

K tomu jsem ve skriptu AWK použil blok BEGIN. Toto je část kódu, která se provede před předáním prvního řádku dat do hlavní části skriptu.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Tým while(getline...) načetl všechny řádky ze skupiny CSV (bin), nastavte první sloupec (název SNP) jako klíč pro asociativní pole bin a druhá hodnota (skupina) jako hodnota. Pak v bloku { }, který se provádí na všech řádcích hlavního souboru, je každý řádek odeslán do výstupního souboru, který obdrží jedinečný název v závislosti na své skupině (přihrádce): ..._bin_"bin[$1]"_....

Proměnné batch_num и chunk_id odpovídala datům poskytnutým kanálem, vyhnula se konfliktnímu stavu a každé běžící vlákno provádění parallel, zapsal do vlastního jedinečného souboru.

Vzhledem k tomu, že jsem rozházel všechna nezpracovaná data do složek na chromozomech, které zbyly z mého předchozího experimentu s AWK, nyní jsem mohl napsat další Bash skript, který zpracovává jeden chromozom po druhém a posílá hlouběji rozdělená data do S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Scénář má dvě části parallel.

V první části se načtou data ze všech souborů obsahujících informace o požadovaném chromozomu, poté se tato data rozdělí mezi vlákna, která rozdělí soubory do příslušných skupin (bin). Aby se předešlo konfliktním podmínkám při zápisu více vláken do stejného souboru, předává AWK názvy souborů pro zápis dat na různá místa, např. chr_10_bin_52_batch_2_aa.csv. V důsledku toho se na disku vytváří mnoho malých souborů (k tomu jsem použil terabajtové svazky EBS).

Dopravník z druhé sekce parallel prochází skupiny (bin) a spojuje jejich jednotlivé soubory do společného CSV c cata poté je odešle na export.

Vysílání v R?

Co jsem se naučil: Můžete kontaktovat stdin и stdout z R skriptu, a proto jej použít v potrubí.

Možná jste si všimli tohoto řádku ve skriptu Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Přeloží všechny zřetězené soubory skupin (bin) do R skriptu níže. {} je speciální technika parallel, který vloží všechna data, která odešle do určeného streamu, přímo do samotného příkazu. Volba {#} poskytuje jedinečné ID vlákna a {%} představuje číslo pozice úlohy (opakuje se, ale nikdy současně). Seznam všech možností naleznete v dokumentace.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Když proměnná file("stdin") přenášeno na readr::read_csv, data přeložená do R skriptu se načtou do rámce, který je pak ve tvaru .rds- použití souboru aws.s3 zapsané přímo do S3.

RDS je něco jako juniorská verze Parquet, bez zbytečných skladů reproduktorů.

Po dokončení skriptu Bash jsem dostal balíček .rds-soubory umístěné v S3, což mi umožnilo používat efektivní kompresi a vestavěné typy.

I přes použití brzdy R vše fungovalo velmi rychle. Není překvapením, že části R, které čtou a zapisují data, jsou vysoce optimalizované. Po testování na jednom chromozomu střední velikosti byla úloha dokončena na instanci C5n.4xl přibližně za dvě hodiny.

Omezení S3

Co jsem se naučil: Díky implementaci chytré cesty dokáže S3 zpracovat mnoho souborů.

Bál jsem se, zda S3 zvládne to množství souborů, které do něj byly přeneseny. Mohl bych dát názvům souborů smysl, ale jak by je S3 hledal?

Analýza 25 TB pomocí AWK a R
Složky v S3 jsou jen pro parádu, ve skutečnosti systém ten symbol nezajímá /. Ze stránky S3 FAQ.

Zdá se, že S3 představuje cestu ke konkrétnímu souboru jako jednoduchý klíč v jakési hashovací tabulce nebo databázi založené na dokumentech. Kbelík lze považovat za tabulku a soubory lze považovat za záznamy v této tabulce.

Vzhledem k tomu, že rychlost a efektivita jsou důležité pro dosažení zisku v Amazonu, není žádným překvapením, že tento systém cesty klíče jako souboru je zatraceně optimalizovaný. Snažil jsem se najít rovnováhu: abych nemusel zadávat mnoho požadavků, ale aby byly požadavky rychle vyřízeny. Ukázalo se, že nejlepší je udělat asi 20 tisíc bin souborů. Myslím, že pokud budeme pokračovat v optimalizaci, můžeme dosáhnout zvýšení rychlosti (například vytvořením speciálního bucketu jen pro data, čímž se sníží velikost vyhledávací tabulky). Na další experimenty ale nebyl čas ani peníze.

A co křížová kompatibilita?

Co jsem se naučil: Hlavní příčinou plýtvání časem je předčasná optimalizace způsobu ukládání.

V tomto bodě je velmi důležité položit si otázku: „Proč používat proprietární formát souboru? Důvodem je rychlost načítání (soubory CSV se načítají 7krát déle) a kompatibilita s našimi pracovními postupy. Mohu znovu zvážit, zda R může snadno načíst soubory Parquet (nebo Arrow) bez načtení Spark. Každý v naší laboratoři používá R, a pokud potřebuji převést data do jiného formátu, stále mám původní textová data, takže mohu znovu spustit pipeline.

Dělba práce

Co jsem se naučil: Nepokoušejte se optimalizovat úlohy ručně, nechte to udělat počítač.

Odladil jsem pracovní postup na jednom chromozomu, nyní potřebuji zpracovat všechna ostatní data.
Chtěl jsem zvýšit několik instancí EC2 pro konverzi, ale zároveň jsem se obával, že dostanu velmi nevyvážené zatížení napříč různými úlohami zpracování (stejně jako Spark trpěl nevyváženými oddíly). Navíc mě nezajímalo zvýšení jedné instance na chromozom, protože pro účty AWS je výchozí limit 10 instancí.

Pak jsem se rozhodl napsat skript v R, abych optimalizoval úlohy zpracování.

Nejprve jsem požádal S3, aby vypočítal, kolik úložného prostoru zabíral každý chromozom.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Pak jsem napsal funkci, která vezme celkovou velikost, zamíchá pořadí chromozomů a rozdělí je do skupin num_jobs a řekne vám, jak různé jsou velikosti všech úloh zpracování.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Pak jsem prošel tisíc shufflí pomocí purrr a vybral to nejlepší.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Takže jsem skončil u sady úkolů, které byly co do velikosti velmi podobné. Pak už jen zbývalo zabalit můj předchozí Bash skript do velké smyčky for. Psaní této optimalizace trvalo asi 10 minut. A to je mnohem méně, než bych utratil za ruční vytváření úkolů, pokud by byly nevyvážené. Proto si myslím, že jsem měl s touto předběžnou optimalizací pravdu.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Na konec přidám příkaz k vypnutí:

sudo shutdown -h now

...a všechno klaplo! Pomocí AWS CLI jsem vyvolal instance pomocí možnosti user_data dal jim ke zpracování Bash skripty jejich úkolů. Běžely a vypínaly se automaticky, takže jsem neplatil za extra výkon.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Pojďme balit!

Co jsem se naučil: API by mělo být jednoduché kvůli snadnosti a flexibilitě použití.

Konečně jsem dostal data na správné místo a ve formě. Zbývalo jen co nejvíce zjednodušit proces používání dat, aby to bylo pro kolegy jednodušší. Chtěl jsem vytvořit jednoduché API pro vytváření požadavků. Pokud se v budoucnu rozhodnu přejít z .rds na parkety, pak by to měl být problém pro mě, ne pro mé kolegy. K tomu jsem se rozhodl udělat interní R balíček.

Sestavte a zdokumentujte velmi jednoduchý balíček obsahující jen několik funkcí pro přístup k datům organizovaných kolem funkce get_snp. Udělal jsem i web pro své kolegy pkgdown, takže mohou snadno vidět příklady a dokumentaci.

Analýza 25 TB pomocí AWK a R

Inteligentní ukládání do mezipaměti

Co jsem se naučil: Pokud jsou vaše data dobře připravena, bude ukládání do mezipaměti snadné!

Protože jeden z hlavních pracovních postupů aplikoval stejný model analýzy na balíček SNP, rozhodl jsem se využít binning ve svůj prospěch. Při přenosu dat přes SNP jsou všechny informace ze skupiny (přihrádky) připojeny k vrácenému objektu. To znamená, že staré dotazy mohou (teoreticky) urychlit zpracování nových dotazů.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Při sestavování balíčku jsem spustil mnoho benchmarků, abych porovnal rychlost při použití různých metod. Doporučuji to nezanedbávat, protože někdy jsou výsledky nečekané. Například, dplyr::filter bylo mnohem rychlejší než zachycení řádků pomocí filtrování založeného na indexování a načtení jednoho sloupce z filtrovaného datového rámce bylo mnohem rychlejší než použití syntaxe indexování.

Vezměte prosím na vědomí, že objekt prev_snp_results obsahuje klíč snps_in_bin. Toto je pole všech jedinečných SNP ve skupině (bin), které vám umožní rychle zkontrolovat, zda již máte data z předchozího dotazu. Také to usnadňuje procházení všech SNP ve skupině (bin) s tímto kódem:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

výsledky

Nyní můžeme (a začali jsme vážně) provozovat modely a scénáře, které nám byly dříve nedostupné. Nejlepší je, že moji kolegové z laboratoře nemusí myslet na žádné komplikace. Mají jen funkci, která funguje.

A přestože je balíček šetří detailů, snažil jsem se, aby byl formát dat dostatečně jednoduchý, aby na to přišli, kdybych zítra náhle zmizel...

Rychlost se znatelně zvýšila. Obvykle skenujeme funkčně významné fragmenty genomu. Dříve jsme to nemohli udělat (ukázalo se to příliš drahé), ale nyní díky skupinové (přihrádkové) struktuře a ukládání do mezipaměti trvá požadavek na jeden SNP v průměru méně než 0,1 sekundy a spotřeba dat je taková nízké, že náklady na S3 jsou arašídy.

Závěr

Tento článek není vůbec návodem. Řešení se ukázalo jako individuální a téměř jistě ne optimální. Spíše je to cestopis. Chci, aby ostatní pochopili, že taková rozhodnutí se v hlavě nejeví úplně zformovaná, jsou výsledkem pokusů a omylů. Také pokud hledáte datového vědce, mějte na paměti, že efektivní používání těchto nástrojů vyžaduje zkušenosti a zkušenosti stojí peníze. Jsem rád, že jsem měl prostředky na zaplacení, ale mnoho dalších, kteří dokážou dělat stejnou práci lépe než já, nebude mít kvůli nedostatku peněz příležitost to ani zkusit.

Nástroje pro velká data jsou všestranné. Pokud máte čas, můžete téměř jistě napsat rychlejší řešení pomocí inteligentních technik čištění, ukládání a extrakce dat. Nakonec jde o analýzu nákladů a přínosů.

Co jsem se naučil:

  • neexistuje žádný levný způsob, jak analyzovat 25 TB najednou;
  • buďte opatrní s velikostí vašich parketových souborů a jejich uspořádáním;
  • Oddíly ve Sparku musí být vyvážené;
  • Obecně se nikdy nepokoušejte vytvořit 2,5 milionu oddílů;
  • Řazení je stále obtížné, stejně jako nastavení Sparku;
  • někdy speciální data vyžadují speciální řešení;
  • Agregace jisker je rychlá, ale rozdělení je stále drahé;
  • nespěte, když vás učí základy, váš problém už pravděpodobně někdo vyřešil v 1980. letech;
  • gnu parallel - to je kouzelná věc, měl by ji používat každý;
  • Spark má rád nekomprimovaná data a nemá rád kombinování oddílů;
  • Spark má příliš velkou režii při řešení jednoduchých problémů;
  • Asociativní pole AWK jsou velmi efektivní;
  • můžete kontaktovat stdin и stdout z R skriptu, a proto jej použít v potrubí;
  • Díky implementaci chytré cesty může S3 zpracovat mnoho souborů;
  • Hlavním důvodem plýtvání časem je předčasná optimalizace způsobu skladování;
  • nesnažte se optimalizovat úkoly ručně, nechte to udělat počítač;
  • API by mělo být jednoduché z důvodu snadnosti a flexibility použití;
  • Pokud jsou vaše data dobře připravena, bude ukládání do mezipaměti snadné!

Zdroj: www.habr.com

Přidat komentář