25 TB sõelumine AWK ja R abil

25 TB sõelumine AWK ja R abil
Kuidas seda artiklit lugeda: Vabandan, et tekst on nii pikk ja kaootiline. Teie aja säästmiseks alustan iga peatükki sissejuhatusega "Mida ma õppisin", mis võtab peatüki olemuse kokku ühe või kahe lausega.

"Näita mulle lihtsalt lahendust!" Kui soovite lihtsalt näha, kust ma tulin, siis minge peatüki "Leidlikumaks muutumine" juurde, kuid minu arvates on huvitavam ja kasulikum lugeda ebaõnnestumiste kohta.

Hiljuti tehti mulle ülesandeks luua protsess suure hulga töötlemata DNA järjestuste (tehniliselt SNP-kiibi) töötlemiseks. Vajadus oli kiiresti hankida andmeid antud geneetilise asukoha kohta (nimetatakse SNP-ks) järgnevate modelleerimise ja muude ülesannete jaoks. Kasutades R-i ja AWK-d, sain andmeid puhastada ja korraldada loomulikul viisil, kiirendades oluliselt päringute töötlemist. See ei olnud minu jaoks lihtne ja nõudis mitmeid kordusi. See artikkel aitab teil vältida mõningaid minu vigu ja näitab teile, milleni ma lõpuks jõudsin.

Esiteks mõned sissejuhatavad selgitused.

Andmed

Meie ülikooli geneetilise teabe töötlemise keskus andis meile andmed 25 TB TSV kujul. Sain need jagatuna 5 Gzipiga tihendatud paketti, millest igaüks sisaldas umbes 240 nelja gigabaidist faili. Iga rida sisaldas andmeid ühe SNP kohta ühelt inimeselt. Kokku edastati andmeid ~2,5 miljoni SNP ja ~60 tuhande inimese kohta. Lisaks SNP teabele sisaldasid failid arvukalt veerge numbritega, mis kajastasid erinevaid omadusi, nagu lugemisintensiivsus, erinevate alleelide sagedus jne. Kokku oli unikaalsete väärtustega veergu umbes 30.

Eesmärk

Nagu iga andmehaldusprojekti puhul, oli kõige olulisem kindlaks teha, kuidas andmeid kasutatakse. Sel juhul valime SNP jaoks enamasti SNP-l põhinevad mudelid ja töövood. See tähendab, et vajame andmeid korraga ainult ühe SNP kohta. Pidin õppima, kuidas hankida kõik 2,5 miljonist SNP-st seotud kirjed võimalikult lihtsalt, kiiresti ja odavalt.

Kuidas seda mitte teha

Tsiteerides sobivat klišeed:

Ma ei kukkunud tuhat korda läbi, avastasin lihtsalt tuhat viisi, kuidas vältida hulga andmete sõelumist päringusõbralikus vormingus.

Esmalt proovige

Mida ma olen õppinud: 25 TB korraga sõelumiseks pole odavat viisi.

Olles läbinud Vanderbilti ülikoolis kursuse “Suurandmete töötlemise täiustatud meetodid”, olin kindel, et nipp on kotis. Tõenäoliselt kulub Hive serveri seadistamine kõigi andmete läbimiseks ja tulemusest teatamiseks tund või paar. Kuna meie andmed on salvestatud AWS S3-s, kasutasin teenust Athena, mis võimaldab rakendada Hive SQL päringuid S3 andmetele. Te ei pea Hive'i klastrit seadistama/tõstma ja maksate ka ainult otsitavate andmete eest.

Pärast seda, kui ma näitasin Athenale oma andmeid ja nende vormingut, tegin mõned testid selliste päringutega:

select * from intensityData limit 10;

Ja sai kiiresti hästi struktureeritud tulemused. Valmis.

Kuni proovisime andmeid oma töös kasutada...

Mul paluti mudeli testimiseks kogu SNP teave välja tõmmata. Tegin päringu:


select * from intensityData 
where snp = 'rs123456';

...ja hakkas ootama. Pärast kaheksat minutit ja enam kui 4 TB küsitud andmeid sain tulemuse kätte. Athena maksab leitud andmemahu järgi, 5 dollarit terabaidi kohta. Nii et see üksik taotlus maksis 20 dollarit ja kaheksa minutit ootamist. Mudeli käitamiseks kõigil andmetel pidime ootama 38 aastat ja maksma 50 miljonit dollarit. Ilmselgelt see meile ei sobinud.

Oli vaja kasutada Parkett...

Mida ma olen õppinud: Olge parketifailide suuruse ja nende korraldusega ettevaatlik.

Esmalt proovisin olukorda parandada, teisendades kõik TSV-d Parketi viilid. Need on mugavad suurte andmehulkadega töötamiseks, kuna neis sisalduv teave on salvestatud veergude kujul: iga veerg asub oma mälu-/kettasegmendis, erinevalt tekstifailidest, milles read sisaldavad iga veeru elemente. Ja kui teil on vaja midagi leida, lugege lihtsalt vajalikku veergu. Lisaks salvestab iga fail veergu väärtuste vahemikku, nii et kui otsitav väärtus ei ole veeru vahemikus, ei raiska Spark aega kogu faili skannimisele.

Tegin lihtsa ülesande AWS liim et teisendada meie TSV-d parketiks ja pani uued failid Athenasse. Aega kulus umbes 5 tundi. Aga kui ma taotluse esitasin, kulus selle täitmiseks umbes sama palju aega ja veidi vähem raha. Fakt on see, et Spark, püüdes ülesannet optimeerida, pakkis lihtsalt lahti ühe TSV tüki ja pani selle oma parketti. Ja kuna iga tükk oli piisavalt suur, et sisaldada paljude inimeste terveid kirjeid, sisaldas iga fail kõiki SNP-sid, nii et Spark pidi vajaliku teabe eraldamiseks avama kõik failid.

Huvitav on see, et Parketi vaikimisi (ja soovitatud) tihendustüüp Snappy ei ole poolitav. Seetõttu jäi iga täitja kogu 3,5 GB andmestiku lahtipakkimise ja allalaadimise juurde.

25 TB sõelumine AWK ja R abil

Mõistame probleemi

Mida ma olen õppinud: Sorteerimine on keeruline, eriti kui andmed on jaotatud.

Mulle tundus, et nüüd sain probleemi olemusest aru. Mul oli vaja andmeid sortida ainult SNP veeru, mitte inimeste järgi. Seejärel salvestatakse mitu SNP-d eraldi andmepaki sisse ja seejärel näitab Parquet'i nutikas funktsioon "avatud ainult siis, kui väärtus on vahemikus" ennast täies hiilguses. Kahjuks osutus klastrisse hajutatud miljardite ridade sorteerimine keeruliseks ülesandeks.

AWS ei taha kindlasti raha tagasi maksta põhjusel, et "olen hajameelne õpilane". Pärast seda, kui ma Amazon Glue'is sorteerisin, töötas see 2 päeva ja jooksis kokku.

Aga jaotamine?

Mida ma olen õppinud: Sparki vaheseinad peavad olema tasakaalus.

Siis tulin ideele jagada andmed kromosoomidesse. Neid on 23 (ja veel mitu, kui võtta arvesse mitokondriaalset DNA-d ja kaardistamata piirkondi).
See võimaldab jagada andmed väiksemateks tükkideks. Kui lisate Glue skriptis Sparki ekspordifunktsioonile vaid ühe rea partition_by = "chr", siis tuleks andmed jagada ämbritesse.

25 TB sõelumine AWK ja R abil
Genoom koosneb paljudest fragmentidest, mida nimetatakse kromosoomideks.

Kahjuks see ei õnnestunud. Kromosoomid on erineva suurusega, mis tähendab erinevat infohulka. See tähendab, et Sparki töötajatele saadetud ülesanded ei olnud tasakaalustatud ja täideti aeglaselt, kuna mõned sõlmed lõpetasid varakult ja olid jõude. Ülesanded said siiski täidetud. Kuid ühe SNP küsimisel tekitas tasakaalustamatus taas probleeme. SNP-de töötlemise kulud suuremates kromosoomides (st kus me tahame andmeid saada) on vähenenud vaid umbes 10 korda. Palju, aga mitte piisavalt.

Mis siis, kui jagame selle veelgi väiksemateks osadeks?

Mida ma olen õppinud: Ärge kunagi proovige teha 2,5 miljonit partitsiooni.

Otsustasin teha kõik endast oleneva ja eraldasin iga SNP. See tagas vaheseinte võrdse suurusega. SEE OLI HALB IDEE. Kasutasin Glue'i ja lisasin süütu joone partition_by = 'snp'. Ülesanne algas ja hakkas täitma. Päev hiljem kontrollisin ja nägin, et S3-le pole ikka veel midagi kirjutatud, nii et ma tegin ülesande ära. Näib, et Glue kirjutas vahefaile S3 peidetud asukohta, palju faile, võib-olla paar miljonit. Selle tulemusena maksis minu viga rohkem kui tuhat dollarit ja see ei meeldinud mu mentorile.

Jaotamine + sorteerimine

Mida ma olen õppinud: Sorteerimine on endiselt keeruline, nagu ka Sparki häälestamine.

Minu viimane jaotuskatse hõlmas kromosoomide jaotamist ja seejärel iga partitsiooni sorteerimist. Teoreetiliselt kiirendaks see iga päringut, kuna soovitud SNP-andmed pidid olema antud vahemikus mõne Parketi tüki sees. Kahjuks osutus isegi jaotatud andmete sorteerimine keeruliseks ülesandeks. Selle tulemusel lülitusin kohandatud klastri jaoks EMR-ile ja kasutasin kaheksat võimsat eksemplari (C5.4xl) ja Sparklyri, et luua paindlikum töövoog...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...ülesanne jäi aga ikkagi lõpetamata. Seadistasin seda erineval viisil: suurendasin iga päringutäitja mälu eraldamist, kasutasin suure mälumahuga sõlme, kasutasin leviedastuse muutujaid (edastusmuutujaid), kuid iga kord osutusid need poolmeetmeteks ja järk-järgult hakkasid täitjad hakkama. ebaõnnestuda, kuni kõik peatub.

Ma muutun loovamaks

Mida ma olen õppinud: Mõnikord nõuavad eriandmed erilahendusi.

Igal SNP-l on positsiooni väärtus. See on arv, mis vastab selle kromosoomi aluste arvule. See on kena ja loomulik viis oma andmete korrastamiseks. Alguses tahtsin jagada iga kromosoomi piirkondade kaupa. Näiteks positsioonid 1 - 2000, 2001 - 4000 jne. Kuid probleem on selles, et SNP-d ei ole kromosoomide vahel ühtlaselt jaotunud, seega on rühmade suurused väga erinevad.

25 TB sõelumine AWK ja R abil

Selle tulemusena jõudsin positsioonide jaotamiseni kategooriatesse (aste). Kasutades juba allalaaditud andmeid, esitasin taotluse ainulaadsete SNP-de, nende positsioonide ja kromosoomide loendi saamiseks. Seejärel sorteerisin andmed iga kromosoomi sees ja kogusin SNP-d etteantud suurusega rühmadesse (bin). Oletame, et igaüks 1000 SNP-d. See andis mulle seose SNP-rühma ja kromosoomi kohta.

Lõpuks tegin 75 SNP-st rühmad (bin), põhjust selgitatakse allpool.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Esmalt proovige Sparkiga

Mida ma olen õppinud: Säde koondamine on kiire, kuid jaotamine on siiski kallis.

Tahtsin selle väikese (2,5 miljonit rida) andmeraami Sparki lugeda, kombineerida algandmetega ja seejärel sektsioonida äsja lisatud veeru järgi bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

ma kasutasin sdf_broadcast(), nii et Spark teab, et ta peaks saatma andmeraami kõigile sõlmedele. See on kasulik, kui andmed on väikese suurusega ja neid on vaja kõigi ülesannete jaoks. Vastasel juhul püüab Spark olla nutikas ja jagab andmeid vastavalt vajadusele, mis võib põhjustada aeglustusi.

Ja jällegi, minu idee ei töötanud: ülesanded töötasid mõnda aega, lõid liidu lõpule ja siis hakkasid, nagu jaotamise teel käivitatud täitjad, ebaõnnestuma.

AWK lisamine

Mida ma olen õppinud: Ära maga, kui sulle põhitõdesid õpetatakse. Kindlasti lahendas keegi teie probleemi juba 1980ndatel.

Siiani oli kõigi minu ebaõnnestumiste põhjus Sparkiga klastri andmete segadus. Ehk saab eelraviga olukorda parandada. Otsustasin proovida toorteksti andmeid kromosoomide veergudeks jagada, nii et lootsin pakkuda Sparkile "eelselt partitsioonitud" andmeid.

Otsisin StackOverflow'st, kuidas veeru väärtuste järgi jagada, ja leidsin nii suurepärane vastus. AWK abil saate jagada tekstifaili veeru väärtuste järgi, kirjutades selle skripti, selle asemel, et saata tulemusi stdout.

Kirjutasin selle proovimiseks Bashi skripti. Laadis alla ühe pakitud TSV-dest ja pakkis selle seejärel lahti gzip ja saadeti aadressile awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

See toimis!

Südamike täitmine

Mida ma olen õppinud: gnu parallel - see on maagiline asi, kõik peaksid seda kasutama.

Lahkumine oli üsna aeglane ja kui ma alustasin htopvõimsa (ja kalli) EC2 eksemplari kasutamise kontrollimiseks selgus, et kasutasin ainult ühte tuuma ja umbes 200 MB mälu. Probleemi lahendamiseks ja mitte palju raha kaotamiseks pidime välja mõtlema, kuidas tööd paralleelselt teha. Õnneks täiesti hämmastavas raamatus Andmeteadus käsureal Leidsin Jeron Janssensi peatüki paralleelsusest. Sellest sain teada gnu parallel, väga paindlik meetod mitme lõime rakendamiseks Unixis.

25 TB sõelumine AWK ja R abil
Kui alustasin partitsiooniga uue protsessi abil, oli kõik korras, kuid siiski oli kitsaskoht - S3 objektide kettale allalaadimine ei olnud väga kiire ja ei olnud täielikult paralleelne. Selle parandamiseks tegin järgmist:

  1. Sain teada, et S3 allalaadimisetappi on võimalik realiseerida otse torujuhtmes, välistades täielikult ketta vahepealse salvestamise. See tähendab, et saan vältida töötlemata andmete kettale kirjutamist ja kasutada AWS-is veelgi väiksemat ja seega odavamat salvestusruumi.
  2. meeskond aws configure set default.s3.max_concurrent_requests 50 suurendas oluliselt AWS CLI poolt kasutatavate lõimede arvu (vaikimisi on neid 10).
  3. Lülitasin võrgu kiiruse jaoks optimeeritud EC2 eksemplarile, mille nimes on täht n. Olen avastanud, et töötlemisvõimsuse kadu n-eksemplaride kasutamisel kompenseerib enam kui laadimiskiiruse suurenemine. Enamiku ülesannete jaoks kasutasin c5n.4xl.
  4. Muudetud gzip edasi pigz, see on gzip-tööriist, mis suudab teha lahedaid asju, et paralleelida algselt mitteparalleelseks tehtud failide lahtipakkimise ülesanne (see aitas kõige vähem).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Need sammud on omavahel kombineeritud, et kõik toimiks väga kiiresti. Suurendades allalaadimiskiirust ja kaotades kettale kirjutamise, saan nüüd 5 terabaidise paketi töödelda vaid mõne tunniga.

See säuts oleks pidanud mainima "TSV". Kahjuks.

Äsja sõelutud andmete kasutamine

Mida ma olen õppinud: Sparkile meeldivad tihendamata andmed ja talle ei meeldi partitsioonide kombineerimine.

Nüüd olid andmed S3-s lahtipakkimata (loe: jagatud) ja pooljärjestatud formaadis ning võisin uuesti Sparki naasta. Mind ootas üllatus: ma ei saavutanud jälle seda, mida tahtsin! Väga raske oli Sparkile täpselt öelda, kuidas andmed olid jaotatud. Ja isegi kui ma seda tegin, selgus, et vaheseinu oli liiga palju (95 tuhat) ja kui ma seda kasutasin coalesce vähendas nende arvu mõistliku piirini, see hävitas mu partitsiooni. Olen kindel, et seda saab parandada, kuid pärast paaripäevast otsimist ei leidnud ma lahendust. Lõpuks tegin Sparkis kõik ülesanded valmis, kuigi see võttis veidi aega ja mu poolitatud Parqueti failid ei olnud väga väikesed (~200 KB). Andmed olid aga seal, kus vaja.

25 TB sõelumine AWK ja R abil
Liiga väike ja ebaühtlane, imeline!

Kohalike Sparki päringute testimine

Mida ma olen õppinud: Sparkil on lihtsate ülesannete lahendamisel liiga palju üldkulusid.

Andmeid nutikas vormingus alla laadides sain katsetada kiirust. Seadistage kohaliku Spark-serveri käitamiseks R-skript ja laadige seejärel määratud Parqueti rühmasalvestusest (salvest) Sparki andmeraam. Üritasin kõiki andmeid laadida, kuid ei suutnud Sparklyril partitsiooni tuvastada.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Täitmiseks kulus 29,415 sekundit. Palju parem, kuid mitte liiga hea millegi massiliseks testimiseks. Lisaks ei saanud ma vahemällu salvestamisega asju kiirendada, sest kui proovisin andmeraami mällu vahemällu salvestada, jooksis Spark alati kokku, isegi kui eraldasin alla 50 kaaluvale andmekogumile rohkem kui 15 GB mälu.

Tagasi AWK-sse

Mida ma olen õppinud: AWK assotsiatiivsed massiivid on väga tõhusad.

Sain aru, et suudan saavutada suuremaid kiirusi. See jäi mulle imeliselt meelde AWK õpetus Bruce Barnettilt Lugesin lahedast funktsioonist nimega "assotsiatiivsed massiivid" Sisuliselt on need võtme-väärtuste paarid, mida AWK-s millegipärast nimetati erinevalt ja seetõttu ma millegipärast ei mõelnud nendele eriti. Roman Cheplyaka tuletas meelde, et mõiste "assotsiatiivsed massiivid" on palju vanem kui "võtme-väärtuse paar". Isegi kui sa otsige Google Ngramist võtmeväärtust, te seda terminit seal ei näe, kuid leiate assotsiatiivsed massiivid! Lisaks seostatakse võtme-väärtuste paari kõige sagedamini andmebaasidega, mistõttu on palju mõttekam võrrelda seda räsikaardiga. Sain aru, et saan neid assotsiatiivseid massiive kasutada oma SNP-de sidumiseks prügitabeli ja toorandmetega ilma Sparki kasutamata.

Selleks kasutasin AWK skriptis plokki BEGIN. See on koodiosa, mis käivitatakse enne, kui esimene andmerida skripti põhiosasse edastatakse.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Meeskond while(getline...) laadis kõik read CSV-rühmast (bin), määrake esimene veerg (SNP nimi) assotsiatiivse massiivi võtmeks bin ja väärtuseks teine ​​väärtus (rühm). Siis blokis { }, mis käivitatakse põhifaili kõigil ridadel, iga rida saadetakse väljundfaili, mis saab sõltuvalt selle rühmast (bin) kordumatu nime: ..._bin_"bin[$1]"_....

Muutujad batch_num и chunk_id sobitas konveieri pakutavate andmetega, vältides võistlustingimust ja iga täitmislõimi töötamist parallel, kirjutas oma ainulaadsesse faili.

Kuna ma hajutasin kõik algandmed kromosoomide kaustadesse, mis jäid üle eelmisest AWK-ga seotud katsest, siis nüüd sain kirjutada teise Bashi skripti, et töödelda korraga ühte kromosoomi ja saata S3-le sügavamalt jaotatud andmeid.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Skriptil on kaks osa parallel.

Esimeses jaotises loetakse andmed kõikidest failidest, mis sisaldavad teavet soovitud kromosoomi kohta, seejärel jaotatakse need andmed lõimedesse, mis jaotavad failid vastavatesse rühmadesse (bin). Võistlustingimuste vältimiseks, kui samasse faili kirjutab mitu lõime, edastab AWK failinimed, et kirjutada andmeid erinevatesse kohtadesse, nt. chr_10_bin_52_batch_2_aa.csv. Selle tulemusena tekib kettale palju väikeseid faile (selleks kasutasin terabaidiseid EBS-i köiteid).

Konveier teisest sektsioonist parallel läbib rühmad (bin) ja ühendab nende üksikud failid ühiseks CSV-ks c catja saadab need seejärel ekspordiks.

Ringhääling R?

Mida ma olen õppinud: Võite ühendust võtta stdin и stdout R-skriptist ja seetõttu kasutada seda konveieris.

Võib-olla olete oma Bashi skriptis seda rida märganud: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... See tõlgib kõik ühendatud rühmafailid (bin) allolevasse R-skripti. {} on eriline tehnika parallel, mis lisab kõik andmed, mille ta saadab määratud voogu, otse käsku endasse. Võimalus {#} pakub ainulaadset lõime ID-d ja {%} tähistab töökoha numbrit (korduv, kuid mitte kunagi samaaegselt). Kõigi valikute loendi leiate siit dokumentatsioon.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kui muutuja file("stdin") edastatakse readr::read_csv, laaditakse R-skripti tõlgitud andmed raami, mis on seejärel kujul .rds- faili kasutamine aws.s3 kirjutatakse otse S3-le.

RDS on midagi Parqueti noorema versiooni sarnast, ilma kõlarite hoiustamise nüanssideta.

Pärast Bashi skripti lõpetamist sain komplekti .rds-failid, mis asuvad S3-s, mis võimaldas mul kasutada tõhusat tihendamist ja sisseehitatud tüüpe.

Vaatamata piduri R kasutamisele toimis kõik väga kiiresti. Pole üllatav, et R-i osad, mis loevad ja kirjutavad andmeid, on väga optimeeritud. Pärast ühe keskmise suurusega kromosoomi testimist lõpetati töö C5n.4xl eksemplaris umbes kahe tunniga.

S3 piirangud

Mida ma olen õppinud: Tänu nutika tee juurutamisele saab S3 hakkama paljude failidega.

Ma olin mures, kas S3 saab hakkama paljude failidega, mis talle edastati. Ma saaksin failinimed mõistlikuks muuta, aga kuidas S3 neid otsiks?

25 TB sõelumine AWK ja R abil
S3 kaustad on lihtsalt näitamiseks, tegelikult ei ole süsteem sümbolist huvitatud /. S3 KKK lehelt.

Näib, et S3 tähistab konkreetse faili teed lihtsa võtmena mingis räsitabelis või dokumendipõhises andmebaasis. Ämbrit võib pidada tabeliks ja faile võib pidada selle tabeli kirjeteks.

Kuna kiirus ja tõhusus on Amazoni kasumi teenimisel olulised, pole üllatav, et see võtme-faili-tee süsteem on hullult optimeeritud. Üritasin leida tasakaalu: et ma ei peaks palju hankima taotlusi, vaid et päringud täidetaks kiiresti. Selgus, et kõige parem on teha umbes 20 tuhat prügikasti faili. Arvan, et kui jätkame optimeerimist, võime saavutada kiiruse tõusu (näiteks tehes spetsiaalse ämbri ainult andmete jaoks, vähendades nii otsingutabeli suurust). Edasisteks katseteks polnud aga aega ega raha.

Aga ristühilduvus?

Mida ma õppisin: raisatud aja põhjus number üks on salvestusmeetodi enneaegne optimeerimine.

Siinkohal on väga oluline endalt küsida: "Miks kasutada patenteeritud failivormingut?" Põhjus peitub laadimiskiiruses (gzipitud CSV-failide laadimine võttis 7 korda kauem aega) ja ühilduvuses meie töövoogudega. Võin uuesti kaaluda, kas R saab hõlpsasti laadida Parketi (või Noole) faile ilma Sparki koormuseta. Kõik meie laboris kasutavad R-i ja kui mul on vaja andmed teise vormingusse teisendada, on mul endiselt algsed tekstiandmed, nii et saan konveieri uuesti käivitada.

Tööjaotus

Mida ma olen õppinud: Ärge proovige töid käsitsi optimeerida, laske seda teha arvutil.

Olen silunud ühe kromosoomi töövoo, nüüd pean töötlema kõiki teisi andmeid.
Tahtsin tõsta mitu EC2 eksemplari teisendamiseks, kuid samal ajal kartsin saada erinevate töötlemistööde vahel väga tasakaalustamata koormust (nagu Spark kannatas tasakaalustamata partitsioonide tõttu). Lisaks ei olnud ma huvitatud ühe eksemplari tõstmisest kromosoomi kohta, kuna AWS-i kontode jaoks on vaikimisi 10 eksemplari piirang.

Seejärel otsustasin töötlemistööde optimeerimiseks kirjutada R-s skripti.

Esiteks palusin S3-l arvutada, kui palju salvestusruumi iga kromosoom hõivas.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Seejärel kirjutasin funktsiooni, mis võtab kogu suuruse, segab kromosoomide järjekorda, jagab need rühmadesse num_jobs ja ütleb teile, kui erinevad on kõigi töötlemistööde suurused.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Seejärel jooksin purrr abil läbi tuhat segamist ja valisin välja parima.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Nii et ma jõudsin ülesannete komplekti, mis olid väga sarnase suurusega. Siis ei jäänud muud üle, kui mu eelmine Bashi stsenaarium suure ahelaga kokku keerata for. Selle optimeerimise kirjutamine võttis aega umbes 10 minutit. Ja see on palju vähem, kui kulutaksin ülesannete käsitsi loomisele, kui need oleksid tasakaalustamata. Seetõttu arvan, et mul oli selle esialgse optimeerimisega õigus.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Lisan lõppu käsu shutdown:

sudo shutdown -h now

... ja kõik läks korda! AWS-i CLI-d kasutades tõstsin eksemplare, kasutades valikut user_data andis neile töötlemiseks nende ülesannete Bashi skriptid. Need jooksid ja lülitusid automaatselt välja, nii et ma ei maksnud täiendava töötlemisvõimsuse eest.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Hakkame pakkima!

Mida ma olen õppinud: API peaks kasutusmugavuse ja paindlikkuse huvides olema lihtne.

Lõpuks sain andmed õigesse kohta ja vormi. Jäi vaid andmete kasutamise protsessi nii palju kui võimalik lihtsustada, et kolleegidele oleks lihtsam. Tahtsin teha päringute loomiseks lihtsa API. Kui otsustan tulevikus vahetada .rds parketifailidele, siis peaks see probleem olema minu, mitte kolleegide jaoks. Selleks otsustasin teha sisemise R-paketi.

Koostage ja dokumenteerige väga lihtne pakett, mis sisaldab vaid mõnda funktsiooni ümber korraldatud andmetele juurdepääsu funktsiooni get_snp. Tegin ka oma kolleegidele kodulehe pkgdown, et nad näeksid hõlpsasti näiteid ja dokumentatsiooni.

25 TB sõelumine AWK ja R abil

Nutikas vahemälu

Mida ma olen õppinud: Kui teie andmed on hästi ette valmistatud, on vahemällu salvestamine lihtne!

Kuna üks peamistest töövoogudest rakendas sama analüüsimudelit SNP paketile, otsustasin kasutada binningut enda huvides. Andmete edastamisel SNP kaudu lisatakse tagastatavale objektile kogu grupist (bin) pärinev teave. See tähendab, et vanad päringud võivad (teoreetiliselt) kiirendada uute päringute töötlemist.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Paketi koostamisel võtsin läbi palju võrdlusuuringuid, et võrrelda kiirust erinevate meetodite kasutamisel. Soovitan seda mitte tähelepanuta jätta, sest mõnikord on tulemused ootamatud. Näiteks, dplyr::filter oli palju kiirem kui ridade hõivamine indekseerimispõhise filtreerimise abil ja ühe veeru toomine filtreeritud andmeraamist oli palju kiirem kui indekseerimise süntaksi kasutamine.

Pange tähele, et objekt prev_snp_results sisaldab võtit snps_in_bin. See on kõigi unikaalsete SNP-de massiiv rühmas (bin), mis võimaldab teil kiiresti kontrollida, kas teil on juba eelmise päringu andmeid. Samuti on selle koodi abil lihtne rühmas (salvestis) olevate SNP-de vahel ringi käia:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Järeldused

Nüüd saame (ja oleme tõsiselt hakanud) käivitama mudeleid ja stsenaariume, mis olid meile varem kättesaamatud. Parim on see, et mu laborikaaslased ei pea mõtlema tüsistustele. Neil on lihtsalt funktsioon, mis töötab.

Ja kuigi pakett säästab neid detaile, püüdsin andmevormingu teha piisavalt lihtsaks, et nad saaksid sellest aru, kui ma homme ootamatult kaoksin...

Kiirus on märgatavalt kasvanud. Tavaliselt skaneerime funktsionaalselt olulisi genoomi fragmente. Varem me seda teha ei saanud (see osutus liiga kalliks), kuid nüüd kulub tänu grupi (bin) struktuurile ja vahemällu salvestamisele ühe SNP päring keskmiselt vähem kui 0,1 sekundit ja andmekasutus on nii madalad, et S3 kulud on maapähklid.

Järeldus

See artikkel ei ole üldse juhend. Lahendus osutus individuaalseks ja peaaegu kindlasti mitte optimaalseks. Pigem on see reisikiri. Ma tahan, et teised mõistaksid, et sellised otsused ei paista täielikult pähe, vaid on katse-eksituse tulemus. Samuti, kui otsite andmeteadlast, pidage meeles, et nende tööriistade tõhus kasutamine nõuab kogemusi ja kogemused maksavad raha. Olen õnnelik, et mul oli vahendeid maksta, kuid paljudel teistel, kes saavad sama tööga minust paremini hakkama, ei teki rahapuudusel kunagi võimalust isegi proovida.

Suurandmete tööriistad on mitmekülgsed. Kui teil on aega, saate peaaegu kindlasti kirjutada kiirema lahenduse, kasutades nutikaid andmete puhastamise, salvestamise ja eraldamise tehnikaid. Lõppkokkuvõttes taandub see kulude-tulude analüüsile.

Mida ma õppisin:

  • pole odavat viisi 25 TB korraga sõelumiseks;
  • olge oma Parketifailide suuruse ja nende korraldusega ettevaatlik;
  • Sparki vaheseinad peavad olema tasakaalustatud;
  • Üldiselt ärge kunagi proovige teha 2,5 miljonit partitsiooni;
  • Sorteerimine on endiselt keeruline, nagu ka Sparki seadistamine;
  • mõnikord nõuavad eriandmed erilahendusi;
  • Säde koondamine on kiire, kuid eraldamine on endiselt kallis;
  • ära maga, kui sulle põhitõdesid õpetatakse, arvatavasti lahendas keegi sinu probleemi juba 1980ndatel;
  • gnu parallel - see on maagiline asi, kõik peaksid seda kasutama;
  • Sparkile meeldivad tihendamata andmed ja talle ei meeldi partitsioonide kombineerimine;
  • Sparkil on lihtsate probleemide lahendamisel liiga palju üldkulusid;
  • AWK assotsiatiivsed massiivid on väga tõhusad;
  • saate ühendust võtta stdin и stdout R-skriptist ja seetõttu kasutada seda konveieris;
  • Tänu nutika tee juurutamisele suudab S3 töödelda paljusid faile;
  • Ajaraiskamise peamiseks põhjuseks on salvestusmeetodi enneaegne optimeerimine;
  • ärge proovige ülesandeid käsitsi optimeerida, laske seda teha arvutil;
  • API peaks kasutamise lihtsuse ja paindlikkuse huvides olema lihtne;
  • Kui teie andmed on hästi ette valmistatud, on vahemällu salvestamine lihtne!

Allikas: www.habr.com

Lisa kommentaar