25 TB analizavimas naudojant AWK ir R

25 TB analizavimas naudojant AWK ir R
Kaip skaityti šį straipsnį: Atsiprašau, kad tekstas toks ilgas ir chaotiškas. Kad sutaupytumėte jūsų laiko, kiekvieną skyrių pradedu įvadu „Ką aš išmokau“, kuriame vienu ar dviem sakiniais apibendrinama skyriaus esmė.

„Tiesiog parodyk man sprendimą! Jei tik norite pamatyti, iš kur aš atėjau, pereikite prie skyriaus „Tapti išradingesniu“, bet manau, kad įdomiau ir naudingiau skaityti apie nesėkmes.

Neseniai man buvo pavesta nustatyti didelio kiekio neapdorotų DNR sekų (techniškai SNP lusto) apdorojimo procesą. Reikėjo greitai gauti duomenis apie tam tikrą genetinę vietą (vadinamą SNP), kad būtų galima atlikti tolesnį modeliavimą ir kitas užduotis. Naudodamas R ir AWK galėjau natūraliai išvalyti ir tvarkyti duomenis, labai paspartindamas užklausų apdorojimą. Tai man nebuvo lengva ir reikėjo daugybės pakartojimų. Šis straipsnis padės išvengti kai kurių mano klaidų ir parodys, kuo aš baigiau.

Pirma, keletas įvadinių paaiškinimų.

Duomenys

Mūsų universiteto genetinės informacijos apdorojimo centras pateikė mums duomenis 25 TB TSV forma. Jas gavau padalintas į 5 paketus, suspaustus Gzip, kurių kiekvienoje buvo apie 240 keturių gigabaitų failų. Kiekvienoje eilutėje buvo duomenys apie vieną SNP iš vieno asmens. Iš viso buvo perduoti duomenys apie ~2,5 milijono SNP ir ~60 tūkstančių žmonių. Be SNP informacijos, failuose buvo daug stulpelių su skaičiais, atspindinčiais įvairias charakteristikas, tokias kaip skaitymo intensyvumas, skirtingų alelių dažnis ir kt. Iš viso buvo apie 30 stulpelių su unikaliomis reikšmėmis.

Tikslas

Kaip ir bet kuriame duomenų valdymo projekte, svarbiausia buvo nustatyti, kaip duomenys bus naudojami. Tokiu atveju dažniausiai rinksimės SNP modelius ir darbo eigas pagal SNP. Tai reiškia, kad vienu metu mums reikės tik vieno SNP duomenų. Turėjau išmokti kuo lengviau, greičiau ir pigiau gauti visus įrašus, susijusius su vienu iš 2,5 milijono SNP.

Kaip to nedaryti

Cituoti tinkamą klišę:

Man nepavyko tūkstantį kartų, tiesiog atradau tūkstantį būdų, kaip išvengti duomenų krūvos analizavimo užklausoms pritaikytu formatu.

Pirmiausia pabandykite

Ką aš išmokau: Nėra pigaus būdo išanalizuoti 25 TB vienu metu.

Išklausęs kursą „Išplėstiniai didelių duomenų apdorojimo metodai“ Vanderbilto universitete, buvau tikras, kad triukas yra maiše. Tikriausiai prireiks valandos ar dviejų, kol bus nustatytas „Hive“ serveris, kad būtų galima peržiūrėti visus duomenis ir pranešti apie rezultatą. Kadangi mūsų duomenys saugomi AWS S3, pasinaudojau paslauga Atėnė, kuri leidžia taikyti Hive SQL užklausas S3 duomenims. Jums nereikia nustatyti / kelti Hive klasterio, be to, mokate tik už tuos duomenis, kurių ieškote.

Parodęs „Athena“ savo duomenis ir jų formatą, atlikau keletą testų su tokiomis užklausomis:

select * from intensityData limit 10;

Ir greitai gavo gerai susistemintus rezultatus. Paruošta.

Kol nebandėme duomenų panaudoti savo darbe...

Manęs buvo paprašyta ištraukti visą SNP informaciją, kad galėčiau išbandyti modelį. Paleidau užklausą:


select * from intensityData 
where snp = 'rs123456';

...ir pradėjo laukti. Po aštuonių minučių ir daugiau nei 4 TB prašomų duomenų gavau rezultatą. „Athena“ apmokestina pagal rastų duomenų kiekį, 5 USD už terabaitą. Taigi šis vienas prašymas kainavo 20 USD ir aštuonias minutes laukti. Norėdami paleisti modelį su visais duomenimis, turėjome laukti 38 metus ir sumokėti 50 milijonų dolerių. Akivaizdu, kad tai mums netiko.

Reikėjo naudoti parketą...

Ką aš išmokau: Būkite atsargūs dėl savo parketo failų dydžio ir jų organizavimo.

Pirmiausia bandžiau išspręsti situaciją konvertuodamas visus TSV į Parketo dildės. Jie yra patogūs dirbant su dideliais duomenų rinkiniais, nes juose esanti informacija saugoma stulpelių pavidalu: kiekvienas stulpelis yra savo atminties/disko segmente, priešingai nei tekstiniuose failuose, kurių eilutėse yra kiekvieno stulpelio elementai. Ir jei jums reikia ką nors rasti, tiesiog perskaitykite reikiamą stulpelį. Be to, kiekviename faile stulpelyje saugomas verčių diapazonas, taigi, jei ieškomos vertės nėra stulpelio diapazone, „Spark“ negaiš laiko viso failo nuskaitymui.

Atlikau paprastą užduotį AWS klijai konvertuoti mūsų TSV į parketą ir numetė naujus failus į Athena. Tai užtruko apie 5 valandas. Tačiau kai paleidau užklausą, užbaigti prireikė maždaug tiek pat laiko ir šiek tiek mažiau pinigų. Faktas yra tas, kad „Spark“, bandydama optimizuoti užduotį, tiesiog išpakavo vieną TSV gabalėlį ir įdėjo į savo parketo gabalą. Ir kadangi kiekvienas gabalas buvo pakankamai didelis, kad talpintų visus daugelio žmonių įrašus, kiekviename faile buvo visi SNP, todėl „Spark“ turėjo atidaryti visus failus, kad išgautų jai reikalingą informaciją.

Įdomu tai, kad numatytasis (ir rekomenduojamas) „Parquet“ suspaudimo tipas „snappy“ nėra skaidomas. Todėl kiekvienas vykdytojas įstrigo išpakuoti ir atsisiųsti visą 3,5 GB duomenų rinkinį.

25 TB analizavimas naudojant AWK ir R

Supraskime problemą

Ką aš išmokau: Rūšiuoti sunku, ypač jei duomenys paskirstyti.

Man atrodė, kad dabar supratau problemos esmę. Man reikėjo rūšiuoti duomenis tik pagal SNP stulpelį, o ne pagal žmones. Tada keli SNP bus saugomi atskirame duomenų gabale, o tada „Parquet“ „išmanioji“ funkcija „atidaryta tik tada, kai vertė yra diapazone“ pasirodys visa savo šlove. Deja, rūšiuoti milijardus eilučių, išsibarsčiusių grupėje, buvo sudėtinga.

AWS tikrai nenori grąžinti pinigų dėl priežasties „Aš esu išsiblaškęs studentas“. Kai rūšiavau „Amazon Glue“, jis veikė 2 dienas ir sudužo.

O kaip dėl skaidymo?

Ką aš išmokau: Spark pertvaros turi būti subalansuotos.

Tada aš sugalvojau suskirstyti duomenis chromosomose. Jų yra 23 (ir dar keletas, jei atsižvelgsite į mitochondrijų DNR ir nesusijusias sritis).
Tai leis padalyti duomenis į mažesnes dalis. Jei pridedate tik vieną eilutę prie „Spark“ eksportavimo funkcijos „Glue“ scenarijuje partition_by = "chr", tada duomenis reikia suskirstyti į segmentus.

25 TB analizavimas naudojant AWK ir R
Genomas susideda iš daugybės fragmentų, vadinamų chromosomomis.

Deja, nepavyko. Chromosomos yra skirtingo dydžio, o tai reiškia skirtingą informacijos kiekį. Tai reiškia, kad užduotys, kurias Spark išsiuntė darbuotojams, nebuvo subalansuotos ir buvo baigtos lėtai, nes kai kurie mazgai buvo baigti anksti ir neveikė. Tačiau užduotys buvo įvykdytos. Tačiau paprašius vieno SNP, disbalansas vėl sukėlė problemų. SNP apdorojimo didesnėse chromosomose (ty ten, kur norime gauti duomenis) kaina sumažėjo tik maždaug 10 kartų. Daug, bet nepakankamai.

O jei padalinsime į dar mažesnes dalis?

Ką aš išmokau: Niekada nebandykite daryti 2,5 milijono skaidinių.

Nusprendžiau viską padaryti ir padalinau kiekvieną SNP. Tai užtikrino, kad pertvaros būtų vienodo dydžio. TAI BUVO BLOGA IDĖJA. Aš naudojau klijus ir pridėjau nekaltą eilutę partition_by = 'snp'. Užduotis prasidėjo ir pradėjo vykdyti. Po dienos patikrinau ir pamačiau, kad į S3 vis dar nieko neparašyta, todėl užduotį nužudžiau. Atrodo, kad Glue rašė tarpinius failus į paslėptą S3 vietą, daug failų, galbūt pora milijonų. Dėl to mano klaida kainavo daugiau nei tūkstantį dolerių ir nepatiko mano mentoriui.

Skirstymas + rūšiavimas

Ką aš išmokau: Rūšiuoti vis dar sunku, kaip ir derinti Spark.

Paskutinis mano bandymas skaidyti buvo susijęs su chromosomų padalijimu ir kiekvienos skaidinio rūšiavimu. Teoriškai tai pagreitintų kiekvieną užklausą, nes norimi SNP duomenys turi būti keliuose parketo gabaluose tam tikrame diapazone. Deja, net suskirstytų duomenų rūšiavimas pasirodė sudėtingas. Dėl to aš perėjau į EMR, kad sukurčiau tinkintą klasterį ir panaudojau aštuonis galingus egzempliorius (C5.4xl) ir „Sparklyr“, kad sukurčiau lankstesnę darbo eigą...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...tačiau užduotis vis dar nebuvo atlikta. Konfigūravau įvairiais būdais: padidinau atminties paskirstymą kiekvienam užklausos vykdytojui, naudoju mazgus su dideliu atminties kiekiu, naudoju transliavimo kintamuosius (transliavimo kintamuosius), bet kiekvieną kartą tai pasirodydavo pusiau matais ir pamažu pradėjo vykdyti vykdytojai. nepavyks, kol viskas sustos.

Tampu kūrybiškesnis

Ką aš išmokau: Kartais ypatingiems duomenims reikia specialių sprendimų.

Kiekvienas SNP turi padėties reikšmę. Tai skaičius, atitinkantis bazių skaičių jo chromosomoje. Tai puikus ir natūralus būdas tvarkyti duomenis. Iš pradžių norėjau suskirstyti pagal kiekvienos chromosomos sritis. Pavyzdžiui, pozicijos 1 - 2000, 2001 - 4000 ir kt. Tačiau problema ta, kad SNP nėra tolygiai pasiskirstę chromosomose, todėl grupių dydžiai labai skirsis.

25 TB analizavimas naudojant AWK ir R

Dėl to aš priėjau prie pozicijų suskirstymo į kategorijas (rangą). Naudodamas jau atsisiųstus duomenis, pateikiau užklausą gauti unikalių SNP, jų pozicijų ir chromosomų sąrašą. Tada surūšiavau duomenis kiekvienoje chromosomoje ir surinkau SNP į tam tikro dydžio grupes (bin). Tarkime, po 1000 SNP. Tai suteikė man ryšį tarp SNP ir grupės vienai chromosomai.

Galų gale sudariau grupes (bin) iš 75 SNP, priežastis bus paaiškinta toliau.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Pirmiausia pabandykite su Spark

Ką aš išmokau: Spark agregacija yra greita, bet skaidymas vis tiek brangus.

Norėjau perskaityti šį nedidelį (2,5 mln. eilučių) duomenų rėmelį į „Spark“, sujungti jį su neapdorotais duomenimis ir padalinti į naujai pridėtą stulpelį bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

aš naudojau sdf_broadcast(), todėl „Spark“ žino, kad turėtų siųsti duomenų rėmelį į visus mazgus. Tai naudinga, jei duomenys yra mažo dydžio ir reikalingi visoms užduotims atlikti. Priešingu atveju „Spark“ bando būti protinga ir paskirsto duomenis pagal poreikį, o tai gali sukelti sulėtėjimą.

Ir vėl mano sumanymas nepasiteisino: užduotys kurį laiką veikė, užbaigė sąjungą, o paskui, kaip ir skaidant paleisti vykdytojai, pradėjo žlugti.

Pridedamas AWK

Ką aš išmokau: Nemiegok, kai tave moko pagrindinių dalykų. Tikrai kažkas jau išsprendė jūsų problemą devintajame dešimtmetyje.

Iki šiol visų mano nesėkmių naudojant „Spark“ priežastis buvo duomenų kratinys klasteryje. Galbūt situaciją galima pagerinti pradėjus gydymą. Nusprendžiau pabandyti padalyti neapdorotus teksto duomenis į chromosomų stulpelius, todėl tikėjausi „Spark“ pateikti „iš anksto suskirstytus“ duomenis.

„StackOverflow“ ieškojau, kaip padalinti pagal stulpelių reikšmes, ir radau toks puikus atsakymas. Naudodami AWK galite padalinti tekstinį failą pagal stulpelių reikšmes, rašydami jį scenarijuje, o ne siųsdami rezultatus į stdout.

Parašiau Bash scenarijų, kad jį išbandyčiau. Atsisiuntė vieną iš supakuotų TSV, tada išpakavo jį naudodami gzip ir išsiųstas į awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Pavyko!

Šerdies užpildymas

Ką aš išmokau: gnu parallel - tai stebuklingas dalykas, visi turėtų juo naudotis.

Atsiskyrimas buvo gana lėtas ir kai pradėjau htopnorint patikrinti galingo (ir brangaus) EC2 egzemplioriaus naudojimą, paaiškėjo, kad naudoju tik vieną branduolį ir apie 200 MB atminties. Norėdami išspręsti problemą ir neprarasti daug pinigų, turėjome sugalvoti, kaip sulyginti darbus. Laimei, absoliučiai nuostabioje knygoje Duomenų mokslas komandinėje eilutėje Radau Jerono Jansenso skyrių apie lygiagretavimą. Iš jo sužinojau apie gnu parallel, labai lankstus būdas įdiegti daugiagiją Unix.

25 TB analizavimas naudojant AWK ir R
Kai pradėjau skaidyti naudojant naują procesą, viskas buvo gerai, bet vis tiek buvo kliūtis - S3 objektų atsisiuntimas į diską nebuvo labai greitas ir nevisiškai lygiagretus. Norėdami tai ištaisyti, padariau taip:

  1. Sužinojau, kad S3 atsisiuntimo etapą galima įdiegti tiesiogiai konvejeryje, visiškai pašalinant tarpinį saugojimą diske. Tai reiškia, kad galiu nerašyti neapdorotų duomenų į diską ir naudoti dar mažesnę, taigi ir pigesnę, AWS saugyklą.
  2. komanda aws configure set default.s3.max_concurrent_requests 50 labai padidino AWS CLI naudojamų gijų skaičių (pagal numatytuosius nustatymus jų yra 10).
  3. Perjungiau prie tinklo greičiui optimizuoto EC2 egzemplioriaus, kurio pavadinime yra raidė n. Pastebėjau, kad apdorojimo galios praradimą naudojant n egzempliorius daugiau nei kompensuoja įkėlimo greičio padidėjimas. Daugumai užduočių naudoju c5n.4xl.
  4. Pasikeitė gzip apie pigz, tai yra gzip įrankis, galintis padaryti šaunių dalykų, kad būtų lygiagreti iš pradžių nelygiagreti failų išskleidimo užduotis (tai padėjo mažiausiai).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Šie veiksmai derinami vienas su kitu, kad viskas veiktų labai greitai. Padidinus atsisiuntimo greitį ir pašalinus įrašymą į diską, dabar galėčiau apdoroti 5 terabaitų paketą vos per kelias valandas.

Šiame tviteryje turėjo būti paminėta „TSV“. Deja.

Naudojami naujai išanalizuoti duomenys

Ką aš išmokau: Spark mėgsta nesuspaustus duomenis ir nemėgsta derinti skaidinių.

Dabar duomenys buvo S3 išpakuotu (skaityti: bendrinamu) ir pusiau užsakytu formatu, ir aš vėl galėjau grįžti į „Spark“. Manęs laukė staigmena: ir vėl nepasiekiau to, ko norėjau! „Spark“ buvo labai sunku tiksliai pasakyti, kaip duomenys buvo suskirstyti. Ir net kai tai padariau, paaiškėjo, kad buvo per daug pertvarų (95 tūkst.), o kai naudojau coalesce sumažino jų skaičių iki pagrįstų ribų, tai sunaikino mano skaidymą. Esu tikras, kad tai gali būti ištaisyta, bet po kelių dienų ieškojimo neradau sprendimo. Galiausiai visas užduotis baigiau Spark, nors tai užtruko ir mano suskaidyti Parquet failai nebuvo labai maži (~200 KB). Tačiau duomenys buvo ten, kur jų reikėjo.

25 TB analizavimas naudojant AWK ir R
Per mažas ir nelygus, nuostabus!

Vietinių „Spark“ užklausų testavimas

Ką aš išmokau: Spark turi per daug pridėtinių išlaidų sprendžiant paprastas problemas.

Parsiuntus duomenis sumaniu formatu, galėjau išbandyti greitį. Nustatykite R scenarijų, kad paleistumėte vietinį „Spark“ serverį, tada įkelkite „Spark“ duomenų rėmelį iš nurodytos „Parquet“ grupės saugyklos (dėklo). Bandžiau įkelti visus duomenis, bet nepavyko Sparklyr atpažinti skaidymo.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Egzekucija truko 29,415 sek. Daug geriau, bet nelabai tinka masiniam bet ko testavimui. Be to, negalėjau pagreitinti kaupimo talpykloje, nes kai bandžiau talpykloje įrašyti duomenų rėmelį atmintyje, „Spark“ visada sugesdavo, net kai duomenų rinkiniui, sveriančiam mažiau nei 50, skirdavau daugiau nei 15 GB atminties.

Grįžti į AWK

Ką aš išmokau: AWK asociatyvūs masyvai yra labai veiksmingi.

Supratau, kad galiu pasiekti didesnį greitį. Prisiminiau tai nuostabiai Bruce'o Barnetto AWK pamoka Skaičiau apie puikią funkciją, pavadintą „asociatyviniai masyvai“ Iš esmės tai yra raktų-reikšmių poros, kurios AWK dėl tam tikrų priežasčių buvo vadinamos skirtingai, todėl aš kažkaip apie jas negalvojau. Romanas Čeplyaka priminė, kad terminas „asociatyvūs masyvai“ yra daug senesnis už terminą „rakto-reikšmių pora“. Net jei tu „Google Ngram“ ieškokite rakto vertės, šio termino ten nematysite, bet rasite asociatyvinius masyvus! Be to, „rakto-reikšmių pora“ dažniausiai siejama su duomenų bazėmis, todėl daug prasmingiau ją lyginti su maišos žemėlapiu. Supratau, kad galiu naudoti šias asociatyvines matricas, kad susiečiau savo SNP su dėžės lentele ir neapdorotais duomenimis nenaudodamas „Spark“.

Norėdami tai padaryti, AWK scenarijuje naudojau bloką BEGIN. Tai yra kodo dalis, kuri vykdoma prieš perduodant pirmą duomenų eilutę į pagrindinį scenarijaus turinį.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Komanda while(getline...) įkėlė visas eilutes iš CSV grupės (bin), nustatykite pirmąjį stulpelį (SNP pavadinimą) kaip asociatyvaus masyvo raktą bin o antroji reikšmė (grupė) kaip reikšmė. Tada bloke { }, kuris vykdomas visose pagrindinio failo eilutėse, kiekviena eilutė siunčiama į išvesties failą, kuris gauna unikalų pavadinimą, priklausomai nuo jo grupės (dėklo): ..._bin_"bin[$1]"_....

Kintamieji batch_num и chunk_id atitiko dujotiekio pateiktus duomenis, išvengiant lenktynių sąlygų ir kiekvienos vykdymo gijos parallel, parašė į savo unikalų failą.

Kadangi visus neapdorotus duomenis išskaičiau į aplankus chromosomose, likusiuose po ankstesnio eksperimento su AWK, dabar galėčiau parašyti kitą „Bash“ scenarijų, kad apdorotų vieną chromosomą vienu metu ir nusiųsti gilesnius skaidinius duomenis į S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Scenarijus turi du skyrius parallel.

Pirmajame skyriuje duomenys nuskaitomi iš visų failų, kuriuose yra informacija apie norimą chromosomą, tada šie duomenys paskirstomi gijomis, kurios paskirsto failus į atitinkamas grupes (bin). Kad būtų išvengta lenktynių sąlygų, kai į tą patį failą rašo kelios gijos, AWK perduoda failų pavadinimus duomenims įrašyti į skirtingas vietas, pvz. chr_10_bin_52_batch_2_aa.csv. Dėl to diske sukuriama daug mažų failų (tam naudojau terabaitų EBS tomus).

Konvejeris iš antros sekcijos parallel eina per grupes (bin) ir sujungia jų atskirus failus į bendrą CSV c catir tada siunčia juos eksportui.

Transliavimas R?

Ką aš išmokau: Galite susisiekti stdin и stdout iš R scenarijaus, todėl naudokite jį ruošiant.

Galbūt pastebėjote šią eilutę savo Bash scenarijuje: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Jis paverčia visus sujungtus grupės failus (bin) į toliau pateiktą R scenarijų. {} yra ypatinga technika parallel, kuris įterpia visus duomenis, kuriuos siunčia į nurodytą srautą, tiesiai į pačią komandą. Variantas {#} suteikia unikalų gijos ID ir {%} reiškia darbo vietos numerį (kartojantis, bet niekada vienu metu). Visų parinkčių sąrašą galite rasti dokumentacija.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kai kintamasis file("stdin") perduota į readr::read_csv, duomenys, išversti į R scenarijų, įkeliami į rėmelį, kuris tada yra formoje .rds- naudojant failą aws.s3 rašoma tiesiai į S3.

RDS yra kažkas panašaus į jaunesniąją „Parquet“ versiją be garsiakalbių saugojimo smulkmenų.

Baigęs „Bash“ scenarijų, gavau paketą .rds-S3 esančius failus, kurie leido naudoti efektyvų glaudinimą ir įtaisytuosius tipus.

Nepaisant to, kad buvo naudojamas stabdys R, viskas veikė labai greitai. Nenuostabu, kad R dalys, kurios skaito ir rašo duomenis, yra labai optimizuotos. Atlikus bandymus vienoje vidutinio dydžio chromosomoje, darbas su C5n.4xl egzemplioriumi buvo atliktas maždaug per dvi valandas.

S3 apribojimai

Ką aš išmokau: Išmaniojo kelio diegimo dėka S3 gali tvarkyti daug failų.

Nerimavau, ar S3 sugebės tvarkyti daugybę į jį perkeltų failų. Galėčiau padaryti failų pavadinimus prasmingus, bet kaip S3 jų ieškotų?

25 TB analizavimas naudojant AWK ir R
Aplankai S3 yra tik parodyti, iš tikrųjų sistema nesidomi simboliu /. Iš S3 DUK puslapio.

Atrodo, kad S3 žymi kelią į tam tikrą failą kaip paprastą raktą maišos lentelėje arba dokumentų duomenų bazėje. Kibirą galima laikyti lentele, o failus galima laikyti tos lentelės įrašais.

Kadangi greitis ir efektyvumas yra svarbūs norint gauti pelną „Amazon“, nenuostabu, kad ši rakto kaip failo kelio sistema yra beprotiškai optimizuota. Stengiausi rasti balansą: kad nereikėtų daug gauti užklausų, o kad prašymai būtų greitai įvykdyti. Paaiškėjo, kad geriausia padaryti apie 20 tūkst. Manau, jei ir toliau optimizuosime, galime pasiekti spartos padidėjimą (pavyzdžiui, padaryti specialų kibirą tik duomenims, taip sumažinant peržvalgos lentelės dydį). Tačiau tolimesniems eksperimentams nebuvo nei laiko, nei pinigų.

O kaip dėl kryžminio suderinamumo?

Ką aš sužinojau: pagrindinė sugaišto laiko priežastis yra per anksti optimizuoti saugojimo metodą.

Šiuo metu labai svarbu savęs paklausti: „Kodėl naudoti patentuotą failo formatą? Priežastis yra įkėlimo greitis (gzipped CSV failai įkeliami 7 kartus ilgiau) ir suderinamumas su mūsų darbo eigomis. Galiu persvarstyti, ar R gali lengvai įkelti „Parquet“ (arba „Arrow“) failus be „Spark“ apkrovos. Visi mūsų laboratorijoje naudojasi R, ir jei man reikia konvertuoti duomenis į kitą formatą, vis tiek turiu pradinius teksto duomenis, todėl galiu tiesiog paleisti konvejerį dar kartą.

Darbų pasidalijimas

Ką aš išmokau: Nebandykite optimizuoti darbų rankiniu būdu, leiskite tai padaryti kompiuteriui.

Suderinau darbo eigą vienoje chromosomoje, dabar turiu apdoroti visus kitus duomenis.
Norėjau pakelti kelis EC2 egzempliorius, kad būtų galima konvertuoti, bet tuo pat metu bijojau gauti labai nesubalansuotą apkrovą atliekant skirtingus apdorojimo darbus (kaip ir Spark kentėjo dėl nesubalansuotų skaidinių). Be to, man nebuvo įdomu pakelti vieną egzempliorių vienoje chromosomoje, nes AWS paskyroms yra numatytasis 10 egzempliorių limitas.

Tada nusprendžiau parašyti scenarijų R, kad optimizuočiau apdorojimo darbus.

Pirmiausia paprašiau S3 apskaičiuoti, kiek atminties vietos užėmė kiekviena chromosoma.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Tada parašiau funkciją, kuri paima bendrą dydį, sumaišo chromosomų tvarką, suskirsto jas į grupes num_jobs ir nurodo, kaip skiriasi visų apdorojimo užduočių dydžiai.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Tada perėjau tūkstantį maišymų naudodamas purrr ir išsirinkau geriausią.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Taigi aš baigiau užduočių rinkinį, kuris buvo labai panašaus dydžio. Tada beliko suvynioti mano ankstesnį Bash scenarijų į didelę kilpą for. Šis optimizavimas užtruko apie 10 minučių. Ir tai yra daug mažiau, nei išleisčiau rankiniu būdu kurdamas užduotis, jei jos būtų nesubalansuotos. Todėl manau, kad buvau teisus dėl šio išankstinio optimizavimo.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Pabaigoje pridedu išjungimo komandą:

sudo shutdown -h now

... ir viskas pavyko! Naudodamas AWS CLI, iškėliau egzempliorius naudodamas parinktį user_data davė jiems apdoroti savo užduočių Bash scenarijus. Jie paleido ir išsijungė automatiškai, todėl nemokėjau už papildomą apdorojimo galią.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Pakuosime!

Ką aš išmokau: API turėtų būti paprasta, kad būtų patogu ir lanksčiau naudoti.

Pagaliau duomenis gavau reikiamoje vietoje ir formoje. Liko tik kiek įmanoma supaprastinti duomenų panaudojimo procesą, kad būtų lengviau kolegoms. Norėjau sukurti paprastą API užklausoms kurti. Jei ateityje nuspręsiu pereiti nuo .rds prie Parketo dildes, tai turetu buti man, o ne kolegoms problema. Tam nusprendžiau sukurti vidinį R paketą.

Sukurkite ir dokumentuokite labai paprastą paketą, kuriame yra tik kelios duomenų prieigos funkcijos, išdėstytos pagal funkciją get_snp. Taip pat sukūriau svetainę savo kolegoms pkgdown, kad jie galėtų lengvai matyti pavyzdžius ir dokumentus.

25 TB analizavimas naudojant AWK ir R

Išmanusis talpyklos kaupimas

Ką aš išmokau: Jei jūsų duomenys yra gerai paruošti, kaupimas talpykloje bus lengvas!

Kadangi viena iš pagrindinių darbo eigų taikė tą patį analizės modelį SNP paketui, nusprendžiau panaudoti sujungimą savo naudai. Perduodant duomenis per SNP visa informacija iš grupės (bin) pridedama prie grąžinamo objekto. Tai yra, senos užklausos gali (teoriškai) pagreitinti naujų užklausų apdorojimą.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Kurdamas paketą atlikau daugybę etalonų, kad palyginčiau greitį naudojant skirtingus metodus. Rekomenduoju to neapleisti, nes kartais rezultatai būna netikėti. Pavyzdžiui, dplyr::filter buvo daug greičiau nei eilučių fiksavimas naudojant indeksavimu pagrįstą filtravimą, o vieno stulpelio gavimas iš filtruoto duomenų rėmelio buvo daug greičiau nei naudojant indeksavimo sintaksę.

Atkreipkite dėmesį, kad objektas prev_snp_results yra raktas snps_in_bin. Tai yra visų unikalių SNP grupėje (dėkle), leidžiantis greitai patikrinti, ar jau turite duomenų iš ankstesnės užklausos. Tai taip pat leidžia lengvai pereiti per visus grupės (bin) SNP naudojant šį kodą:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

rezultatai

Dabar galime (ir pradėjome rimtai) paleisti modelius ir scenarijus, kurie anksčiau mums buvo nepasiekiami. Geriausia tai, kad mano kolegoms laboratorijoje nereikia galvoti apie jokias komplikacijas. Jie tiesiog turi funkciją, kuri veikia.

Ir nors pakuotė negaili jiems smulkmenų, aš stengiausi padaryti duomenų formatą pakankamai paprastą, kad jie galėtų tai suprasti, jei rytoj staiga dingčiau...

Greitis pastebimai padidėjo. Paprastai nuskaitome funkciškai reikšmingus genomo fragmentus. Anksčiau to negalėjome padaryti (pasirodė per brangu), o dabar dėl grupės (bin) struktūros ir talpyklos užklausos dėl vieno SNP vidutiniškai trunka mažiau nei 0,1 sekundės, o duomenų naudojimas yra toks. mažos, kad S3 išlaidos yra žemės riešutai.

išvada

Šis straipsnis visai nėra vadovas. Sprendimas pasirodė individualus ir beveik neabejotinai ne optimalus. Greičiau tai kelionių aprašymas. Noriu, kad kiti suprastų, kad tokie sprendimai neatrodo iki galo susiformavę galvoje, jie yra bandymų ir klaidų rezultatas. Be to, jei ieškote duomenų mokslininko, atminkite, kad norint efektyviai naudoti šiuos įrankius reikia patirties, o patirtis kainuoja. Džiaugiuosi, kad turėjau galimybių mokėti, bet daugelis kitų, kurie tą patį darbą gali atlikti geriau už mane, dėl pinigų stokos niekada neturės galimybės net pabandyti.

Didelių duomenų įrankiai yra universalūs. Jei turite laiko, beveik neabejotinai galite parašyti greitesnį sprendimą naudodami išmaniuosius duomenų valymo, saugojimo ir ištraukimo būdus. Galiausiai tai priklauso nuo kaštų ir naudos analizės.

Ką aš išmokau:

  • nėra pigaus būdo išanalizuoti 25 TB vienu metu;
  • būkite atsargūs dėl savo parketo failų dydžio ir jų organizavimo;
  • Spark pertvaros turi būti subalansuotos;
  • Apskritai niekada nebandykite padaryti 2,5 milijono skaidinių;
  • Rūšiuoti vis dar sunku, kaip ir nustatyti „Spark“;
  • kartais specialiems duomenims reikia specialių sprendimų;
  • Kibirkšties sujungimas yra greitas, tačiau skaidymas vis tiek brangus;
  • nemiegok, kai tave moko pagrindų, tikriausiai kažkas jau išsprendė tavo problemą devintajame dešimtmetyje;
  • gnu parallel - tai stebuklingas dalykas, visi turėtų juo naudotis;
  • Spark mėgsta nesuspaustus duomenis ir nemėgsta derinti skaidinių;
  • Spark turi per daug pridėtinių išlaidų sprendžiant paprastas problemas;
  • AWK asociatyvūs masyvai yra labai veiksmingi;
  • galite susisiekti stdin и stdout iš R scenarijaus, todėl naudokite jį ruošiant;
  • Išmaniojo kelio diegimo dėka S3 gali apdoroti daug failų;
  • Pagrindinė laiko gaišimo priežastis – per anksti optimizuotas saugojimo būdas;
  • nesistenkite optimizuoti užduočių rankiniu būdu, leiskite tai padaryti kompiuteriui;
  • API turėtų būti paprasta, kad būtų patogu ir lanksčiau naudoti;
  • Jei jūsų duomenys yra gerai paruošti, kaupimas talpykloje bus lengvas!

Šaltinis: www.habr.com

Добавить комментарий