Parsing ta '25TB bl-użu ta' AWK u R

Parsing ta '25TB bl-użu ta' AWK u R
Kif taqra dan l-artikolu: Niskuża ruħi għat-test huwa daqshekk twil u kaotiku. Biex tiffranka l-ħin, nibda kull kapitlu b’introduzzjoni “Dak li Tgħallimt”, li tiġbor fil-qosor l-essenza tal-kapitolu f’sentenza waħda jew tnejn.

"Urini biss is-soluzzjoni!" Jekk trid tara biss minn fejn ġejt, imbagħad aqbeż għall-kapitolu "Insir Aktar Inventiv", imma naħseb li huwa aktar interessanti u utli li taqra dwar il-falliment.

Reċentement kont inkarigat li nwaqqaf proċess għall-ipproċessar ta 'volum kbir ta' sekwenzi tad-DNA mhux maħduma (teknikament ċippa SNP). Il-ħtieġa kienet li tinkiseb malajr dejta dwar post ġenetiku partikolari (imsejjaħ SNP) għal immudellar sussegwenti u kompiti oħra. Bl-użu ta 'R u AWK, stajt inaddaf u norganizza d-dejta b'mod naturali, u nħaffef ħafna l-ipproċessar tal-mistoqsijiet. Dan ma kienx faċli għalija u kien jeħtieġ bosta iterazzjonijiet. Dan l-artiklu jgħinek tevita xi wħud mill-iżbalji tiegħi u nuruk dak li spiċċajt bih.

L-ewwel, xi spjegazzjonijiet introduttorji.

Data

Iċ-ċentru tal-ipproċessar tal-informazzjoni ġenetika tal-università tagħna pprovdilna data fil-forma ta 'TSV ta' 25 TB. Irċevejthom maqsuma f'5 pakketti, ikkompressati b'Gzip, li kull wieħed minnhom kien fih madwar 240 fajl ta' erba' gigabyte. Kull ringiela kien fiha data għal SNP wieħed minn individwu wieħed. B'kollox, ġiet trażmessa dejta dwar ~ 2,5 miljun SNP u ~ 60 elf ruħ. Minbarra l-informazzjoni SNP, il-fajls kien fihom bosta kolonni b'numri li jirriflettu diversi karatteristiċi, bħall-intensità tal-qari, il-frekwenza ta 'alleli differenti, eċċ. B’kollox kien hemm madwar 30 kolonna b’valuri uniċi.

Goal

Bħal kull proġett ta' ġestjoni tad-dejta, l-iktar ħaġa importanti kienet li jiġi ddeterminat kif se tintuża d-dejta. F'dan il-każ l-aktar se nagħżlu mudelli u flussi tax-xogħol għal SNP ibbażati fuq SNP. Jiġifieri, ser ikollna bżonn biss data fuq SNP wieħed kull darba. Kelli nitgħallem kif nirkupra r-rekords kollha assoċjati ma 'wieħed mill-2,5 miljun SNPs faċilment, malajr u bl-irħis kemm jista' jkun.

Kif ma tagħmilx dan

Biex tikkwota cliché adattat:

Ma fallejtx elf darba, għadni kif skoprejt elf mod kif nevita l-parsing ta 'mazz ta' dejta f'format faċli għall-mistoqsijiet.

L-ewwel ipprova

X'tgħallimt: M'hemm l-ebda mod irħis biex jiġu analizzati 25 TB kull darba.

Wara li ħadt il-kors "Metodi Avvanzati għall-Ipproċessar ta 'Data Big" fl-Università ta' Vanderbilt, kont ċert li t-trick kien fil-borża. Probabbilment se tieħu siegħa jew tnejn biex twaqqaf is-server Hive biex jgħaddi mid-dejta kollha u jirrapporta r-riżultat. Peress li d-dejta tagħna hija maħżuna fl-AWS S3, użajt is-servizz Athena, li jippermettilek tapplika mistoqsijiet ta' Hive SQL għal data S3. M'għandekx bżonn twaqqaf/tqajjem cluster Hive, u tħallas ukoll biss għad-dejta li qed tfittex.

Wara li wrejt lil Athena d-dejta tiegħi u l-format tagħha, għamilt xi testijiet b'mistoqsijiet bħal dawn:

select * from intensityData limit 10;

U malajr irċieva riżultati strutturati tajjeb. Lest.

Sakemm ippruvajna nużaw id-dejta fix-xogħol tagħna...

Intalabni niġbed l-informazzjoni SNP kollha biex nittestja l-mudell fuqu. Ħadejt il-mistoqsija:


select * from intensityData 
where snp = 'rs123456';

...u bdew jistennew. Wara tmien minuti u aktar minn 4 TB ta' dejta mitluba, irċevejt ir-riżultat. Athena tiċċarġja skont il-volum tad-dejta misjuba, $5 għal kull terabyte. Allura din it-talba waħda tiswa $20 u tmien minuti ta 'stennija. Biex tmexxi l-mudell fuq id-dejta kollha, kellna nistennew 38 sena u nħallsu $ 50 miljun. Ovvjament, dan ma kienx adattat għalina.

Kien meħtieġ li tuża Parquet...

X'tgħallimt: Oqgħod attent mad-daqs tal-fajls tal-Parquet tiegħek u l-organizzazzjoni tagħhom.

L-ewwel ippruvajt nirranġa s-sitwazzjoni billi kkonverti t-TSVs kollha għal Fajls tal-parkè. Huma konvenjenti biex jaħdmu ma 'settijiet ta' dejta kbar minħabba li l-informazzjoni fihom hija maħżuna f'forma kolonni: kull kolonna tinsab fis-segment tal-memorja/diska tagħha stess, b'kuntrast mal-fajls tat-test, li fihom ringieli fihom elementi ta 'kull kolonna. U jekk għandek bżonn issib xi ħaġa, imbagħad aqra biss il-kolonna meħtieġa. Barra minn hekk, kull fajl jaħżen firxa ta 'valuri f'kolonna, għalhekk jekk il-valur li qed tfittex ma jkunx fil-firxa tal-kolonna, Spark mhux se jaħli ħin jiskenja l-fajl kollu.

I dam kompitu sempliċi Kolla AWS biex jikkonvertu t-TSVs tagħna għal Parquet u niżel il-fajls il-ġodda f'Athena. Ħa madwar 5 sigħat. Imma meta ħadt it-talba, ħa madwar l-istess ammont ta 'ħin u ftit inqas flus biex tlesti. Il-fatt hu li Spark, qed jipprova jottimizza l-kompitu, sempliċiment żppakkjat biċċa TSV waħda u poġġieha fil-biċċa tal-Parquet tagħha stess. U minħabba li kull biċċa kienet kbira biżżejjed biex iżżomm ir-rekords sħaħ ta 'ħafna nies, kull fajl kien fih l-SNPs kollha, għalhekk Spark kellha tiftaħ il-fajls kollha biex tiġbed l-informazzjoni li kellha bżonn.

Interessanti, it-tip ta 'kompressjoni default (u rakkomandat) ta' Parquet, snappy, mhuwiex splittable. Għalhekk, kull eżekutur kien imwaħħal fuq il-kompitu li jneħħi u jniżżel is-sett tad-dejta sħiħ ta '3,5 GB.

Parsing ta '25TB bl-użu ta' AWK u R

Ejja nifhmu l-problema

X'tgħallimt: Issortjar huwa diffiċli, speċjalment jekk id-dejta titqassam.

Deherli li issa fhimt l-essenza tal-problema. Kelli bżonn biss li nissortja d-dejta skont il-kolonna SNP, mhux min-nies. Imbagħad diversi SNPs se jinħażnu f'biċċa ta 'dejta separata, u mbagħad il-funzjoni "intelliġenti" ta' Parquet "miftuħa biss jekk il-valur ikun fil-medda" se turi ruħha fil-glorja kollha tagħha. Sfortunatament, l-issortjar ta 'biljuni ta' ringieli mxerrda madwar cluster wera li kien biċċa xogħol diffiċli.

AWS żgur ma tridx toħroġ rifużjoni minħabba r-raġuni "Jien student distratt". Wara li għamilt issortjar fuq Amazon Glue, dam għal jumejn u ġġarraf.

Xi ngħidu dwar il-qsim?

X'tgħallimt: Il-ħitan fi Spark għandhom ikunu bbilanċjati.

Imbagħad ħriġt bl-idea ta 'qsim tad-dejta fil-kromożomi. Hemm 23 minnhom (u diversi aktar jekk tqis id-DNA mitokondrijali u r-reġjuni mhux mappjati).
Dan jippermettilek taqsam id-data f'biċċiet iżgħar. Jekk iżżid linja waħda biss mal-funzjoni ta 'esportazzjoni ta' Spark fl-iskrittura Glue partition_by = "chr", allura d-dejta għandha tinqasam f'bramel.

Parsing ta '25TB bl-użu ta' AWK u R
Il-ġenoma jikkonsisti minn frammenti numerużi msejħa kromożomi.

Sfortunatament, ma ħadimx. Il-kromożomi għandhom daqsijiet differenti, li jfisser ammonti differenti ta 'informazzjoni. Dan ifisser li l-kompiti li Spark bagħat lill-ħaddiema ma kinux ibbilanċjati u tlestew bil-mod minħabba li xi nodi spiċċaw kmieni u kienu inattivi. Madankollu, il-kompiti tlestew. Iżda meta talbet għal SNP wieħed, l-iżbilanċ reġa' kkawża problemi. L-ispiża tal-ipproċessar ta 'SNPs fuq kromożomi akbar (jiġifieri, fejn irridu nġibu d-dejta) naqset biss b'madwar fattur ta' 10. Ħafna, iżda mhux biżżejjed.

X'jiġri jekk naqsmuh f'partijiet saħansitra iżgħar?

X'tgħallimt: Qatt ma tipprova tagħmel 2,5 miljun diviżorju.

Iddeċidejt li mmur kollha barra u qasmet kull SNP. Dan żgura li l-ħitan kienu ta 'daqs ugwali. KIENET IDEA ĦAŻNA. Użajt Glue u żidt linja innoċenti partition_by = 'snp'. Il-kompitu beda u beda jwettaq. Ġurnata wara ċċekkjajt u rajt li għad ma kien hemm xejn miktub fuq S3, għalhekk qtilt il-kompitu. Jidher li Glue kienet qed tikteb fajls intermedji f'post moħbi f'S3, ħafna fajls, forsi ftit miljuni. Bħala riżultat, l-iżball tiegħi sewa aktar minn elf dollaru u ma għoġbokx lill-parrinu tiegħi.

It-tqassim + l-issortjar

X'tgħallimt: L-issortjar għadu diffiċli, kif ukoll l-irfinar ta 'Spark.

L-aħħar tentattiv tiegħi fil-qsim involva lili nqassam il-kromożomi u mbagħad nissortja kull partizzjoni. Fit-teorija, dan iħaffef kull mistoqsija minħabba li d-dejta SNP mixtieqa kellha tkun fi ftit biċċiet tal-Parquet f'firxa partikolari. Sfortunatament, l-issortjar tad-dejta anke diviżorja rriżulta li kien kompitu diffiċli. Bħala riżultat, qlibt għal EMR għal cluster personalizzat u użajt tmien istanzi qawwija (C5.4xl) u Sparklyr biex noħloq fluss tax-xogħol aktar flessibbli...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...madankollu, il-kompitu kien għadu ma tlestiex. I kkonfiguratha b'modi differenti: żiedet l-allokazzjoni tal-memorja għal kull eżekutur tal-mistoqsijiet, użajt nodi b'ammont kbir ta 'memorja, użajt varjabbli ta' xandir (varjabbli ta 'xandir), iżda kull darba dawn irriżultaw li kienu nofs miżuri, u gradwalment l-eżekuturi bdew biex ifalli sakemm waqaf kollox.

Jien insir aktar kreattiv

X'tgħallimt: Xi drabi data speċjali teħtieġ soluzzjonijiet speċjali.

Kull SNP għandu valur ta' pożizzjoni. Dan huwa numru li jikkorrispondi għan-numru ta 'bażijiet tul il-kromożomi tiegħu. Dan huwa mod sabiħ u naturali kif norganizzaw id-dejta tagħna. Għall-ewwel ridt naqsam skond ir-reġjuni ta 'kull kromożoma. Pereżempju, pożizzjonijiet 1 - 2000, 2001 - 4000, eċċ. Iżda l-problema hija li l-SNPs mhumiex imqassma b'mod ugwali madwar il-kromożomi, għalhekk id-daqsijiet tal-grupp għalhekk se jvarjaw ħafna.

Parsing ta '25TB bl-użu ta' AWK u R

Bħala riżultat, wasalt għal tqassim tal-pożizzjonijiet f'kategoriji (rank). Bl-użu tad-dejta diġà mniżżla, għamilt talba biex nikseb lista ta 'SNPs uniċi, il-pożizzjonijiet u l-kromożomi tagħhom. Imbagħad għamilt id-dejta f'kull kromożoma u ġbart SNPs fi gruppi (bin) ta 'daqs partikolari. Ejja ngħidu 1000 SNP kull wieħed. Dan tani r-relazzjoni SNP-to-group-per-kromosome.

Fl-aħħar, għamilt gruppi (bin) ta '75 SNPs, ir-raġuni se tiġi spjegata hawn taħt.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

L-ewwel ipprova bi Spark

X'tgħallimt: L-aggregazzjoni tal-ispark hija mgħaġġla, iżda l-qsim għadu jiswa ħafna flus.

Ridt naqra dan il-qafas tad-dejta żgħir (2,5 miljun ringieli) fi Spark, għaqqadha mad-dejta mhux ipproċessata, u mbagħad jaqsamha bil-kolonna li għadha kif ġiet miżjuda bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

jien kont sdf_broadcast(), għalhekk Spark jaf li għandu jibgħat il-qafas tad-dejta lin-nodi kollha. Dan huwa utli jekk id-dejta hija ta' daqs żgħir u meħtieġa għall-kompiti kollha. Inkella, Spark jipprova jkun intelliġenti u jqassam id-dejta kif meħtieġ, li jista 'jikkawża tnaqqis.

U għal darb'oħra, l-idea tiegħi ma ħadmitx: il-kompiti ħadmu għal xi żmien, lestew l-unjoni, u mbagħad, bħall-eżekuturi mnedija mill-qsim, bdew ifallu.

Żieda AWK

X'tgħallimt: M'għandekx torqod meta tkun qed tiġi mgħallma l-affarijiet bażiċi. Żgur li xi ħadd diġà solva l-problema tiegħek lura fis-snin tmenin.

Sa dan il-punt, ir-raġuni għall-fallimenti kollha tiegħi ma 'Spark kienet it-taqlib tad-dejta fil-cluster. Forsi s-sitwazzjoni tista 'titjieb bi trattament minn qabel. Iddeċidejt li nipprova naqsam id-dejta tat-test mhux ipproċessat f'kolonni ta 'kromożomi bit-tama li nipprovdi lil Spark b'dejta "diviżorja minn qabel".

Fittixt fuq StackOverflow kif naqsam bil-valuri tal-kolonna u sibt tweġiba daqshekk kbira. Bl-AWK tista' taqsam fajl test bil-valuri tal-kolonna billi tikteb fi skript aktar milli tibgħat ir-riżultati lil stdout.

Jien ktibt skript Bash biex nipprovaha. Niżżel wieħed mit-TSVs ippakkjati, imbagħad żppakkjat bl-użu gzip u mibgħuta lil awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Ħadem!

Mili tal-qlub

X'tgħallimt: gnu parallel - hija ħaġa maġika, kulħadd għandu jużaha.

Is-separazzjoni kienet pjuttost bil-mod u meta bdejt htopbiex jiċċekkja l-użu ta 'istanza EC2 qawwija (u għalja), irriżulta li kont qed nuża qalba waħda biss u madwar 200 MB ta' memorja. Biex issolvi l-problema u ma nitilfux ħafna flus, kellna nifhmu kif nipparallelizzaw ix-xogħol. Fortunatament, fi ktieb assolutament aqwa Xjenza tad-Data fil-Linja tal-Kmand Sibt kapitlu minn Jeron Janssens dwar il-parallelizzazzjoni. Minnha tgħallimt dwarha gnu parallel, metodu flessibbli ħafna għall-implimentazzjoni tal-multithreading f'Unix.

Parsing ta '25TB bl-użu ta' AWK u R
Meta bdejt il-qsim bl-użu tal-proċess il-ġdid, kollox kien tajjeb, iżda kien għad hemm konġestjoni - it-tniżżil ta 'oġġetti S3 fuq id-disk ma kienx mgħaġġel ħafna u ma kienx parallelizzat għal kollox. Biex nirranġa dan, għamilt dan:

  1. Sibt li huwa possibbli li timplimenta l-istadju tat-tniżżil S3 direttament fil-pipeline, u telimina kompletament il-ħażna intermedja fuq disk. Dan ifisser li nista' nevita li nikteb dejta mhux ipproċessata fuq disk u nuża ħażna saħansitra iżgħar, u għalhekk irħas, fuq AWS.
  2. tim aws configure set default.s3.max_concurrent_requests 50 żied ħafna n-numru ta 'ħjut li juża AWS CLI (b'mod awtomatiku hemm 10).
  3. Qlibt għal eżempju EC2 ottimizzat għall-veloċità tan-netwerk, bl-ittra n fl-isem. Sibt li t-telf tal-qawwa tal-ipproċessar meta tuża n-istanzi huwa aktar milli kkumpensat miż-żieda fil-veloċità tat-tagħbija. Għall-biċċa l-kbira tal-kompiti użajt c5n.4xl.
  4. Mibdul gzip fuq pigz, din hija għodda gzip li tista 'tagħmel affarijiet kesħin biex titkompla l-kompitu inizjalment mhux parallelizzat tad-dekompressjoni tal-fajls (dan għen l-inqas).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Dawn il-passi huma kkombinati ma 'xulxin biex kollox jaħdem malajr ħafna. Billi nżid il-veloċitajiet tat-tniżżil u nelimina l-kitba tad-disk, issa stajt nipproċessa pakkett ta’ 5 terabyte fi ftit sigħat biss.

Dan it-tweet kellu jsemmi 'TSV'. Alas.

Bl-użu ta' data parsed ġdida

X'tgħallimt: Spark jħobb data mhux kompressata u ma jħobbx jgħaqqad diviżorji.

Issa d-dejta kienet f'S3 f'format mhux ippakkjat (aqra: maqsuma) u semi-organizzat, u stajt nirritorna għal Spark mill-ġdid. Tistennieni sorpriża: erġajt naqas milli nikseb dak li ridt! Kien diffiċli ħafna li tgħid lil Spark eżattament kif id-data kienet maqsuma. U anke meta għamilt dan, irriżulta li kien hemm wisq diviżorji (95 elf), u meta użajt coalesce naqqsu n-numru tagħhom għal limiti raġonevoli, dan qered il-qsim tiegħi. Jien ċert li dan jista 'jiġi rranġat, iżda wara ftit jiem ta' tfittxija ma stajtx insib soluzzjoni. Eventwalment spiċċajt il-kompiti kollha fi Spark, għalkemm dam ftit u l-fajls tal-Parquet maqsum tiegħi ma kinux żgħar ħafna (~ 200 KB). Madankollu, id-dejta kienet fejn kienet meħtieġa.

Parsing ta '25TB bl-użu ta' AWK u R
Żgħir wisq u irregolari, mill-isbaħ!

Ittestjar mistoqsijiet Spark lokali

X'tgħallimt: Spark għandu overhead wisq meta jsolvi problemi sempliċi.

Billi niżżel id-dejta f'format għaqlija, stajt nittestja l-veloċità. Stabbilixxi script R biex tħaddem server Spark lokali, u mbagħad għabbet qafas tad-dejta Spark mill-ħażna tal-grupp tal-Parquet speċifikat (bin). Ippruvajt tagħbija d-dejta kollha iżda ma stajtx tikseb Sparklyr biex jirrikonoxxi l-qsim.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

L-eżekuzzjoni ħadet 29,415 sekondi. Ħafna aħjar, iżda mhux tajjeb wisq għall-ittestjar tal-massa ta 'xi ħaġa. Barra minn hekk, ma stajtx nħaffef l-affarijiet bil-caching għax meta ppruvajt nibda fil-cache frame tad-dejta fil-memorja, Spark dejjem iġġarraf, anke meta allokajt aktar minn 50 GB ta 'memorja għal dataset li kien jiżen inqas minn 15.

Ritorn lejn AWK

X'tgħallimt: Arrays assoċjati fl-AWK huma effiċjenti ħafna.

Irrealizzajt li stajt nikseb veloċitajiet ogħla. Ftakart li b'mod mill-isbaħ Tutorial AWK minn Bruce Barnett Qrajt dwar karatteristika friska msejħa "matriċi assoċjati" Essenzjalment, dawn huma pari ta 'valur ewlieni, li għal xi raġuni ssejħu b'mod differenti fl-AWK, u għalhekk b'xi mod ma ħsibtx ħafna dwarhom. Ruman Cheplyaka fakkar li t-terminu "arrays assoċjati" huwa ħafna eqdem mit-terminu "par ta 'valur-ċavetta". Anke jekk inti fittex iċ-ċavetta-valur fil-Google Ngram, mhux se tara dan it-terminu hemmhekk, iżda ssib matriċi assoċjati! Barra minn hekk, il-"par ta 'valur-ċavetta" ħafna drabi huwa assoċjat ma' databases, għalhekk jagħmel ħafna aktar sens li tqabbelha ma 'hashmap. Irrealizzajt li stajt nuża dawn l-arrays assoċjati biex nassoċja l-SNPs tiegħi ma 'tabella bin u dejta mhux ipproċessata mingħajr ma nuża Spark.

Biex tagħmel dan, fl-iskrittura AWK użajt il-blokk BEGIN. Din hija biċċa kodiċi li tiġi eżegwita qabel ma l-ewwel linja tad-dejta tiġi mgħoddija lill-korp ewlieni tal-iskrittura.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Team while(getline...) mgħobbija r-ringieli kollha mill-grupp CSV (bin), issettja l-ewwel kolonna (isem SNP) bħala ċ-ċavetta għall-firxa assoċjattiva bin u t-tieni valur (grupp) bħala l-valur. Imbagħad fil-blokk { }, li hija esegwita fuq il-linji kollha tal-fajl prinċipali, kull linja tintbagħat lill-fajl tal-output, li jirċievi isem uniku skont il-grupp tiegħu (bin): ..._bin_"bin[$1]"_....

Varjabbli batch_num и chunk_id qabbel id-dejta pprovduta mill-pipeline, tevita kundizzjoni ta 'tellieqa, u kull ħajt ta' eżekuzzjoni għaddej parallel, kiteb fil-fajl uniku tiegħu stess.

Peress li xerred id-dejta mhux ipproċessata kollha f'folders fuq il-kromożomi li fadal mill-esperiment preċedenti tiegħi bl-AWK, issa stajt nikteb skript ieħor Bash biex nipproċessa kromożoma wieħed kull darba u nibgħat dejta maqsuma aktar profonda lil S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

L-iskrittura għandha żewġ taqsimiet parallel.

Fl-ewwel taqsima, id-dejta tinqara mill-fajls kollha li fihom informazzjoni dwar il-kromożoma mixtieqa, imbagħad din id-dejta titqassam fuq ħjut, li jqassmu l-fajls fil-gruppi xierqa (bin). Biex jiġu evitati kundizzjonijiet tat-tellieqa meta ħajt multipli jiktbu fl-istess fajl, AWK jgħaddi l-ismijiet tal-fajls biex jikteb id-dejta f'postijiet differenti, eż. chr_10_bin_52_batch_2_aa.csv. Bħala riżultat, ħafna fajls żgħar huma maħluqa fuq id-diska (għal dan użajt volumi EBS terabyte).

Conveyor mit-tieni taqsima parallel jgħaddi mill-gruppi (bin) u jgħaqqad il-fajls individwali tagħhom f'CSV komuni c catu mbagħad tibgħathom għall-esportazzjoni.

Xandir fl-R?

X'tgħallimt: Tista' tikkuntattja stdin и stdout minn skript R, u għalhekk użah fil-pipeline.

Jista' jkun li ndunajt din il-linja fl-iskrittura ta' Bash tiegħek: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Huwa jittraduċi l-fajls kollha tal-grupp konkatenati (bin) fl-iskrittura R hawn taħt. {} hija teknika speċjali parallel, li ddaħħal kwalunkwe data li tibgħat lill-fluss speċifikat direttament fil-kmand innifsu. Għażla {#} jipprovdi ID tal-ħajt uniku, u {%} jirrappreżenta n-numru tas-slot tax-xogħol (ripetut, iżda qatt fl-istess ħin). Lista tal-għażliet kollha tista' tinstab fi dokumentazzjoni.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Meta varjabbli file("stdin") trasmessa lil readr::read_csv, id-dejta tradotta fl-iskrittura R titgħabba f'qafas, li mbagħad ikun fil-forma .rds-fajl bl-użu aws.s3 miktuba direttament lil S3.

RDS hija xi ħaġa bħal verżjoni junior tal-Parquet, mingħajr il-frills tal-ħażna tal-kelliem.

Wara li spiċċajt l-iskrittura Bash sibt pakkett .rds-fajls li jinsabu f'S3, li ppermettewni nuża kompressjoni effiċjenti u tipi integrati.

Minkejja l-użu tal-brejk R, kollox ħadem malajr ħafna. Mhux ta 'b'xejn, il-partijiet ta' R li jaqraw u jiktbu d-dejta huma ottimizzati ħafna. Wara l-ittestjar fuq kromożoma wieħed ta' daqs medju, ix-xogħol tlesta fuq istanza C5n.4xl f'madwar sagħtejn.

S3 Limitazzjonijiet

X'tgħallimt: Grazzi għall-implimentazzjoni tal-mogħdija intelliġenti, S3 jista 'jimmaniġġja ħafna fajls.

Kont inkwetat jekk S3 kienx kapaċi jimmaniġġja l-ħafna fajls li ġew trasferiti lilha. Nista' nagħmel sens li l-ismijiet tal-fajls, imma S3 kif tfittex għalihom?

Parsing ta '25TB bl-użu ta' AWK u R
Folders fl-S3 huma biss għall-wiri, fil-fatt is-sistema mhix interessata fis-simbolu /. Mill-paġna tal-FAQ S3.

Jidher li S3 jirrappreżenta t-triq għal fajl partikolari bħala ċavetta sempliċi f'tip ta 'tabella hash jew database bbażata fuq id-dokumenti. Barmil jista' jitqies bħala tabella, u l-fajls jistgħu jitqiesu bħala rekords f'dik it-tabella.

Peress li l-veloċità u l-effiċjenza huma importanti biex tagħmel profitt fl-Amazon, mhix sorpriża li din is-sistema ta 'ċavetta bħala fajl-passaġġ hija freaking ottimizzata. Ippruvajt insib bilanċ: sabiex ma kellix għalfejn nagħmel ħafna talbiet ta' get, iżda li t-talbiet ġew esegwiti malajr. Irriżulta li huwa aħjar li tagħmel madwar 20 elf fajl bin. Naħseb li jekk inkomplu nottimizzaw, nistgħu niksbu żieda fil-veloċità (per eżempju, nagħmlu barmil speċjali biss għad-dejta, u b'hekk innaqqsu d-daqs tat-tabella ta 'tfittxija). Iżda ma kienx hemm ħin jew flus għal aktar esperimenti.

Xi ngħidu dwar il-kompatibilità inkroċjata?

Dak li Tgħallimt: Il-kawża ewlenija tal-ħela tal-ħin hija li tottimizza l-metodu tal-ħażna tiegħek qabel iż-żmien.

F'dan il-punt, huwa importanti ħafna li tistaqsi lilek innifsek: "Għaliex tuża format ta' fajl proprjetarju?" Ir-raġuni tinsab fil-veloċità tat-tagħbija (fajls CSV gzipped ħadu 7 darbiet itwal biex jitgħabbew) u l-kompatibilità mal-flussi tax-xogħol tagħna. Nista' nikkunsidra mill-ġdid jekk R jistax faċilment jgħabbi fajls Parquet (jew Arrow) mingħajr it-tagħbija Spark. Kulħadd fil-laboratorju tagħna juża R, u jekk għandi bżonn nikkonverti d-dejta f'format ieħor, għad għandi d-dejta tat-test oriġinali, u għalhekk nista 'nagħmel il-pipeline mill-ġdid.

Diviżjoni tax-xogħol

X'tgħallimt: Tippruvax tottimizza l-impjiegi manwalment, ħalli l-kompjuter jagħmel dan.

Iddebuggajt il-fluss tax-xogħol fuq kromożoma wieħed, issa għandi bżonn nipproċessa d-dejta l-oħra kollha.
Xtaqt inqajjem diversi istanzi EC2 għall-konverżjoni, iżda fl-istess ħin kont nibża li nġib tagħbija żbilanċjata ħafna fuq xogħlijiet ta 'proċessar differenti (bħalma Spark sofra minn diviżorji żbilanċjati). Barra minn hekk, ma kontx interessat li ngħolli istanza waħda għal kull kromożoma, għax għall-kontijiet AWS hemm limitu default ta '10 każijiet.

Imbagħad iddeċidejt li nikteb skript f'R biex nottimizza l-impjiegi tal-ipproċessar.

L-ewwel, tlabt lil S3 biex tikkalkula kemm okkupa spazju għall-ħażna kull kromożoma.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Imbagħad ktibt funzjoni li tieħu d-daqs totali, tħawwad l-ordni tal-kromożomi, taqsamhom fi gruppi num_jobs u jgħidlek kemm huma differenti d-daqsijiet tal-impjiegi kollha tal-ipproċessar.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Imbagħad għamilt elf shuffle uża purrr u għażilt l-aħjar.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Allura spiċċajt b'sett ta 'kompiti li kienu simili ħafna fid-daqs. Imbagħad kulma kien fadal kien li nagħlaq l-iskript preċedenti tiegħi tal-Bash f'linja kbira for. Din l-ottimizzazzjoni ħadet madwar 10 minuti biex tikteb. U dan huwa ħafna inqas milli kont nonfoq fuq il-ħolqien manwalment tal-kompiti li kieku kienu żbilanċjati. Għalhekk, naħseb li kelli raġun b'din l-ottimizzazzjoni preliminari.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Fl-aħħar inżid il-kmand tal-għeluq:

sudo shutdown -h now

... u kollox ħadem! Bl-użu tal-AWS CLI, qajjejt istanzi bl-użu tal-għażla user_data tahom skripts Bash tal-kompiti tagħhom għall-ipproċessar. Huma damu u jagħlqu awtomatikament, għalhekk ma kontx inħallas għal qawwa ta 'proċessar żejda.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Ejja nippakkjaw!

X'tgħallimt: L-API għandha tkun sempliċi għall-fini tal-faċilità u l-flessibilità ta 'użu.

Fl-aħħar sibt id-dejta fil-post u fil-forma t-tajba. Li baqa’ kien li nissimplifika kemm jista’ jkun il-proċess tal-użu tad-dejta biex tagħmilha aktar faċli għall-kollegi tiegħi. Ridt nagħmel API sempliċi għall-ħolqien ta 'talbiet. Jekk fil-futur niddeċiedi li naqleb minn .rds għal fajls Parquet, allura din għandha tkun problema għalija, mhux għall-kollegi tiegħi. Għal dan iddeċidejt li nagħmel pakkett R intern.

Ibni u ddokumenta pakkett sempliċi ħafna li jkun fih biss ftit funzjonijiet ta' aċċess għad-dejta organizzati madwar funzjoni get_snp. Għamilt ukoll websajt għall-kollegi tiegħi pkgdown, sabiex ikunu jistgħu faċilment jaraw eżempji u dokumentazzjoni.

Parsing ta '25TB bl-użu ta' AWK u R

Caching intelliġenti

X'tgħallimt: Jekk id-dejta tiegħek tkun ippreparata tajjeb, il-caching ikun faċli!

Peress li wieħed mill-flussi tax-xogħol prinċipali applika l-istess mudell ta 'analiżi għall-pakkett SNP, iddeċidejt li nuża binning għall-vantaġġ tiegħi. Meta tittrasmetti d-dejta permezz ta 'SNP, l-informazzjoni kollha mill-grupp (bin) hija mehmuża mal-oġġett ritornat. Jiġifieri, mistoqsijiet qodma jistgħu (fit-teorija) iħaffu l-ipproċessar ta 'mistoqsijiet ġodda.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Meta nibni l-pakkett, għamilt ħafna punti ta 'referenza biex inqabbel il-veloċità meta tuża metodi differenti. Nirrakkomanda li ma tittraskurax dan, għax xi drabi r-riżultati huma mhux mistennija. Pereżempju, dplyr::filter kien ferm aktar mgħaġġel milli jinqabad ringieli bl-użu ta 'filtrazzjoni bbażata fuq l-indiċjar, u l-irkupru ta' kolonna waħda minn qafas ta 'dejta ffiltrat kien ħafna aktar mgħaġġel milli tuża sintassi ta' indiċjar.

Jekk jogħġbok innota li l-oġġett prev_snp_results fih iċ-ċavetta snps_in_bin. Din hija firxa ta 'SNPs uniċi kollha fi grupp (bin), li jippermettilek tiċċekkja malajr jekk diġà għandekx dejta minn mistoqsija preċedenti. Jagħmilha faċli wkoll li tgħaddi l-SNPs kollha fi grupp (bin) b'dan il-kodiċi:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Sejbiet

Issa nistgħu (u bdejna nħaddmu bis-serjetà) mudelli u xenarji li qabel kienu inaċċessibbli għalina. L-aħjar ħaġa hija li l-kollegi tiegħi tal-laboratorju m'għandhomx għalfejn jaħsbu dwar xi kumplikazzjonijiet. Huma għandhom biss funzjoni li taħdem.

U għalkemm il-pakkett jeħlilhom id-dettalji, ippruvajt nagħmel il-format tad-dejta sempliċi biżżejjed li jkunu jistgħu jifhmuh jekk għada f'daqqa waħda sparixxa...

Il-veloċità żdiedet b'mod notevoli. Normalment niskennjaw frammenti tal-ġenoma funzjonalment sinifikanti. Preċedentement, ma stajniex nagħmlu dan (irriżulta li kien għali wisq), iżda issa, grazzi għall-istruttura tal-grupp (bin) u l-caching, talba għal SNP wieħed tieħu bħala medja inqas minn 0,1 sekondi, u l-użu tad-dejta huwa hekk baxx li l-ispejjeż għal S3 huma karawett.

Konklużjoni

Dan l-artikolu mhu gwida xejn. Is-soluzzjoni rriżulta li kienet individwali, u kważi ċertament mhux ottimali. Pjuttost, huwa ktieb tal-ivvjaġġar. Irrid li oħrajn jifhmu li deċiżjonijiet bħal dawn ma jidhrux iffurmati għal kollox fir-ras, huma riżultat ta 'prova u żball. Ukoll, jekk qed tfittex xjenzat tad-dejta, żomm f'moħħok li l-użu ta 'dawn l-għodod b'mod effettiv jeħtieġ esperjenza, u l-esperjenza tiswa l-flus. Ninsab kuntent li kelli l-mezzi biex inħallas, imma ħafna oħrajn li jistgħu jagħmlu l-istess xogħol aħjar minni qatt mhu se jkollhom l-opportunità minħabba nuqqas ta’ flus li saħansitra jippruvaw.

Għodod tad-dejta kbira huma versatili. Jekk għandek il-ħin, kważi ċertament tista 'tikteb soluzzjoni aktar mgħaġġla billi tuża tekniki intelliġenti ta' tindif, ħażna u estrazzjoni tad-dejta. Fl-aħħar mill-aħħar niġu għal analiżi tal-kost-benefiċċju.

Dak li tgħallimt:

  • m'hemm l-ebda mod irħis biex parse 25 TB kull darba;
  • oqgħod attent mad-daqs tal-fajls tal-Parquet tiegħek u l-organizzazzjoni tagħhom;
  • Il-ħitan fi Spark għandhom ikunu bilanċjati;
  • B'mod ġenerali, qatt ma tipprova tagħmel 2,5 miljun diviżorju;
  • L-issortjar għadu diffiċli, kif qed jitwaqqaf Spark;
  • kultant dejta speċjali teħtieġ soluzzjonijiet speċjali;
  • L-aggregazzjoni tal-ispark hija mgħaġġla, iżda l-qsim għadu jiswa ħafna flus;
  • torqodx meta jgħallmuk l-affarijiet bażiċi, xi ħadd probabbilment diġà solvut il-problema tiegħek lura fis-snin tmenin;
  • gnu parallel - din hija ħaġa maġika, kulħadd għandu jużaha;
  • Spark jħobb data mhux kompressata u ma jħobbx jgħaqqad diviżorji;
  • Spark għandu overhead wisq meta jsolvi problemi sempliċi;
  • Arrays assoċjati AWK huma effiċjenti ħafna;
  • tista' tikkuntattja stdin и stdout minn skript R, u għalhekk użah fil-pipeline;
  • Grazzi għall-implimentazzjoni tal-mogħdija intelliġenti, S3 jista 'jipproċessa ħafna fajls;
  • Ir-raġuni ewlenija għall-ħela tal-ħin hija l-ottimizzazzjoni prematura tal-metodu tal-ħażna tiegħek;
  • tippruvax tottimizza l-kompiti manwalment, ħalli l-kompjuter jagħmel dan;
  • L-API għandha tkun sempliċi għall-fini tal-faċilità u l-flessibilità tal-użu;
  • Jekk id-dejta tiegħek tkun ippreparata sew, il-caching ikun faċli!

Sors: www.habr.com

Żid kumment