Parsar 25TB med AWK och R

Parsar 25TB med AWK och R
Hur man lÀser den hÀr artikeln: Jag ber om ursÀkt för att texten Àr sÄ lÄng och kaotisk. För att spara tid börjar jag varje kapitel med en introduktion "Vad jag lÀrde mig", som sammanfattar kÀrnan i kapitlet i en eller tvÄ meningar.

"Visa mig bara lösningen!" Om du bara vill se var jag kom ifrÄn, hoppa till kapitlet "Bli mer uppfinningsrik", men jag tycker att det Àr mer intressant och anvÀndbart att lÀsa om misslyckanden.

Jag fick nyligen i uppdrag att sÀtta upp en process för att bearbeta en stor volym av rÄa DNA-sekvenser (tekniskt sett ett SNP-chip). Behovet var att snabbt fÄ data om en given genetisk plats (kallad SNP) för efterföljande modellering och andra uppgifter. Med R och AWK kunde jag rengöra och organisera data pÄ ett naturligt sÀtt, vilket avsevÀrt pÄskyndade frÄgebehandlingen. Detta var inte lÀtt för mig och krÀvde mÄnga iterationer. Den hÀr artikeln hjÀlper dig att undvika nÄgra av mina misstag och visar dig vad jag slutade med.

Först nÄgra inledande förklaringar.

Data

VÄrt center för behandling av genetisk information vid universitetet försÄg oss med data i form av en 25 TB TSV. Jag fick dem uppdelade i 5 Gzip-komprimerade paket, som vart och ett innehöll cirka 240 fyra gigabyte filer. Varje rad innehöll data för en SNP frÄn en individ. Totalt överfördes data om ~2,5 miljoner SNP:er och ~60 tusen mÀnniskor. Förutom SNP-information innehöll filerna mÄnga kolumner med siffror som speglar olika egenskaper, sÄsom lÀsintensitet, frekvens av olika alleler, etc. Totalt fanns det cirka 30 kolumner med unika vÀrden.

MĂ„l

Som med alla datahanteringsprojekt var det viktigaste att bestÀmma hur datan skulle anvÀndas. I detta fall vi kommer mestadels att vÀlja modeller och arbetsflöden för SNP baserat pÄ SNP. Det vill sÀga att vi bara behöver data pÄ en SNP Ät gÄngen. Jag var tvungen att lÀra mig hur man hÀmtar alla poster associerade med en av de 2,5 miljoner SNP:erna sÄ enkelt, snabbt och billigt som möjligt.

Hur man inte gör detta

För att citera en passande klyscha:

Jag misslyckades inte tusen gÄnger, jag upptÀckte bara tusen sÀtt att undvika att analysera en massa data i ett frÄgevÀnligt format.

Första försöket

Vad har jag lÀrt mig: Det finns inget billigt sÀtt att analysera 25 TB Ät gÄngen.

Efter att ha gÄtt kursen "Avancerade metoder för Big Data Processing" vid Vanderbilt University var jag sÀker pÄ att tricket lÄg i bagaget. Det kommer förmodligen att ta en timme eller tvÄ att stÀlla in Hive-servern för att köra igenom all data och rapportera resultatet. Eftersom vÄr data lagras i AWS S3 anvÀnde jag tjÀnsten Athena, som lÄter dig tillÀmpa Hive SQL-frÄgor pÄ S3-data. Du behöver inte konfigurera/höja ett Hive-kluster, och du betalar ocksÄ endast för den data du letar efter.

Efter att jag visade Athena mina data och dess format, körde jag nÄgra tester med frÄgor som detta:

select * from intensityData limit 10;

Och fick snabbt vÀlstrukturerade resultat. Redo.

Tills vi försökte anvÀnda uppgifterna i vÄrt arbete...

Jag blev ombedd att ta fram all SNP-information för att testa modellen pÄ. Jag körde frÄgan:


select * from intensityData 
where snp = 'rs123456';

...och började vÀnta. Efter Ätta minuter och mer Àn 4 TB begÀrd data fick jag resultatet. Athena tar betalt efter mÀngden data som hittas, 5 USD per terabyte. SÄ denna enda begÀran kostade $20 och Ätta minuters vÀntan. För att köra modellen pÄ all data var vi tvungna att vÀnta i 38 Är och betala 50 miljoner dollar. Det hÀr var uppenbarligen inte lÀmpligt för oss.

Det var nödvÀndigt att anvÀnda parkett...

Vad har jag lÀrt mig: Var försiktig med storleken pÄ dina Parquet-filer och deras organisation.

Jag försökte först ÄtgÀrda situationen genom att konvertera alla TSV till Parkettfiler. De Àr praktiska för att arbeta med stora datamÀngder eftersom informationen i dem lagras i kolumnform: varje kolumn ligger i sitt eget minne/disksegment, till skillnad frÄn textfiler, dÀr rader innehÄller element i varje kolumn. Och om du behöver hitta nÄgot, lÀs bara den obligatoriska kolumnen. Dessutom lagrar varje fil ett intervall av vÀrden i en kolumn, sÄ om vÀrdet du letar efter inte Àr i kolumnens intervall, kommer Spark inte att slösa tid pÄ att skanna hela filen.

Jag körde en enkel uppgift AWS-lim att konvertera vÄra TSV till Parkett och slÀppte de nya filerna i Athena. Det tog ca 5 timmar. Men nÀr jag körde förfrÄgan tog det ungefÀr lika lÄng tid och lite mindre pengar att slutföra. Faktum Àr att Spark, i ett försök att optimera uppgiften, helt enkelt packade upp en TSV-bit och lade den i sin egen Parkett-bit. Och eftersom varje bit var tillrÀckligt stor för att hÄlla alla register för mÄnga mÀnniskor, innehöll varje fil alla SNP:er, sÄ Spark var tvungen att öppna alla filer för att extrahera den information den behövde.

Intressant nog Àr Parquets standard (och rekommenderade) komprimeringstyp, snappy, inte delbar. DÀrför var varje executor fast i uppgiften att packa upp och ladda ner hela 3,5 GB dataset.

Parsar 25TB med AWK och R

LÄt oss förstÄ problemet

Vad har jag lÀrt mig: Sortering Àr svÄrt, speciellt om data Àr distribuerad.

Det verkade för mig att nu förstod jag kÀrnan i problemet. Jag behövde bara sortera data efter SNP-kolumn, inte efter personer. Sedan kommer flera SNP:er att lagras i en separat dataklump, och dÄ kommer Parquets "smarta" funktion "öppna endast om vÀrdet Àr inom intervallet" att visa sig i all Àra. TyvÀrr visade det sig vara en svÄr uppgift att sortera igenom miljarder rader utspridda över ett kluster.

AWS vill definitivt inte ge en Äterbetalning pÄ grund av skÀlet "Jag Àr en distraherad student". Efter att jag körde sortering pÄ Amazon Glue, körde den i 2 dagar och kraschade.

Hur Àr det med partitionering?

Vad har jag lÀrt mig: SkiljevÀggar i Spark mÄste vara balanserade.

Sedan kom jag pÄ idén att partitionera data i kromosomer. Det finns 23 av dem (och flera till om man tar hÀnsyn till mitokondrie-DNA och omappade regioner).
Detta gör att du kan dela upp data i mindre bitar. Om du bara lÀgger till en rad till Spark-exportfunktionen i Glue-skriptet partition_by = "chr", dÄ bör data delas upp i hinkar.

Parsar 25TB med AWK och R
Genomet bestÄr av mÄnga fragment som kallas kromosomer.

TyvÀrr fungerade det inte. Kromosomer har olika storlekar, vilket betyder olika mÀngder information. Detta innebÀr att uppgifterna som Spark skickade till arbetarna inte var balanserade och slutfördes lÄngsamt eftersom vissa noder slutade tidigt och var inaktiva. Uppgifterna var dock klara. Men nÀr man bad om en SNP orsakade obalansen Äterigen problem. Kostnaden för att bearbeta SNP pÄ större kromosomer (det vill sÀga dÀr vi vill hÀmta data) har bara minskat med ungefÀr en faktor 10. Mycket, men inte tillrÀckligt.

TÀnk om vi delar upp det i Ànnu mindre delar?

Vad har jag lÀrt mig: Försök aldrig att göra 2,5 miljoner partitioner alls.

Jag bestĂ€mde mig för att gĂ„ all out och partitionera varje SNP. Detta sĂ€kerstĂ€llde att partitionerna var lika stora. DET VAR EN DÅLIG IDÉ. Jag anvĂ€nde Glue och lade till en oskyldig linje partition_by = 'snp'. Uppgiften startade och började utföras. En dag senare kollade jag och sĂ„g att det fortfarande inte fanns nĂ„got skrivet till S3, sĂ„ jag dödade uppgiften. Det ser ut som att Glue skrev mellanfiler till en dold plats i S3, mĂ„nga filer, kanske ett par miljoner. Som ett resultat kostade mitt misstag mer Ă€n tusen dollar och behagade inte min mentor.

Partitionering + sortering

Vad har jag lÀrt mig: Sortering Àr fortfarande svÄrt, liksom att trimma Spark.

Mitt sista försök att partitionera innebar att jag partitionerade kromosomerna och sedan sorterade varje partition. I teorin skulle detta pÄskynda varje frÄga eftersom den önskade SNP-datan mÄste ligga inom nÄgra parkettbitar inom ett givet intervall. TyvÀrr visade det sig vara en svÄr uppgift att sortera Àven partitionerade data. Som ett resultat bytte jag till EMR för ett anpassat kluster och anvÀnde Ätta kraftfulla instanser (C5.4xl) och Sparklyr för att skapa ett mer flexibelt arbetsflöde...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...uppgiften var dock fortfarande inte klar. Jag konfigurerade det pÄ olika sÀtt: ökade minnesallokeringen för varje frÄgeexekutor, anvÀnde noder med en stor mÀngd minne, anvÀnde broadcastvariabler (sÀndningsvariabler), men varje gÄng visade det sig att dessa var halvmÄtt, och gradvis började exekverarna att misslyckas tills allt stannade.

Jag blir mer kreativ

Vad har jag lÀrt mig: Ibland krÀver specialdata speciallösningar.

Varje SNP har ett positionsvÀrde. Detta Àr ett tal som motsvarar antalet baser lÀngs dess kromosom. Detta Àr ett trevligt och naturligt sÀtt att organisera vÄr data. Först ville jag dela upp efter regioner av varje kromosom. Till exempel positioner 1 - 2000, 2001 - 4000, etc. Men problemet Àr att SNP inte Àr jÀmnt fördelade över kromosomerna, sÄ gruppstorlekarna kommer dÀrför att variera mycket.

Parsar 25TB med AWK och R

Som ett resultat kom jag till en uppdelning av positioner i kategorier (rang). Med hjÀlp av redan nedladdade data körde jag en begÀran om att fÄ en lista över unika SNP, deras positioner och kromosomer. Sedan sorterade jag data inom varje kromosom och samlade in SNPs i grupper (bin) av en given storlek. LÄt oss sÀga 1000 SNP vardera. Detta gav mig förhÄllandet SNP-till-grupp-per-kromosom.

Till slut gjorde jag grupper (bin) med 75 SNP:er, anledningen kommer att förklaras nedan.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Försök först med Spark

Vad har jag lÀrt mig: Gnistaggregering Àr snabb, men partitionering Àr fortfarande dyrt.

Jag ville lÀsa den hÀr lilla (2,5 miljoner rader) dataramen i Spark, kombinera den med rÄdata och sedan partitionera den efter den nyligen tillagda kolumnen bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

jag anvÀnde sdf_broadcast(), sÄ Spark vet att den ska skicka dataramen till alla noder. Detta Àr anvÀndbart om informationen Àr liten och krÀvs för alla uppgifter. Annars försöker Spark vara smart och distribuerar data efter behov, vilket kan orsaka nedgÄngar.

Och Äterigen, min idé fungerade inte: uppgifterna fungerade under en tid, fullbordade facket, och sedan började de misslyckas, precis som exekutörerna som startade genom partitionering.

LĂ€gger till AWK

Vad har jag lÀrt mig: Sov inte nÀr du lÀr dig grunderna. NÄgon har sÀkert löst ditt problem redan pÄ 1980-talet.

Fram till denna punkt var orsaken till alla mina misslyckanden med Spark virrvarret av data i klustret. Kanske kan situationen förbÀttras med förbehandling. Jag bestÀmde mig för att försöka dela upp den rÄa textdatan i kolumner med kromosomer i hopp om att förse Spark med "förpartitionerade" data.

Jag sökte pÄ StackOverflow efter hur man delar upp efter kolumnvÀrden och hittade sÄ bra svar. Med AWK kan du dela upp en textfil efter kolumnvÀrden genom att skriva den i ett skript istÀllet för att skicka resultaten till stdout.

Jag skrev ett Bash-manus för att testa det. Laddade ner en av de paketerade TSV:erna och packade sedan upp den med hjÀlp av gzip och skickas till awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Det fungerade!

Fyller kÀrnorna

Vad har jag lĂ€rt mig: gnu parallel – det Ă€r en magisk sak, alla borde anvĂ€nda den.

Separationen var ganska lÄngsam och nÀr jag började htopför att kontrollera anvÀndningen av en kraftfull (och dyr) EC2-instans visade det sig att jag bara anvÀnde en kÀrna och cirka 200 MB minne. För att lösa problemet och inte förlora en massa pengar var vi tvungna att ta reda pÄ hur vi skulle parallellisera arbetet. Lyckligtvis i en helt fantastisk bok Datavetenskap vid kommandoraden Jag hittade ett kapitel av Jeron Janssens om parallellisering. Av det lÀrde jag mig om gnu parallel, en mycket flexibel metod för att implementera multithreading i Unix.

Parsar 25TB med AWK och R
NÀr jag startade partitioneringen med den nya processen var allt bra, men det fanns fortfarande en flaskhals - nedladdning av S3-objekt till disk var inte sÀrskilt snabb och inte helt parallelliserad. För att fixa detta gjorde jag sÄ hÀr:

  1. Jag fick reda pÄ att det Àr möjligt att implementera S3-nedladdningssteget direkt i pipeline, vilket helt eliminerar mellanlagring pÄ disk. Det betyder att jag kan undvika att skriva rÄdata till disk och anvÀnda Ànnu mindre, och dÀrför billigare, lagring pÄ AWS.
  2. Team aws configure set default.s3.max_concurrent_requests 50 ökade kraftigt antalet trÄdar som AWS CLI anvÀnder (som standard finns det 10).
  3. Jag bytte till en EC2-instans optimerad för nÀtverkshastighet, med bokstaven n i namnet. Jag har funnit att förlusten av processorkraft vid anvÀndning av n-instanser mer Àn kompenseras av ökningen i laddningshastighet. För de flesta uppgifter anvÀnde jag c5n.4xl.
  4. Ändrats gzip pĂ„ pigz, detta Ă€r ett gzip-verktyg som kan göra coola saker för att parallellisera den initialt icke-parallelliserade uppgiften att dekomprimera filer (detta hjĂ€lpte minst).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Dessa steg kombineras med varandra för att fÄ allt att fungera vÀldigt snabbt. Genom att öka nedladdningshastigheterna och eliminera diskskrivningar kunde jag nu bearbeta ett paket pÄ 5 terabyte pÄ bara nÄgra timmar.

Denna tweet borde ha nÀmnt "TSV". Ack.

AnvÀnder nyligen analyserad data

Vad har jag lÀrt mig: Spark gillar okomprimerad data och gillar inte att kombinera partitioner.

Nu fanns data i S3 i ett uppackat (lÀs: delat) och halvorganiserat format, och jag kunde ÄtervÀnda till Spark igen. En överraskning vÀntade mig: jag lyckades Äterigen inte uppnÄ det jag ville! Det var mycket svÄrt att berÀtta för Spark exakt hur data var uppdelad. Och Àven nÀr jag gjorde detta visade det sig att det fanns för mÄnga partitioner (95 tusen), och nÀr jag anvÀnde coalesce reducerade deras antal till rimliga grÀnser, detta förstörde min partitionering. Jag Àr sÀker pÄ att detta gÄr att fixa, men efter ett par dagars letande kunde jag inte hitta nÄgon lösning. Jag avslutade sÄ smÄningom alla uppgifter i Spark, Àven om det tog ett tag och mina delade parkettfiler var inte sÀrskilt smÄ (~200 KB). DÀremot fanns uppgifterna dÀr de behövdes.

Parsar 25TB med AWK och R
För liten och ojÀmn, underbar!

Testar lokala Spark-frÄgor

Vad har jag lÀrt mig: Spark har för mycket omkostnader nÀr man löser enkla problem.

Genom att ladda ner datan i ett smart format kunde jag testa hastigheten. StÀll in ett R-skript för att köra en lokal Spark-server och laddade sedan en Spark-dataram frÄn den angivna Parquet-grupplagringen (bin). Jag försökte ladda all data men kunde inte fÄ Sparklyr att kÀnna igen partitioneringen.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

AvrÀttningen tog 29,415 sekunder. Mycket bÀttre, men inte för bra för masstestning av nÄgonting. Dessutom kunde jag inte pÄskynda saker och ting med cachelagring eftersom nÀr jag försökte cachelagra en dataram i minnet kraschade Spark alltid, Àven nÀr jag allokerade mer Àn 50 GB minne till en datauppsÀttning som vÀgde mindre Àn 15.

ÅtergĂ„ till AWK

Vad har jag lÀrt mig: Associativa arrayer i AWK Àr mycket effektiva.

Jag insĂ„g att jag kunde uppnĂ„ högre hastigheter. Jag kom ihĂ„g det i en underbar AWK handledning av Bruce Barnett Jag lĂ€ste om en cool funktion som heter "associativa arrayer" I huvudsak Ă€r dessa nyckel-vĂ€rdepar, som av nĂ„gon anledning kallades annorlunda i AWK, och dĂ€rför tĂ€nkte jag pĂ„ nĂ„got sĂ€tt inte sĂ„ mycket pĂ„ dem. Roman Cheplyaka pĂ„minde om att termen "associativa arrayer" Ă€r mycket Ă€ldre Ă€n termen "nyckel-vĂ€rdepar". Även om du slĂ„ upp nyckel-vĂ€rdet i Google Ngram, du kommer inte att se den hĂ€r termen dĂ€r, men du hittar associativa arrayer! Dessutom Ă€r "nyckel-vĂ€rde-paret" oftast associerat med databaser, sĂ„ det Ă€r mycket mer meningsfullt att jĂ€mföra det med en hashmap. Jag insĂ„g att jag kunde anvĂ€nda dessa associativa arrayer för att associera mina SNP:er med en bin-tabell och rĂ„data utan att anvĂ€nda Spark.

För att göra detta anvÀnde jag blocket i AWK-skriptet BEGIN. Detta Àr en bit kod som exekveras innan den första raden med data skickas till skriptets huvuddel.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Team while(getline...) laddade alla rader frÄn CSV-gruppen (bin), ange den första kolumnen (SNP-namn) som nyckel för den associativa arrayen bin och det andra vÀrdet (gruppen) som vÀrdet. Sedan i blocket { }, som exekveras pÄ alla rader i huvudfilen, skickas varje rad till utdatafilen, som fÄr ett unikt namn beroende pÄ dess grupp (bin): ..._bin_"bin[$1]"_....

variabler batch_num О chunk_id matchade data som tillhandahÄlls av pipelinen, undviker ett race-tillstÄnd och varje exekveringstrÄd som körs parallel, skrev till sin egen unika fil.

Eftersom jag spred alla rÄdata i mappar pÄ kromosomer som blev över frÄn mitt tidigare experiment med AWK, kunde jag nu skriva ett annat Bash-skript för att bearbeta en kromosom i taget och skicka djupare partitionerad data till S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Manuset har tvÄ avsnitt parallel.

I det första avsnittet lÀses data frÄn alla filer som innehÄller information om den önskade kromosomen, sedan distribueras denna data över trÄdar, som distribuerar filerna i lÀmpliga grupper (bin). För att undvika tÀvlingsförhÄllanden nÀr flera trÄdar skriver till samma fil skickar AWK filnamnen för att skriva data till olika platser, t.ex. chr_10_bin_52_batch_2_aa.csv. Som ett resultat skapas mÄnga smÄ filer pÄ disken (för detta anvÀnde jag terabyte EBS-volymer).

Transportör frÄn andra sektionen parallel gÄr igenom grupperna (bin) och kombinerar deras individuella filer till gemensamma CSV c catoch skickar dem sedan för export.

SĂ€ndning i R?

Vad har jag lÀrt mig: Du kan kontakta stdin О stdout frÄn ett R-skript, och anvÀnd det dÀrför i pipelinen.

Du kanske har lagt mÀrke till den hÀr raden i ditt Bash-skript: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Den översÀtter alla sammanlÀnkade gruppfiler (bin) till R-skriptet nedan. {} Àr en speciell teknik parallel, som infogar all data som den skickar till den angivna strömmen direkt i sjÀlva kommandot. Alternativ {#} ger ett unikt trÄd-ID och {%} representerar jobbplatsnumret (upprepas, men aldrig samtidigt). En lista över alla alternativ finns i dokumentation.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

NÀr en variabel file("stdin") överförs till readr::read_csv, data som översatts till R-skriptet laddas in i en ram, som sedan Àr i form .rds-fil med hjÀlp av aws.s3 skrivet direkt till S3.

RDS Àr ungefÀr en juniorversion av Parkett, utan krusidullerna med högtalarförvaring.

Efter att ha avslutat Bash-manuset fick jag ett paket .rds-filer som finns i S3, vilket gjorde att jag kunde anvÀnda effektiv komprimering och inbyggda typer.

Trots anvÀndningen av broms R fungerade allt vÀldigt snabbt. Inte överraskande Àr de delar av R som lÀser och skriver data mycket optimerade. Efter testning pÄ en medelstor kromosom slutfördes jobbet pÄ en C5n.4xl-instans pÄ cirka tvÄ timmar.

S3 BegrÀnsningar

Vad har jag lÀrt mig: Tack vare smart vÀgimplementering kan S3 hantera mÄnga filer.

Jag var orolig om S3 skulle kunna hantera de mÄnga filer som överfördes till den. Jag skulle kunna göra filnamnen vettiga, men hur skulle S3 leta efter dem?

Parsar 25TB med AWK och R
Mappar i S3 Àr bara för att visa, i sjÀlva verket Àr systemet inte intresserad av symbolen /. FrÄn S3 FAQ-sidan.

Det verkar som att S3 representerar sökvÀgen till en viss fil som en enkel nyckel i en sorts hashtabell eller dokumentbaserad databas. En hink kan ses som en tabell, och filer kan betraktas som poster i den tabellen.

Eftersom snabbhet och effektivitet Àr viktiga för att göra en vinst hos Amazon, Àr det ingen överraskning att detta nyckel-som-fil-sökvÀgssystem Àr galet optimerat. Jag försökte hitta en balans: sÄ att jag inte behövde göra mÄnga get-förfrÄgningar, men att förfrÄgningarna utfördes snabbt. Det visade sig att det Àr bÀst att göra cirka 20 tusen bin-filer. Jag tror att om vi fortsÀtter att optimera kan vi uppnÄ en ökning av hastigheten (till exempel genom att göra en speciell hink enbart för data och dÀrmed minska storleken pÄ uppslagstabellen). Men det fanns varken tid eller pengar för ytterligare experiment.

Hur Àr det med korskompatibilitet?

Vad jag lÀrde mig: Den frÀmsta orsaken till bortkastad tid Àr att optimera din lagringsmetod i förtid.

Vid det hÀr laget Àr det mycket viktigt att frÄga dig sjÀlv: "Varför anvÀnda ett proprietÀrt filformat?" Anledningen ligger i laddningshastigheten (gzippade CSV-filer tog 7 gÄnger lÀngre tid att ladda) och kompatibilitet med vÄra arbetsflöden. Jag kan ompröva om R enkelt kan ladda Parquet (eller Arrow) filer utan Spark-belastningen. Alla i vÄrt labb anvÀnder R, och om jag behöver konvertera data till ett annat format har jag fortfarande den ursprungliga textdatan, sÄ jag kan bara köra pipelinen igen.

Uppdelning av arbete

Vad har jag lÀrt mig: Försök inte att optimera jobb manuellt, lÄt datorn göra det.

Jag har felsökt arbetsflödet pÄ en kromosom, nu mÄste jag bearbeta alla andra data.
Jag ville ta upp flera EC2-instanser för konvertering, men samtidigt var jag rÀdd för att fÄ en vÀldigt obalanserad belastning över olika bearbetningsjobb (precis som Spark led av obalanserade partitioner). Dessutom var jag inte intresserad av att höja en instans per kromosom, eftersom det för AWS-konton finns en standardgrÀns pÄ 10 instanser.

Sedan bestÀmde jag mig för att skriva ett manus i R för att optimera bearbetningsjobb.

Först bad jag S3 att berÀkna hur mycket lagringsutrymme varje kromosom upptog.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# 
 with 17 more rows

Sedan skrev jag en funktion som tar den totala storleken, blandar ordningen pÄ kromosomerna, delar upp dem i grupper num_jobs och berÀttar hur olika storlekarna pÄ alla bearbetningsjobb Àr.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Sedan gick jag igenom tusen shuffles med purrr och valde det bÀsta.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

SÄ jag fick en uppsÀttning uppgifter som var vÀldigt lika i storlek. Sedan var det bara att linda in mitt tidigare Bash-manus i en stor loop for. Denna optimering tog cirka 10 minuter att skriva. Och detta Àr mycket mindre Àn jag skulle spendera pÄ att manuellt skapa uppgifter om de var obalanserade. DÀrför tror jag att jag hade rÀtt med denna preliminÀra optimering.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

I slutet lÀgger jag till kommandot shutdown:

sudo shutdown -h now

... och allt löste sig! Med hjÀlp av AWS CLI tog jag upp instanser med alternativet user_data gav dem Bash-skript av deras uppgifter för bearbetning. De körde och stÀngdes av automatiskt, sÄ jag betalade inte för extra processorkraft.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

LĂ„t oss packa!

Vad har jag lÀrt mig: API:et bör vara enkelt för enkelhetens och flexibilitetens skull.

Äntligen fick jag uppgifterna pĂ„ rĂ€tt plats och i rĂ€tt form. Allt som Ă„terstod var att förenkla processen att anvĂ€nda data sĂ„ mycket som möjligt för att göra det enklare för mina kollegor. Jag ville göra ett enkelt API för att skapa förfrĂ„gningar. Om jag i framtiden bestĂ€mmer mig för att byta frĂ„n .rds till Parkettfiler, sĂ„ borde detta vara ett problem för mig, inte för mina kollegor. För detta bestĂ€mde jag mig för att göra ett internt R-paket.

Bygg och dokumentera ett mycket enkelt paket som innehÄller bara nÄgra fÄ dataÄtkomstfunktioner organiserade runt en funktion get_snp. Jag gjorde Àven en hemsida för mina kollegor packdown, sÄ att de enkelt kan se exempel och dokumentation.

Parsar 25TB med AWK och R

Smart cachning

Vad har jag lÀrt mig: Om din data Àr vÀl förberedd blir caching enkelt!

Eftersom ett av huvudarbetsflödena tillÀmpade samma analysmodell pÄ SNP-paketet, bestÀmde jag mig för att anvÀnda binning till min fördel. Vid överföring av data via SNP bifogas all information frÄn gruppen (bin) till det returnerade objektet. Det vill sÀga, gamla frÄgor kan (i teorin) pÄskynda behandlingen av nya frÄgor.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

NÀr jag byggde paketet körde jag mÄnga benchmarks för att jÀmföra hastighet nÀr jag anvÀnde olika metoder. Jag rekommenderar att inte försumma detta, för ibland Àr resultaten ovÀntade. Till exempel, dplyr::filter var mycket snabbare Àn att fÄnga rader med indexeringsbaserad filtrering, och att hÀmta en enda kolumn frÄn en filtrerad dataram var mycket snabbare Àn att anvÀnda indexeringssyntax.

Observera att objektet prev_snp_results innehÄller nyckeln snps_in_bin. Detta Àr en uppsÀttning av alla unika SNP:er i en grupp (bin), sÄ att du snabbt kan kontrollera om du redan har data frÄn en tidigare frÄga. Det gör det ocksÄ enkelt att gÄ igenom alla SNP:er i en grupp (bin) med denna kod:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Resultat

Nu kan vi (och har börjat pÄ allvar) köra modeller och scenarier som tidigare var otillgÀngliga för oss. Det bÀsta Àr att mina labbkollegor inte behöver tÀnka pÄ nÄgra komplikationer. De har bara en funktion som fungerar.

Och Àven om paketet besparar dem detaljerna, försökte jag göra dataformatet tillrÀckligt enkelt för att de skulle kunna lista ut det om jag plötsligt försvann imorgon...

Hastigheten har ökat mÀrkbart. Vi skannar vanligtvis funktionellt signifikanta genomfragment. Tidigare kunde vi inte göra detta (det visade sig vara för dyrt), men nu, tack vare grupp (bin) struktur och caching, tar en begÀran om en SNP i genomsnitt mindre Àn 0,1 sekunder, och dataanvÀndningen Àr sÄ lÄg att kostnaderna för S3 Àr jordnötter.

Slutsats

Den hÀr artikeln Àr inte alls en guide. Lösningen visade sig vara individuell, och nÀstan sÀkert inte optimal. Det Àr snarare en reseskildring. Jag vill att andra ska förstÄ att sÄdana beslut inte verkar helt formade i huvudet, de Àr resultatet av försök och misstag. Dessutom, om du letar efter en datavetare, kom ihÄg att anvÀndningen av dessa verktyg effektivt krÀver erfarenhet och erfarenhet kostar pengar. Jag Àr glad att jag hade möjlighet att betala, men mÄnga andra som kan göra samma jobb bÀttre Àn jag kommer aldrig att fÄ möjlighet pÄ grund av brist pÄ pengar att ens prova.

Big data-verktyg Àr mÄngsidiga. Om du har tid kan du nÀstan sÀkert skriva en snabbare lösning med smarta tekniker för datarensning, lagring och extraktion. I slutÀndan handlar det om en kostnads-nyttoanalys.

Vad jag lÀrde mig:

  • det finns inget billigt sĂ€tt att analysera 25 TB Ă„t gĂ„ngen;
  • var försiktig med storleken pĂ„ dina Parquet-filer och deras organisation;
  • SkiljevĂ€ggar i Spark mĂ„ste vara balanserade;
  • I allmĂ€nhet, försök aldrig att göra 2,5 miljoner partitioner;
  • Sortering Ă€r fortfarande svĂ„rt, liksom att sĂ€tta upp Spark;
  • ibland krĂ€ver speciella data speciella lösningar;
  • Gnistaggregering Ă€r snabb, men partitionering Ă€r fortfarande dyrt;
  • sov inte nĂ€r de lĂ€r dig grunderna, nĂ„gon har förmodligen löst ditt problem redan pĂ„ 1980-talet;
  • gnu parallel - det hĂ€r Ă€r en magisk sak, alla borde anvĂ€nda den;
  • Spark gillar okomprimerad data och gillar inte att kombinera partitioner;
  • Spark har för mycket omkostnader nĂ€r man löser enkla problem;
  • AWK:s associativa arrayer Ă€r mycket effektiva;
  • du kan kontakta stdin Đž stdout frĂ„n ett R-skript, och anvĂ€nd det dĂ€rför i pipelinen;
  • Tack vare smart vĂ€gimplementering kan S3 bearbeta mĂ„nga filer;
  • Den frĂ€msta anledningen till att slösa tid Ă€r att i förtid optimera din lagringsmetod;
  • försök inte att optimera uppgifter manuellt, lĂ„t datorn göra det;
  • API:et bör vara enkelt för enkelhetens och flexibilitetens skull;
  • Om din data Ă€r vĂ€l förberedd blir caching enkelt!

KĂ€lla: will.com

LĂ€gg en kommentar