Ontleding van 25 TB met AWK en R

Ontleding van 25 TB met AWK en R
Hoe om hierdie artikel te lees: Ek vra om verskoning dat die teks so lank en chaoties is. Om jou tyd te bespaar, begin ek elke hoofstuk met 'n "Wat ek geleer het" inleiding, wat die essensie van die hoofstuk in een of twee sinne opsom.

"Wys my net die oplossing!" As jy net wil sien waar ek vandaan kom, slaan dan oor na die hoofstuk "Becoming More Inventive", maar ek dink dit is interessanter en nuttiger om oor mislukking te lees.

Ek is onlangs getaak om 'n proses op te stel vir die verwerking van 'n groot volume rou DNS-volgordes (tegnies 'n SNP-skyfie). Die behoefte was om vinnig data te verkry oor 'n gegewe genetiese ligging (genoem 'n SNP) vir daaropvolgende modellering en ander take. Deur R en AWK te gebruik, kon ek data op 'n natuurlike manier skoonmaak en organiseer, wat die verwerking van navrae aansienlik versnel het. Dit was nie vir my maklik nie en het talle herhalings vereis. Hierdie artikel sal jou help om sommige van my foute te vermy en jou te wys waarmee ek geëindig het.

Eerstens 'n paar inleidende verduidelikings.

Data

Ons universiteit se genetiese inligtingverwerkingsentrum het ons van data voorsien in die vorm van 'n 25 TB TSV. Ek het hulle opgedeel in 5 Gzip-saamgeperste pakkette ontvang, wat elkeen ongeveer 240 vier-gigagreep-lêers bevat. Elke ry het data vir een SNP van een individu bevat. In totaal is data oor ~2,5 miljoen SNP's en ~60 duisend mense oorgedra. Benewens SNP-inligting, bevat die lêers talle kolomme met getalle wat verskeie kenmerke weerspieël, soos leesintensiteit, frekwensie van verskillende allele, ens. In totaal was daar ongeveer 30 kolomme met unieke waardes.

Doel

Soos met enige databestuursprojek, was die belangrikste ding om te bepaal hoe die data gebruik sou word. In hierdie geval ons sal meestal modelle en werkvloeie vir SNP kies gebaseer op SNP. Dit wil sê, ons sal slegs data op een SNP op 'n slag nodig hê. Ek moes leer hoe om al die rekords wat met een van die 2,5 miljoen SNP'e geassosieer word so maklik, vinnig en goedkoop moontlik te herwin.

Hoe om dit nie te doen nie

Om 'n geskikte cliché aan te haal:

Ek het nie 'n duisend keer misluk nie, ek het net 'n duisend maniere ontdek om te verhoed dat 'n klomp data in 'n navraagvriendelike formaat ontleed word.

Eerste probeer

Wat het ek geleer: Daar is geen goedkoop manier om 25 TB op 'n slag te ontleed nie.

Nadat ek die kursus “Gevorderde metodes vir verwerking van groot data” by Vanderbilt Universiteit geneem het, was ek seker dat die truuk in die sak was. Dit sal waarskynlik 'n uur of twee neem om die Hive-bediener op te stel om deur al die data te loop en die resultaat te rapporteer. Aangesien ons data in AWS S3 gestoor word, het ek die diens gebruik Athena, wat jou toelaat om Hive SQL-navrae op S3-data toe te pas. Jy hoef nie 'n Hive-kluster op te stel/in te samel nie, en jy betaal ook net vir die data waarna jy soek.

Nadat ek my data en die formaat aan Athena gewys het, het ek 'n paar toetse uitgevoer met navrae soos hierdie:

select * from intensityData limit 10;

En vinnig goed gestruktureerde resultate ontvang. Gereed.

Totdat ons probeer het om die data in ons werk te gebruik...

Ek is gevra om al die SNP-inligting uit te haal om die model op te toets. Ek het die navraag gedoen:


select * from intensityData 
where snp = 'rs123456';

...en begin wag. Na agt minute en meer as 4 TB se aangevraagde data het ek die resultaat ontvang. Athena vra volgens die volume data wat gevind is, $5 per teragreep. So hierdie enkele versoek kos $20 en agt minute se wag. Om die model op al die data te laat loop, moes ons 38 jaar wag en $50 miljoen betaal.Dit was natuurlik nie geskik vir ons nie.

Dit was nodig om Parket te gebruik...

Wat het ek geleer: Wees versigtig met die grootte van jou Parket-lêers en hul organisasie.

Ek het eers probeer om die situasie reg te stel deur alle TSV's om te skakel na Parket lêers. Hulle is gerieflik om met groot datastelle te werk omdat die inligting daarin in kolomvorm gestoor word: elke kolom lê in sy eie geheue/skyfsegment, in teenstelling met tekslêers, waarin rye elemente van elke kolom bevat. En as jy iets moet vind, lees dan net die vereiste kolom. Daarbenewens stoor elke lêer 'n reeks waardes in 'n kolom, so as die waarde waarna u soek nie in die kolom se reeks is nie, sal Spark nie tyd mors om die hele lêer te skandeer nie.

Ek het 'n eenvoudige taak uitgevoer AWS-gom om ons TSV's na Parket te omskep en die nuwe lêers in Athena laat val. Dit het ongeveer 5 ure geneem. Maar toe ek die versoek uitgevoer het, het dit omtrent dieselfde tyd en 'n bietjie minder geld geneem om te voltooi. Die feit is dat Spark, wat probeer het om die taak te optimaliseer, eenvoudig een TSV-stuk uitgepak het en dit in sy eie Parket-stuk gesit het. En omdat elke stuk groot genoeg was om die hele rekords van baie mense te bevat, het elke lêer al die SNP's bevat, so Spark moes al die lêers oopmaak om die nodige inligting te onttrek.

Interessant genoeg is Parquet se verstek (en aanbevole) kompressietipe, snappy, nie verdeelbaar nie. Daarom was elke eksekuteur vas met die taak om die volle 3,5 GB-datastel uit te pak en af ​​te laai.

Ontleding van 25 TB met AWK en R

Kom ons verstaan ​​die probleem

Wat het ek geleer: Sorteer is moeilik, veral as die data versprei word.

Dit het vir my gelyk asof ek nou die essensie van die probleem verstaan. Ek moes net die data volgens SNP-kolom sorteer, nie volgens mense nie. Dan sal verskeie SNP's in 'n aparte data-stuk gestoor word, en dan sal Parquet se "slim" funksie "maak net oop as die waarde in die reeks is" homself in al sy glorie vertoon. Ongelukkig was dit 'n moeilike taak om deur miljarde rye wat oor 'n groep versprei is te sorteer.

AWS wil beslis nie 'n terugbetaling uitreik nie weens die "Ek is 'n afgelei student" rede. Nadat ek op Amazon Glue gesorteer het, het dit vir 2 dae gehardloop en neergestort.

Wat van partisionering?

Wat het ek geleer: Afskortings in Spark moet gebalanseerd wees.

Toe het ek die idee gekry om data in chromosome te verdeel. Daar is 23 van hulle (en verskeie meer as jy mitochondriale DNA en ongekarteerde streke in ag neem).
Dit sal jou toelaat om die data in kleiner stukke te verdeel. As jy net een reël by die Spark-uitvoerfunksie in die Glue script voeg partition_by = "chr", dan moet die data in emmers verdeel word.

Ontleding van 25 TB met AWK en R
Die genoom bestaan ​​uit talle fragmente wat chromosome genoem word.

Ongelukkig het dit nie gewerk nie. Chromosome het verskillende groottes, wat verskillende hoeveelhede inligting beteken. Dit beteken dat die take wat Spark aan werkers gestuur het, nie gebalanseerd en stadig afgehandel is nie omdat sommige nodusse vroeg klaar was en ledig was. Die take is egter afgehandel. Maar toe daar vir een SNP gevra is, het die wanbalans weer probleme veroorsaak. Die koste van die verwerking van SNP's op groter chromosome (dit wil sê waar ons data wil kry) het net met ongeveer 'n faktor van 10 afgeneem. Baie, maar nie genoeg nie.

Wat as ons dit in nog kleiner dele verdeel?

Wat het ek geleer: Moet glad nie probeer om 2,5 miljoen partisies te doen nie.

Ek het besluit om alles te doen en elke SNP te verdeel. Dit het verseker dat die afskortings ewe groot was. DIT WAS 'N SLEGTE IDEE. Ek het Glue gebruik en 'n onskuldige lyn bygevoeg partition_by = 'snp'. Die taak het begin en begin uitvoer. 'n Dag later het ek nagegaan en gesien dat daar nog niks aan S3 geskryf is nie, so ek het die taak doodgemaak. Dit lyk asof Glue tussenlêers na 'n versteekte plek in S3 geskryf het, baie lêers, miskien 'n paar miljoen. Gevolglik het my fout meer as 'n duisend dollar gekos en nie my mentor behaag nie.

Partisionering + sortering

Wat het ek geleer: Sorteer is steeds moeilik, net soos om Spark te stem.

My laaste poging om te partisioneer het behels dat ek die chromosome partisieer en dan elke partisie sorteer. In teorie sou dit elke navraag versnel omdat die verlangde SNP-data binne 'n paar Parket-stukke binne 'n gegewe reeks moes wees. Ongelukkig was dit 'n moeilike taak om selfs gepartisioneerde data te sorteer. Gevolglik het ek oorgeskakel na EMR vir 'n pasgemaakte groepering en agt kragtige gevalle (C5.4xl) en Sparklir gebruik om 'n meer buigsame werkvloei te skep ...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...die taak was egter steeds nie afgehandel nie. Ek het dit op verskillende maniere gekonfigureer: het die geheuetoewysing vir elke navraaguitvoerder verhoog, nodusse met 'n groot hoeveelheid geheue gebruik, uitsaaiveranderlikes (uitsaaiveranderlikes) gebruik, maar elke keer het dit halwe maatreëls geblyk te wees, en geleidelik het die eksekuteurs begin om te misluk totdat alles gestop het.

Ek raak meer kreatief

Wat het ek geleer: Soms vereis spesiale data spesiale oplossings.

Elke SNP het 'n posisiewaarde. Dit is 'n getal wat ooreenstem met die aantal basisse langs sy chromosoom. Dit is 'n lekker en natuurlike manier om ons data te organiseer. Ek wou eers volgens streke van elke chromosoom verdeel. Byvoorbeeld, posisies 1 - 2000, 2001 - 4000, ens. Maar die probleem is dat SNP's nie eweredig oor die chromosome versprei is nie, so die groepgroottes sal dus baie verskil.

Ontleding van 25 TB met AWK en R

Gevolglik het ek tot 'n uiteensetting van posisies in kategorieë (rang) gekom. Deur die reeds afgelaaide data te gebruik, het ek 'n versoek gedoen om 'n lys van unieke SNP's, hul posisies en chromosome te bekom. Toe het ek die data binne elke chromosoom gesorteer en SNP's in groepe (bin) van 'n gegewe grootte versamel. Kom ons sê 1000 SNP's elk. Dit het my die SNP-tot-groep-per-chromosoom-verhouding gegee.

Op die ou end het ek groepe (binne) van 75 SNP's gemaak, die rede sal hieronder verduidelik word.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Probeer eers met Spark

Wat het ek geleer: Vonk-aggregasie is vinnig, maar partisieering is steeds duur.

Ek wou hierdie klein (2,5 miljoen rye) dataraampie in Spark lees, dit met die rou data kombineer en dit dan verdeel deur die nuut bygevoegde kolom bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

ek het gebruik sdf_broadcast(), so Spark weet dat dit die dataraamwerk na alle nodusse moet stuur. Dit is nuttig as die data klein is en vir alle take benodig word. Andersins probeer Spark slim wees en versprei data soos nodig, wat verlangsamings kan veroorsaak.

En weer, my idee het nie gewerk nie: die take het vir 'n geruime tyd gewerk, die unie voltooi, en toe, soos die eksekuteurs wat deur partisionering van stapel gestuur is, het hulle begin misluk.

Voeg AWK by

Wat het ek geleer: Moenie slaap as jy die basiese beginsels geleer word nie. Iemand het sekerlik reeds in die 1980's jou probleem opgelos.

Tot op hierdie stadium was die rede vir al my mislukkings met Spark die mengelmoes van data in die groepie. Miskien kan die situasie verbeter word met voorafbehandeling. Ek het besluit om te probeer om die rou teksdata in kolomme van chromosome te verdeel, so ek het gehoop om Spark van "vooraf verdeelde" data te voorsien.

Ek het op StackOverflow gesoek na hoe om volgens kolomwaardes te verdeel en gevind so 'n wonderlike antwoord. Met AWK kan u 'n tekslêer volgens kolomwaardes verdeel deur dit in 'n skrif te skryf eerder as om die resultate na stdout.

Ek het 'n Bash-skrif geskryf om dit te probeer. Het een van die verpakte TSV's afgelaai en dit dan uitgepak met behulp van gzip en gestuur word aan awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Dit het gewerk!

Vul die kerne

Wat het ek geleer: gnu parallel - dit is 'n magiese ding, almal moet dit gebruik.

Die skeiding was redelik stadig en toe ek begin het htopom die gebruik van 'n kragtige (en duur) EC2-instansie na te gaan, het dit geblyk dat ek slegs een kern en ongeveer 200 MB geheue gebruik. Om die probleem op te los en nie baie geld te verloor nie, moes ons uitvind hoe om die werk te paralleliseer. Gelukkig in 'n absoluut wonderlike boek Datawetenskap by die opdraglyn Ek het 'n hoofstuk deur Jeron Janssens oor parallelisering gevind. Daaruit het ek geleer oor gnu parallel, 'n baie buigsame metode vir die implementering van multithreading in Unix.

Ontleding van 25 TB met AWK en R
Toe ek die partisionering met die nuwe proses begin het, was alles goed, maar daar was steeds 'n bottelnek - die aflaai van S3-voorwerpe na skyf was nie baie vinnig nie en nie ten volle geparalleliseer nie. Om dit reg te stel, het ek dit gedoen:

  1. Ek het uitgevind dat dit moontlik is om die S3-aflaaistadium direk in die pyplyn te implementeer, wat intermediêre berging op skyf heeltemal uitskakel. Dit beteken ek kan vermy om rou data na skyf te skryf en selfs kleiner, en dus goedkoper, berging op AWS te gebruik.
  2. span aws configure set default.s3.max_concurrent_requests 50 het die aantal drade wat AWS CLI gebruik aansienlik verhoog (by verstek is daar 10).
  3. Ek het oorgeskakel na 'n EC2-instansie wat vir netwerkspoed geoptimaliseer is, met die letter n in die naam. Ek het gevind dat die verlies aan verwerkingskrag by die gebruik van n-gevalle meer as vergoed word deur die toename in laaispoed. Vir die meeste take het ek c5n.4xl gebruik.
  4. Verander gzip op pigz, dit is 'n gzip-instrument wat oulike dinge kan doen om die aanvanklik nie-geparalleliseerde taak om lêers te dekomprimeer paralleliseer (dit het die minste gehelp).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Hierdie stappe word met mekaar gekombineer om alles baie vinnig te laat werk. Deur aflaaispoed te verhoog en skyfskryf uit te skakel, kon ek nou 'n 5 teragreep-pakket binne 'n paar uur verwerk.

Hierdie twiet moes 'TSV' genoem het. Helaas.

Gebruik nuut ontleed data

Wat het ek geleer: Spark hou van ongecomprimeerde data en hou nie daarvan om partisies te kombineer nie.

Nou was die data in S3 in 'n uitgepakte (lees: gedeelde) en semi-geordende formaat, en ek kon weer na Spark terugkeer. ’n Verrassing het op my gewag: ek het weer nie daarin geslaag om te bereik wat ek wou hê nie! Dit was baie moeilik om vir Spark presies te sê hoe die data verdeel is. En selfs toe ek dit gedoen het, het dit geblyk dat daar te veel partisies was (95 duisend), en toe ek dit gebruik het coalesce hul getal tot redelike perke verminder het, het dit my partisie vernietig. Ek is seker dit kan reggestel word, maar na 'n paar dae se soektog kon ek nie 'n oplossing vind nie. Ek het uiteindelik al die take in Spark voltooi, hoewel dit 'n rukkie geneem het en my gesplete Parket-lêers nie baie klein was nie (~200 KB). Die data was egter waar dit nodig was.

Ontleding van 25 TB met AWK en R
Te klein en ongelyk, wonderlik!

Toets plaaslike Spark-navrae

Wat het ek geleer: Spark het te veel oorhoofse koste wanneer eenvoudige probleme opgelos word.

Deur die data in 'n slim formaat af te laai, kon ek die spoed toets. Stel 'n R-skrip op om 'n plaaslike Spark-bediener te laat loop, en laai dan 'n Spark-dataraam van die gespesifiseerde Parquet-groepberging (bak). Ek het probeer om al die data te laai, maar kon nie Sparklir kry om die partisionering te herken nie.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Die teregstelling het 29,415 sekondes geneem. Baie beter, maar nie te goed vir massatoetsing van enigiets nie. Boonop kon ek nie dinge bespoedig met cache nie, want wanneer ek probeer het om 'n dataraam in die geheue te kas, het Spark altyd neergestort, selfs wanneer ek meer as 50 GB geheue toegewys het aan 'n datastel wat minder as 15 weeg.

Keer terug na AWK

Wat het ek geleer: Assosiatiewe skikkings in AWK is baie doeltreffend.

Ek het besef dat ek hoër spoed kan bereik. Ek het dit in 'n wonderlike onthou AWK-tutoriaal deur Bruce Barnett Ek het gelees van 'n oulike kenmerk genaamd "assosiatiewe skikkings" In wese is dit sleutel-waarde-pare, wat om een ​​of ander rede anders in AWK genoem is, en daarom het ek op een of ander manier nie veel daaroor gedink nie. Roman Cheplyaka onthou dat die term "assosiatiewe skikkings" baie ouer is as die term "sleutel-waarde-paar". Selfs as jy soek die sleutel-waarde in Google Ngram op, jy sal nie hierdie term daar sien nie, maar jy sal assosiatiewe skikkings vind! Daarbenewens word die "sleutel-waarde-paar" meestal met databasisse geassosieer, so dit maak baie meer sin om dit met 'n hashmap te vergelyk. Ek het besef dat ek hierdie assosiatiewe skikkings kan gebruik om my SNP's met 'n bin-tabel en rou data te assosieer sonder om Spark te gebruik.

Om dit te doen, het ek die blok in die AWK-skrif gebruik BEGIN. Dit is 'n stukkie kode wat uitgevoer word voordat die eerste reël data na die hoofliggaam van die skrif oorgedra word.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Span while(getline...) het alle rye van die CSV-groep (bin) gelaai, stel die eerste kolom (SNP-naam) as die sleutel vir die assosiatiewe skikking bin en die tweede waarde (groep) as die waarde. Toe in die blok { }, wat op alle reëls van die hooflêer uitgevoer word, word elke reël na die uitvoerlêer gestuur, wat 'n unieke naam ontvang afhangende van die groep (bin): ..._bin_"bin[$1]"_....

veranderlikes batch_num и chunk_id ooreenstem met die data wat deur die pyplyn verskaf word, om 'n wedlooptoestand te vermy, en elke uitvoeringsdraad wat loop parallel, geskryf na sy eie unieke lêer.

Aangesien ek al die rou data in dopgehou het op chromosome wat oorgebly het van my vorige eksperiment met AWK, kon ek nou nog 'n Bash-skrif skryf om een ​​chromosoom op 'n slag te verwerk en dieper gepartisioneerde data na S3 te stuur.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Die draaiboek het twee afdelings parallel.

In die eerste afdeling word data gelees van alle lêers wat inligting oor die verlangde chromosoom bevat, dan word hierdie data oor drade versprei, wat die lêers in die toepaslike groepe (bin) versprei. Om rastoestande te vermy wanneer verskeie drade na dieselfde lêer skryf, gee AWK die lêername deur om data na verskillende plekke te skryf, bv. chr_10_bin_52_batch_2_aa.csv. As gevolg hiervan word baie klein lêers op die skyf geskep (hiervoor het ek terabyte EBS-volumes gebruik).

Vervoerband van die tweede afdeling parallel gaan deur die groepe (bin) en kombineer hul individuele lêers in algemene CSV c caten stuur dit dan vir uitvoer.

Uitsaai in R?

Wat het ek geleer: Jy kan kontak stdin и stdout vanaf 'n R-skrip, en gebruik dit dus in die pyplyn.

Jy het dalk hierdie reël in jou Bash-skrif opgemerk: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Dit vertaal alle aaneengeskakelde groeplêers (bin) in die R-skrif hieronder. {} is 'n spesiale tegniek parallel, wat enige data wat dit na die gespesifiseerde stroom stuur direk in die opdrag self invoeg. Opsie {#} bied 'n unieke draad-ID, en {%} verteenwoordig die werkgleufnommer (herhaal, maar nooit gelyktydig nie). 'n Lys van alle opsies kan gevind word in dokumentasie.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Wanneer 'n veranderlike file("stdin") oorgedra word aan readr::read_csv, word die data wat in die R-skrip vertaal is, in 'n raam gelaai, wat dan in die vorm is .rds-lêer gebruik aws.s3 direk na S3 geskryf.

RDS is iets soos 'n junior weergawe van Parket, sonder die fieterjasies van luidsprekerberging.

Nadat ek die Bash-skrif voltooi het, het ek 'n bundel gekry .rds-lêers geleë in S3, wat my toegelaat het om doeltreffende kompressie en ingeboude tipes te gebruik.

Ten spyte van die gebruik van rem R, het alles baie vinnig gewerk. Dit is nie verbasend dat die dele van R wat data lees en skryf hoogs geoptimaliseer is nie. Nadat op een mediumgrootte chromosoom getoets is, is die taak binne ongeveer twee uur op 'n C5n.4xl-instansie voltooi.

S3 Beperkings

Wat het ek geleer: Danksy slim pad-implementering kan S3 baie lêers hanteer.

Ek was bekommerd of S3 die baie lêers wat na hom oorgeplaas is, sou kon hanteer. Ek kan die lêername sin maak, maar hoe sal S3 daarvoor lyk?

Ontleding van 25 TB met AWK en R
Dopgehou in S3 is net om te wys, in werklikheid stel die stelsel nie belang in die simbool nie /. Van die S3 FAQ-bladsy.

Dit blyk dat S3 die pad na 'n spesifieke lêer verteenwoordig as 'n eenvoudige sleutel in 'n soort hash-tabel of dokument-gebaseerde databasis. 'n Emmer kan as 'n tabel beskou word, en lêers kan as rekords in daardie tabel beskou word.

Aangesien spoed en doeltreffendheid belangrik is om wins te maak by Amazon, is dit geen verrassing dat hierdie sleutel-as-'n-lêer-pad stelsel freaking geoptimaliseer is. Ek het probeer om 'n balans te vind: sodat ek nie baie get-versoeke hoef te maak nie, maar dat die versoeke vinnig uitgevoer is. Dit het geblyk dat dit die beste is om ongeveer 20 duisend bin-lêers te maak. Ek dink as ons voortgaan om te optimaliseer, kan ons 'n toename in spoed bereik (byvoorbeeld deur 'n spesiale emmer net vir data te maak, en sodoende die grootte van die opsoektabel te verminder). Maar daar was nie tyd of geld vir verdere eksperimente nie.

Wat van kruisversoenbaarheid?

Wat ek geleer het: Die nommer een oorsaak van tydmors is om jou bergingsmetode voortydig te optimaliseer.

Op hierdie punt is dit baie belangrik om jouself af te vra: "Hoekom gebruik 'n eie lêerformaat?" Die rede lê in laaispoed (gzipped CSV-lêers het 7 keer langer geneem om te laai) en verenigbaarheid met ons werkvloeie. Ek kan dalk heroorweeg of R maklik Parket (of Arrow) lêers kan laai sonder die Spark-lading. Almal in ons laboratorium gebruik R, en as ek die data na 'n ander formaat moet omskakel, het ek steeds die oorspronklike teksdata, so ek kan net weer die pyplyn laat loop.

Verdeling van werk

Wat het ek geleer: Moenie probeer om take met die hand te optimaliseer nie, laat die rekenaar dit doen.

Ek het die werkvloei op een chromosoom ontfout, nou moet ek al die ander data verwerk.
Ek wou verskeie EC2-instansies vir omskakeling insamel, maar terselfdertyd was ek bang om 'n baie ongebalanseerde vrag oor verskillende verwerkingstake te kry (net soos Spark aan ongebalanseerde partisies gely het). Daarbenewens het ek nie daarin belang gestel om een ​​geval per chromosoom te verhoog nie, want vir AWS-rekeninge is daar 'n versteklimiet van 10 gevalle.

Toe besluit ek om 'n draaiboek in R te skryf om verwerkingstake te optimaliseer.

Eerstens het ek S3 gevra om te bereken hoeveel stoorspasie elke chromosoom beset het.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Toe skryf ek 'n funksie wat die totale grootte neem, die volgorde van die chromosome skommel, hulle in groepe verdeel num_jobs en vertel jou hoe verskillend die groottes van alle verwerkingstake is.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Toe hardloop ek deur 'n duisend skommelings met purrr en kies die beste.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Ek het dus met 'n stel take geëindig wat baie soortgelyk in grootte was. Dan was al wat oorgebly het om my vorige Bash-skrif in 'n groot lus toe te draai for. Hierdie optimalisering het ongeveer 10 minute geneem om te skryf. En dit is baie minder as wat ek sou spandeer om take met die hand te skep as hulle ongebalanseerd was. Daarom dink ek dat ek reg was met hierdie voorlopige optimalisering.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Aan die einde voeg ek die afsluit-opdrag by:

sudo shutdown -h now

... en alles het uitgewerk! Deur die AWS CLI te gebruik, het ek gevalle geopper deur die opsie te gebruik user_data het vir hulle Bash-skrifte van hul take vir verwerking gegee. Hulle het outomaties gehardloop en afgesluit, so ek het nie vir ekstra verwerkingskrag betaal nie.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Kom ons pak!

Wat het ek geleer: Die API moet eenvoudig wees ter wille van gemak en buigsaamheid van gebruik.

Uiteindelik het ek die data op die regte plek en vorm gekry. Al wat oorgebly het, was om die proses om data te gebruik soveel as moontlik te vereenvoudig om dit vir my kollegas makliker te maak. Ek wou 'n eenvoudige API maak om versoeke te skep. As ek in die toekoms besluit om oor te skakel van .rds na Parket-lêers, dan behoort dit vir my 'n probleem te wees, nie vir my kollegas nie. Hiervoor het ek besluit om 'n interne R-pakket te maak.

Bou en dokumenteer 'n baie eenvoudige pakket wat net 'n paar datatoegangsfunksies bevat wat rondom 'n funksie georganiseer is get_snp. Ek het ook 'n webwerf vir my kollegas gemaak pak af, sodat hulle maklik voorbeelde en dokumentasie kan sien.

Ontleding van 25 TB met AWK en R

Slim kas

Wat het ek geleer: As jou data goed voorberei is, sal cache maklik wees!

Aangesien een van die hoofwerkstrome dieselfde ontledingsmodel op die SNP-pakket toegepas het, het ek besluit om binning tot my voordeel te gebruik. Wanneer data via SNP oorgedra word, word alle inligting van die groep (bin) aan die teruggekeerde voorwerp geheg. Dit wil sê, ou navrae kan (in teorie) die verwerking van nuwe navrae versnel.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Toe ek die pakket bou, het ek baie maatstawwe gebruik om spoed te vergelyk wanneer ek verskillende metodes gebruik. Ek beveel aan om dit nie te verwaarloos nie, want soms is die resultate onverwags. Byvoorbeeld, dplyr::filter was baie vinniger as om rye vas te vang met indekseringsgebaseerde filtering, en om 'n enkele kolom uit 'n gefiltreerde dataraam te haal was baie vinniger as om indekseringssintaksis te gebruik.

Neem asseblief kennis dat die voorwerp prev_snp_results die sleutel bevat snps_in_bin. Dit is 'n reeks van alle unieke SNP's in 'n groep (bin), wat jou toelaat om vinnig te kyk of jy reeds data van 'n vorige navraag het. Dit maak dit ook maklik om deur al die SNP's in 'n groep (bin) te loop met hierdie kode:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Bevindinge

Nou kan ons (en het ernstig begin) modelle en scenario's bestuur wat voorheen vir ons ontoeganklik was. Die beste ding is dat my laboratoriumkollegas nie aan enige komplikasies hoef te dink nie. Hulle het net 'n funksie wat werk.

En hoewel die pakkie hulle die besonderhede spaar, het ek probeer om die dataformaat eenvoudig genoeg te maak dat hulle dit kon uitvind as ek môre skielik verdwyn...

Die spoed het merkbaar toegeneem. Ons skandeer gewoonlik funksioneel beduidende genoomfragmente. Voorheen kon ons dit nie doen nie (dit het geblyk te duur te wees), maar nou, danksy die groep (bin) struktuur en kas, neem 'n versoek vir een SNP gemiddeld minder as 0,1 sekondes, en die datagebruik is so laag dat die koste vir S3 grondboontjies is.

Gevolgtrekking

Hierdie artikel is glad nie 'n gids nie. Die oplossing blyk individueel te wees, en byna beslis nie optimaal nie. Dit is eerder 'n reisverhaal. Ek wil hê ander moet verstaan ​​dat sulke besluite nie ten volle gevorm in die kop voorkom nie, dit is die resultaat van beproewing en fout. Ook, as jy op soek is na 'n data wetenskaplike, hou in gedagte dat die gebruik van hierdie instrumente effektief ervaring vereis, en ervaring kos geld. Ek is bly dat ek die middele gehad het om te betaal, maar baie ander wat dieselfde werk beter as ek kan doen, sal weens 'n gebrek aan geld nooit die geleentheid kry om selfs te probeer nie.

Groot data-instrumente is veelsydig. As jy die tyd het, kan jy byna seker 'n vinniger oplossing skryf deur slim data skoonmaak, berging en onttrekkingstegnieke te gebruik. Uiteindelik kom dit neer op 'n koste-voordeel-ontleding.

Wat ek geleer het:

  • daar is geen goedkoop manier om 25 TB op 'n slag te ontleed nie;
  • wees versigtig met die grootte van jou Parket-lêers en hul organisasie;
  • Afskortings in Spark moet gebalanseerd wees;
  • Oor die algemeen moet jy nooit probeer om 2,5 miljoen partisies te maak nie;
  • Sortering is steeds moeilik, so ook die opstel van Spark;
  • soms vereis spesiale data spesiale oplossings;
  • Vonk-aggregasie is vinnig, maar partisieering is steeds duur;
  • moenie slaap as hulle jou die basiese beginsels leer nie, iemand het jou probleem seker al in die 1980's opgelos;
  • gnu parallel - dit is 'n magiese ding, almal moet dit gebruik;
  • Spark hou van ongecomprimeerde data en hou nie daarvan om partisies te kombineer nie;
  • Spark het te veel oorhoofse koste wanneer eenvoudige probleme opgelos word;
  • AWK se assosiatiewe skikkings is baie doeltreffend;
  • jy kan kontak stdin и stdout vanaf 'n R-skrip, en gebruik dit dus in die pyplyn;
  • Danksy slim pad-implementering kan S3 baie lêers verwerk;
  • Die hoofrede vir tydmors is om jou bergingsmetode voortydig te optimaliseer;
  • moenie probeer om take met die hand te optimaliseer nie, laat die rekenaar dit doen;
  • Die API moet eenvoudig wees ter wille van gemak en buigsaamheid van gebruik;
  • As jou data goed voorberei is, sal cache maklik wees!

Bron: will.com

Voeg 'n opmerking