Parimi i 25 TB duke përdorur AWK dhe R

Parimi i 25 TB duke përdorur AWK dhe R
Si ta lexoni këtë artikull: Kërkoj ndjesë që teksti është kaq i gjatë dhe kaotik. Për t'ju kursyer kohë, unë e filloj çdo kapitull me një hyrje "Ajo që mësova", e cila përmbledh thelbin e kapitullit në një ose dy fjali.

"Vetëm më trego zgjidhjen!" Nëse thjesht doni të shihni se nga kam ardhur, atëherë kaloni te kapitulli "Të bëhemi më shpikës", por mendoj se është më interesante dhe më e dobishme të lexosh për dështimin.

Kohët e fundit më është ngarkuar me vendosjen e një procesi për përpunimin e një vëllimi të madh të sekuencave të ADN-së së papërpunuar (teknikisht një çip SNP). Nevoja ishte për të marrë shpejt të dhëna për një vendndodhje të caktuar gjenetike (të quajtur SNP) për modelimin e mëvonshëm dhe detyra të tjera. Duke përdorur R dhe AWK, isha në gjendje të pastroja dhe organizoja të dhënat në një mënyrë natyrale, duke përshpejtuar shumë përpunimin e pyetjeve. Kjo nuk ishte e lehtë për mua dhe kërkonte përsëritje të shumta. Ky artikull do t'ju ndihmojë të shmangni disa nga gabimet e mia dhe t'ju tregojë se me çfarë përfundova.

Së pari, disa shpjegime hyrëse.

Të dhëna

Qendra jonë e përpunimit të informacionit gjenetik universitar na dha të dhëna në formën e një TSV 25 TB. I mora të ndara në 5 pako të ngjeshur me Gzip, secila përmban rreth 240 skedarë katër gigabajt. Çdo rresht përmbante të dhëna për një SNP nga një individ. Në total, u transmetuan të dhëna për ~2,5 milion SNP dhe ~60 mijë njerëz. Përveç informacionit SNP, skedarët përmbanin kolona të shumta me numra që pasqyronin karakteristika të ndryshme, si intensiteti i leximit, frekuenca e aleleve të ndryshme, etj. Në total kishte rreth 30 kolona me vlera unike.

Qëllim

Ashtu si me çdo projekt të menaxhimit të të dhënave, gjëja më e rëndësishme ishte të përcaktohej se si do të përdoreshin të dhënat. Në këtë rast ne do të zgjedhim kryesisht modelet dhe rrjedhat e punës për SNP bazuar në SNP. Kjo do të thotë, do të na duhen vetëm të dhëna për një SNP në të njëjtën kohë. Më duhej të mësoja se si t'i riktheja të gjitha rekordet e lidhura me një nga 2,5 milionë SNP-të sa më lehtë, shpejt dhe më lirë të ishte e mundur.

Si të mos e bëni këtë

Për të cituar një klishe të përshtatshme:

Unë nuk dështova një mijë herë, thjesht zbulova një mijë mënyra për të shmangur analizimin e një grupi të dhënash në një format të përshtatshëm për pyetje.

Së pari provoni

Çfarë kam mësuar: Nuk ka asnjë mënyrë të lirë për të analizuar 25 TB në të njëjtën kohë.

Pasi ndoqa kursin "Metodat e avancuara për përpunimin e të dhënave të mëdha" në Universitetin Vanderbilt, isha i sigurt se mashtrimi ishte në çantë. Ndoshta do të duhen një ose dy orë për të konfiguruar serverin Hive për të ekzekutuar të gjitha të dhënat dhe për të raportuar rezultatin. Meqenëse të dhënat tona ruhen në AWS S3, unë përdora shërbimin Athinë, i cili ju lejon të aplikoni pyetjet e Hive SQL në të dhënat S3. Nuk keni nevojë të konfiguroni/ngritni një grup Hive, dhe gjithashtu paguani vetëm për të dhënat që kërkoni.

Pasi i tregova Athenës të dhënat e mia dhe formatin e tyre, bëra disa teste me pyetje si kjo:

select * from intensityData limit 10;

Dhe shpejt mori rezultate të strukturuara mirë. Gati.

Derisa u përpoqëm të përdorim të dhënat në punën tonë...

Më kërkuan të nxirrja të gjitha informacionet e SNP për të testuar modelin. Kam drejtuar pyetjen:


select * from intensityData 
where snp = 'rs123456';

...dhe filloi të priste. Pas tetë minutash dhe më shumë se 4 TB të të dhënave të kërkuara, mora rezultatin. Athena tarifon sipas vëllimit të të dhënave të gjetura, 5 dollarë për terabajt. Pra, kjo kërkesë e vetme kushtoi 20 dollarë dhe tetë minuta pritje. Për të ekzekutuar modelin në të gjitha të dhënat, na u desh të prisnim 38 vjet dhe të paguanim 50 milionë dollarë Natyrisht, kjo nuk ishte e përshtatshme për ne.

Ishte e nevojshme përdorimi i parketit...

Çfarë kam mësuar: Kini kujdes me madhësinë e skedarëve tuaj të Parketit dhe organizimin e tyre.

Fillimisht u përpoqa të rregulloja situatën duke i konvertuar të gjitha TSV-të në Skedare parketi. Ato janë të përshtatshme për të punuar me grupe të mëdha të dhënash, sepse informacioni në to ruhet në formë kolone: ​​secila kolonë qëndron në segmentin e saj të memories/diskut, në ndryshim nga skedarët e tekstit, në të cilët rreshtat përmbajnë elementë të secilës kolonë. Dhe nëse keni nevojë të gjeni diçka, atëherë thjesht lexoni kolonën e kërkuar. Për më tepër, çdo skedar ruan një sërë vlerash në një kolonë, kështu që nëse vlera që kërkoni nuk është në intervalin e kolonës, Spark nuk do të humbasë kohë duke skanuar të gjithë skedarin.

Kam kryer një detyrë të thjeshtë Ngjitës AWS për të konvertuar TSV-të tona në Parket dhe i hodhi skedarët e rinj në Athena. U deshën rreth 5 orë. Por kur bëra kërkesën, u desh pothuajse e njëjta sasi kohe dhe pak më pak para për të përfunduar. Fakti është se Spark, duke u përpjekur të optimizojë detyrën, thjesht shpaketoi një copë TSV dhe e vendosi në pjesën e vet të Parketit. Dhe për shkak se çdo copë ishte mjaft e madhe për të përmbajtur të gjitha të dhënat e shumë njerëzve, çdo skedar përmbante të gjitha SNP-të, kështu që Spark duhej të hapte të gjithë skedarët për të nxjerrë informacionin që i nevojitej.

Është interesante se lloji i ngjeshjes së paracaktuar (dhe i rekomanduar) i Parketit, i shpejtë, nuk mund të ndahet. Prandaj, çdo ekzekutues ishte i mbërthyer në detyrën e shpaketimit dhe shkarkimit të të dhënave të plota 3,5 GB.

Parimi i 25 TB duke përdorur AWK dhe R

Le ta kuptojmë problemin

Çfarë kam mësuar: Renditja është e vështirë, veçanërisht nëse të dhënat shpërndahen.

Më dukej se tani e kuptova thelbin e problemit. Më duhej vetëm të renditja të dhënat sipas kolonës SNP, jo sipas njerëzve. Pastaj disa SNP do të ruhen në një pjesë të veçantë të të dhënave dhe më pas funksioni "i zgjuar" i Parketit "i hapur vetëm nëse vlera është në intervalin" do të shfaqet në të gjithë lavdinë e tij. Fatkeqësisht, renditja e miliarda rreshtave të shpërndara nëpër një grup doli të ishte një detyrë e vështirë.

AWS definitivisht nuk dëshiron të lëshojë një rimbursim për shkak të arsyes "Unë jam një student i hutuar". Pasi vrapova me renditjen në Amazon Glue, ai funksionoi për 2 ditë dhe u rrëzua.

Po ndarjen?

Çfarë kam mësuar: Ndarjet në Spark duhet të jenë të balancuara.

Pastaj më lindi ideja e ndarjes së të dhënave në kromozome. Janë 23 prej tyre (dhe disa të tjera nëse merrni parasysh ADN-në mitokondriale dhe rajonet e pa hartë).
Kjo do t'ju lejojë të ndani të dhënat në copa më të vogla. Nëse shtoni vetëm një rresht në funksionin e eksportit Spark në skriptin Glue partition_by = "chr", atëherë të dhënat duhet të ndahen në kova.

Parimi i 25 TB duke përdorur AWK dhe R
Gjenomi përbëhet nga fragmente të shumta të quajtura kromozome.

Fatkeqësisht, nuk funksionoi. Kromozomet kanë madhësi të ndryshme, që do të thotë sasi të ndryshme informacioni. Kjo do të thotë se detyrat që Spark u dërgoi punëtorëve nuk ishin të balancuara dhe të përfunduara ngadalë sepse disa nyje mbaruan herët dhe ishin boshe. Megjithatë, detyrat u kryen. Por kur kërkohej një SNP, çekuilibri përsëri shkaktoi probleme. Kostoja e përpunimit të SNP-ve në kromozome më të mëdha (domethënë aty ku duam të marrim të dhëna) është ulur vetëm me rreth një faktor prej 10. Shumë, por jo mjaftueshëm.

Po sikur ta ndajmë në pjesë edhe më të vogla?

Çfarë kam mësuar: Asnjëherë mos u përpiqni të bëni 2,5 milionë ndarje fare.

Vendosa të shkoja plotësisht dhe ndava çdo SNP. Kjo siguroi që ndarjet të ishin të madhësisë së njëjtë. ISHTE NJË IDE E KEQ. Kam përdorur Glue dhe kam shtuar një linjë të pafajshme partition_by = 'snp'. Detyra filloi dhe filloi të ekzekutohej. Një ditë më vonë kontrollova dhe pashë që nuk kishte ende asgjë të shkruar në S3, kështu që e vrava detyrën. Duket sikur Glue po shkruante skedarë të ndërmjetëm në një vendndodhje të fshehur në S3, shumë skedarë, ndoshta disa milionë. Si rezultat, gabimi im kushtoi më shumë se një mijë dollarë dhe nuk i pëlqeu mentorit tim.

Ndarje + renditje

Çfarë kam mësuar: Renditja është ende e vështirë, siç është akordimi i Spark.

Përpjekja ime e fundit për ndarje përfshinte ndarjen e kromozomeve dhe më pas renditjen e secilës ndarje. Në teori, kjo do të shpejtonte çdo pyetje, sepse të dhënat e dëshiruara SNP duhej të ishin brenda disa pjesëve të parketit brenda një diapazoni të caktuar. Fatkeqësisht, renditja e të dhënave edhe të ndara doli të ishte një detyrë e vështirë. Si rezultat, kalova në EMR për një grup të personalizuar dhe përdora tetë shembuj të fuqishëm (C5.4xl) dhe Sparklyr për të krijuar një rrjedhë pune më fleksibël...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

Megjithatë, detyra ende nuk u krye. E konfigurova në mënyra të ndryshme: rrita ndarjen e memories për secilin ekzekutues të pyetjes, përdora nyje me një sasi të madhe memorie, përdora variabla të transmetimit (ndryshoret e transmetimit), por sa herë që këto rezultuan të ishin gjysmë-masë, dhe gradualisht ekzekutuesit filluan të dështojë derisa gjithçka të ndalojë.

Unë jam duke u bërë më kreativ

Çfarë kam mësuar: Ndonjëherë të dhënat e veçanta kërkojnë zgjidhje të veçanta.

Çdo SNP ka një vlerë pozicioni. Ky është një numër që korrespondon me numrin e bazave përgjatë kromozomit të tij. Kjo është një mënyrë e bukur dhe e natyrshme për të organizuar të dhënat tona. Në fillim doja të ndaja sipas rajoneve të secilit kromozom. Për shembull, pozicionet 1 - 2000, 2001 - 4000, etj. Por problemi është se SNP-të nuk shpërndahen në mënyrë të barabartë nëpër kromozome, kështu që madhësitë e grupit do të ndryshojnë shumë.

Parimi i 25 TB duke përdorur AWK dhe R

Si rezultat, arrita në një ndarje të pozicioneve në kategori (gradë). Duke përdorur të dhënat e shkarkuara tashmë, bëra një kërkesë për të marrë një listë të SNP-ve unike, pozicionet dhe kromozomet e tyre. Më pas i rendita të dhënat brenda secilit kromozom dhe mblodha SNP në grupe (koshi) të një madhësie të caktuar. Le të themi 1000 SNP secila. Kjo më dha marrëdhënien SNP-në-grup-për-kromozom.

Në fund bëra grupe (bin) prej 75 SNP, arsyeja do të shpjegohet më poshtë.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Së pari provoni me Spark

Çfarë kam mësuar: Grumbullimi i shkëndijës është i shpejtë, por ndarja është ende e shtrenjtë.

Doja të lexoja këtë kornizë të vogël të dhënash (2,5 milion rreshta) në Spark, ta kombinoja me të dhënat e papërpunuara dhe më pas ta ndaja sipas kolonës së shtuar rishtazi bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

une e perdora sdf_broadcast(), kështu që Spark e di se duhet të dërgojë kornizën e të dhënave në të gjitha nyjet. Kjo është e dobishme nëse të dhënat janë në përmasa të vogla dhe kërkohen për të gjitha detyrat. Përndryshe, Spark përpiqet të jetë i zgjuar dhe shpërndan të dhëna sipas nevojës, gjë që mund të shkaktojë ngadalësim.

Dhe përsëri, ideja ime nuk funksionoi: detyrat funksionuan për ca kohë, përfunduan bashkimin dhe më pas, si ekzekutorët e nisur nga ndarja, filluan të dështojnë.

Duke shtuar AWK

Çfarë kam mësuar: Mos fle kur të mësojnë bazat. Me siguri dikush tashmë e ka zgjidhur problemin tuaj në vitet 1980.

Deri në këtë pikë, arsyeja për të gjitha dështimet e mia me Spark ishte grumbullimi i të dhënave në grup. Ndoshta situata mund të përmirësohet me trajtimin paraprak. Vendosa të provoja të ndaja të dhënat e tekstit të papërpunuar në kolona kromozomesh, kështu që shpresoja t'i jepja Spark të dhëna "të para-ndara".

Kërkova në StackOverflow për mënyrën e ndarjes sipas vlerave të kolonave dhe gjeta një përgjigje kaq e madhe. Me AWK ju mund të ndani një skedar teksti sipas vlerave të kolonës duke e shkruar atë në një skenar në vend që t'i dërgoni rezultatet në stdout.

Shkrova një skenar Bash për ta provuar. Shkarkuar një nga TSV-të e paketuara, më pas e shpaketuan duke përdorur gzip dhe dërguar në awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Funksionoi!

Mbushja e bërthamave

Çfarë kam mësuar: gnu parallel - është një gjë magjike, të gjithë duhet ta përdorin atë.

Ndarja ishte mjaft e ngadaltë dhe kur fillova htoppër të kontrolluar përdorimin e një shembulli të fuqishëm (dhe të shtrenjtë) EC2, doli që përdorja vetëm një bërthamë dhe rreth 200 MB memorie. Për të zgjidhur problemin dhe për të mos humbur shumë para, duhej të kuptonim se si të paralelizonim punën. Për fat të mirë, në një libër absolutisht të mahnitshëm Shkenca e të dhënave në vijën e komandës Gjeta një kapitull nga Jeron Janssens mbi paralelizimin. Prej saj mësova gnu parallel, një metodë shumë fleksibël për zbatimin e multithreading në Unix.

Parimi i 25 TB duke përdorur AWK dhe R
Kur fillova ndarjen duke përdorur procesin e ri, gjithçka ishte në rregull, por kishte ende një pengesë - shkarkimi i objekteve S3 në disk nuk ishte shumë i shpejtë dhe jo plotësisht i paralelizuar. Për ta rregulluar këtë, bëra këtë:

  1. Zbulova se është e mundur të zbatohet faza e shkarkimit S3 direkt në tubacion, duke eliminuar plotësisht ruajtjen e ndërmjetme në disk. Kjo do të thotë që unë mund të shmang shkrimin e të dhënave të papërpunuara në disk dhe të përdor ruajtje edhe më të vogël, dhe për këtë arsye më të lirë, në AWS.
  2. ekipi aws configure set default.s3.max_concurrent_requests 50 rriti shumë numrin e temave që përdor AWS CLI (si parazgjedhje janë 10).
  3. Kalova në një shembull EC2 të optimizuar për shpejtësinë e rrjetit, me shkronjën n në emër. Kam zbuluar se humbja e fuqisë përpunuese kur përdoren n-instanca kompensohet më shumë nga rritja e shpejtësisë së ngarkimit. Për shumicën e detyrave kam përdorur c5n.4xl.
  4. Ndryshuar gzip mbi pigz, ky është një mjet gzip që mund të bëjë gjëra interesante për të paralelizuar detyrën fillimisht jo të paralelizuar të dekompresimit të skedarëve (kjo ndihmoi më së paku).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Këto hapa kombinohen me njëri-tjetrin për të bërë gjithçka të funksionojë shumë shpejt. Duke rritur shpejtësinë e shkarkimit dhe duke eliminuar shkrimet në disk, tani mund të përpunoj një paketë 5 terabajt në vetëm disa orë.

Ky cicërimë duhet të kishte përmendur 'TSV'. Mjerisht.

Përdorimi i të dhënave të analizuara rishtazi

Çfarë kam mësuar: Spark pëlqen të dhënat e pakompresuara dhe nuk i pëlqen kombinimi i ndarjeve.

Tani të dhënat ishin në S3 në një format të papaketuar (lexo: të përbashkët) dhe gjysmë të porositur, dhe unë mund të kthehesha përsëri në Spark. Më priste një surprizë: Sërish nuk arrita të arrij atë që doja! Ishte shumë e vështirë t'i thuash Shkëndijës saktësisht se si u ndanë të dhënat. Dhe edhe kur e bëra këtë, doli që kishte shumë ndarje (95 mijë), dhe kur përdora coalesce e zvogëloi numrin e tyre në kufij të arsyeshëm, kjo shkatërroi ndarjen time. Jam i sigurt se kjo mund të rregullohet, por pas disa ditësh kërkime nuk gjeta një zgjidhje. Përfundimisht i përfundova të gjitha detyrat në Spark, megjithëse u desh një kohë dhe skedarët e mi të ndarë të Parketit nuk ishin shumë të vegjël (~200 KB). Megjithatë, të dhënat ishin aty ku duheshin.

Parimi i 25 TB duke përdorur AWK dhe R
Shumë e vogël dhe e pabarabartë, e mrekullueshme!

Testimi i pyetjeve lokale të Spark

Çfarë kam mësuar: Shkëndija ka shumë shpenzime kur zgjidh probleme të thjeshta.

Duke shkarkuar të dhënat në një format të zgjuar, unë munda të testoja shpejtësinë. Vendosni një skript R për të ekzekutuar një server lokal Spark dhe më pas ngarkoni një kornizë të dhënash Spark nga ruajtja e specifikuar e grupit Parquet (koshi). U përpoqa të ngarkoja të gjitha të dhënat, por nuk arrita ta bëj Sparklyr të njohë ndarjen.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Ekzekutimi zgjati 29,415 sekonda. Shumë më mirë, por jo shumë i mirë për testimin masiv të ndonjë gjëje. Për më tepër, nuk mund t'i shpejtoja gjërat me ruajtjen e memories, sepse kur u përpoqa të ruaj një kornizë të dhënash në memorie, Spark gjithmonë rrëzohej, edhe kur ndava më shumë se 50 GB memorie për një grup të dhënash që peshonte më pak se 15.

Kthehu në AWK

Çfarë kam mësuar: Vargjet shoqëruese në AWK janë shumë efikase.

Kuptova se mund të arrija shpejtësi më të larta. E kujtova në një mënyrë të mrekullueshme Tutorial AWK nga Bruce Barnett Kam lexuar për një veçori interesante të quajtur "vargjeve asociative" Në thelb, këto janë çifte me vlerë kyçe, të cilat për disa arsye quheshin ndryshe në AWK, dhe për këtë arsye unë disi nuk mendova shumë për to. Roman Cheplyaka kujtoi se termi "vargje shoqëruese" është shumë më i vjetër se termi "çift-vlerë çelës". Edhe nëse ju kërkoni çelësin-vlerën në Google Ngram, nuk do ta shihni këtë term atje, por do të gjeni vargje shoqëruese! Për më tepër, "çifti çelës-vlerë" shoqërohet më shpesh me bazat e të dhënave, kështu që ka shumë më kuptim ta krahasoni atë me një hartë. Kuptova se mund t'i përdorja këto grupe shoqëruese për të lidhur SNP-të e mia me një tabelë koshi dhe të dhëna të papërpunuara pa përdorur Spark.

Për ta bërë këtë, në skriptin AWK përdora bllokun BEGIN. Kjo është një pjesë e kodit që ekzekutohet përpara se rreshti i parë i të dhënave të kalojë në trupin kryesor të skriptit.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Ekip while(getline...) ngarkoi të gjitha rreshtat nga grupi CSV (koshi), vendosi kolonën e parë (emri SNP) si çelës për grupin shoqërues bin dhe vlera e dytë (grupi) si vlerë. Pastaj në bllok { }, i cili ekzekutohet në të gjitha linjat e skedarit kryesor, çdo rresht dërgohet në skedarin dalës, i cili merr një emër unik në varësi të grupit të tij (bin): ..._bin_"bin[$1]"_....

variabla batch_num и chunk_id përputhet me të dhënat e ofruara nga tubacioni, duke shmangur një gjendje gare dhe çdo fill ekzekutimi që funksionon parallel, i shkroi skedarit të vet unik.

Meqenëse i shpërndava të gjitha të dhënat e papërpunuara në dosje në kromozomet e mbetura nga eksperimenti im i mëparshëm me AWK, tani mund të shkruaj një skript tjetër Bash për të përpunuar një kromozom në një kohë dhe për të dërguar të dhëna më të thella të ndara në S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Skenari ka dy seksione parallel.

Në seksionin e parë, të dhënat lexohen nga të gjithë skedarët që përmbajnë informacion mbi kromozomin e dëshiruar, më pas këto të dhëna shpërndahen nëpër fije, të cilat i shpërndajnë skedarët në grupet e duhura (bin). Për të shmangur kushtet e garës kur thread të shumta shkruajnë në të njëjtin skedar, AWK i kalon emrat e skedarëve për të shkruar të dhëna në vende të ndryshme, p.sh. chr_10_bin_52_batch_2_aa.csv. Si rezultat, shumë skedarë të vegjël krijohen në disk (për këtë kam përdorur vëllime terabyte EBS).

Transportues nga seksioni i dytë parallel kalon nëpër grupe (bin) dhe kombinon skedarët e tyre individualë në CSV të përbashkët c catdhe më pas i dërgon për eksport.

Transmetimi në R?

Çfarë kam mësuar: Mund te kontaktoni stdin и stdout nga një skript R, dhe për këtë arsye përdorni atë në tubacion.

Ju mund ta keni vënë re këtë rresht në skriptin tuaj Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Ai përkthen të gjithë skedarët e grupit të bashkuar (bin) në skriptin R më poshtë. {} është një teknikë e veçantë parallel, i cili fut çdo të dhënë që dërgon në rrjedhën e specifikuar direkt në vetë komandën. Opsioni {#} ofron një ID unike të fillit, dhe {%} përfaqëson numrin e vendit të punës (të përsëritur, por asnjëherë njëkohësisht). Një listë e të gjitha opsioneve mund të gjendet në dokumentacionin.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kur një ndryshore file("stdin") transmetohet në readr::read_csv, të dhënat e përkthyera në skriptin R ngarkohen në një kornizë, e cila më pas është në formë .rds- skedar duke përdorur aws.s3 shkruar direkt në S3.

RDS është diçka si një version i ri i Parketit, pa njollat ​​e ruajtjes së altoparlantëve.

Pasi mbarova skenarin Bash, mora një pako .rds-skedarët e vendosur në S3, të cilat më lejuan të përdor kompresim efikas dhe lloje të integruara.

Pavarësisht përdorimit të frenave R, gjithçka funksionoi shumë shpejt. Jo çuditërisht, pjesët e R që lexojnë dhe shkruajnë të dhëna janë shumë të optimizuara. Pas testimit në një kromozom me madhësi mesatare, puna përfundoi në një shembull C5n.4xl në rreth dy orë.

Kufizimet e S3

Çfarë kam mësuar: Falë zbatimit të rrugës inteligjente, S3 mund të trajtojë shumë skedarë.

Isha i shqetësuar nëse S3 do të ishte në gjendje të trajtonte skedarët e shumtë që u transferuan në të. Unë mund të bëj që emrat e skedarëve të kenë kuptim, por si do t'i kërkonte S3 ato?

Parimi i 25 TB duke përdorur AWK dhe R
Dosjet në S3 janë vetëm për shfaqje, në fakt sistemi nuk është i interesuar për simbolin /. Nga faqja FAQ S3.

Duket se S3 përfaqëson shtegun drejt një skedari të caktuar si një çelës i thjeshtë në një lloj tabele hash ose bazë të dhënash të bazuar në dokumente. Një kovë mund të konsiderohet si një tabelë dhe skedarët mund të konsiderohen si regjistrime në atë tabelë.

Meqenëse shpejtësia dhe efikasiteti janë të rëndësishme për të bërë një fitim në Amazon, nuk është për t'u habitur që ky sistem i rrugës së çelësit si skedar është i optimizuar jashtëzakonisht. U përpoqa të gjeja një ekuilibër: në mënyrë që të mos më duhej të bëja shumë kërkesa, por që kërkesat të ekzekutoheshin shpejt. Doli se është më mirë të bësh rreth 20 mijë skedarë koshi. Unë mendoj se nëse vazhdojmë të optimizojmë, mund të arrijmë një rritje të shpejtësisë (për shembull, duke bërë një kovë të veçantë vetëm për të dhënat, duke zvogëluar kështu madhësinë e tabelës së kërkimit). Por nuk kishte kohë apo para për eksperimente të mëtejshme.

Po në lidhje me përputhshmërinë e kryqëzuar?

Çfarë mësova: Shkaku numër një i humbjes së kohës është optimizimi i parakohshëm i metodës suaj të ruajtjes.

Në këtë pikë, është shumë e rëndësishme të pyesni veten: "Pse të përdorni një format skedari të pronarit?" Arsyeja qëndron në shpejtësinë e ngarkimit (skedarët CSV të gzipped u deshën 7 herë më shumë për t'u ngarkuar) dhe përputhshmëria me rrjedhat tona të punës. Mund të rishqyrtoj nëse R mund të ngarkojë me lehtësi skedarë Parquet (ose Shigjeta) pa ngarkesën Spark. Të gjithë në laboratorin tonë përdorin R, dhe nëse më duhet t'i konvertoj të dhënat në një format tjetër, unë i kam ende të dhënat origjinale të tekstit, kështu që thjesht mund të ekzekutoj përsëri tubacionin.

Ndarja e punës

Çfarë kam mësuar: Mos u mundoni të optimizoni punët me dorë, lëreni kompjuterin ta bëjë atë.

Kam korrigjuar rrjedhën e punës në një kromozom, tani më duhet të përpunoj të gjitha të dhënat e tjera.
Doja të ngreja disa raste EC2 për konvertim, por në të njëjtën kohë kisha frikë se mos merrja një ngarkesë shumë të pabalancuar në punë të ndryshme përpunimi (ashtu si Spark vuajti nga ndarjet e çekuilibruara). Për më tepër, nuk isha i interesuar të ngrija një shembull për kromozom, sepse për llogaritë AWS ekziston një kufi i paracaktuar prej 10 shembujsh.

Pastaj vendosa të shkruaj një skenar në R për të optimizuar punët e përpunimit.

Së pari, i kërkova S3 të llogariste se sa hapësirë ​​ruajtëse zinte secili kromozom.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Pastaj shkrova një funksion që merr madhësinë totale, përzien rendin e kromozomeve, i ndan ato në grupe num_jobs dhe ju tregon se sa të ndryshme janë madhësitë e të gjitha punëve të përpunimit.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Pastaj kalova nëpër një mijë riorganizime duke përdorur purrr dhe zgjodha më të mirën.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Kështu përfundova me një grup detyrash që ishin shumë të ngjashme në madhësi. Pastaj gjithçka që mbeti ishte të mbyllja skenarin tim të mëparshëm Bash në një lak të madh for. Ky optimizim mori rreth 10 minuta për t'u shkruar. Dhe kjo është shumë më pak se sa do të shpenzoja për krijimin manualisht të detyrave nëse ato do të ishin të pabalancuara. Prandaj, mendoj se kisha të drejtë me këtë optimizim paraprak.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Në fund shtoj komandën e mbylljes:

sudo shutdown -h now

... dhe gjithçka funksionoi! Duke përdorur AWS CLI, unë ngrita shembuj duke përdorur opsionin user_data u dha atyre skriptet Bash të detyrave të tyre për përpunim. Ata funksionuan dhe u mbyllën automatikisht, kështu që unë nuk po paguaja për fuqi shtesë përpunuese.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Le të paketojmë!

Çfarë kam mësuar: API duhet të jetë i thjeshtë për hir të lehtësisë dhe fleksibilitetit të përdorimit.

Më në fund i mora të dhënat në vendin dhe formën e duhur. Ajo që mbeti ishte thjeshtimi i procesit të përdorimit të të dhënave sa më shumë që të ishte e mundur për ta bërë më të lehtë për kolegët e mi. Doja të bëja një API të thjeshtë për krijimin e kërkesave. Nëse në të ardhmen vendos të kaloj nga .rds te dosjet e parketit, atëherë ky duhet të jetë problem për mua, jo për kolegët e mi. Për këtë vendosa të bëj një paketë të brendshme R.

Ndërtoni dhe dokumentoni një paketë shumë të thjeshtë që përmban vetëm disa funksione të aksesit të të dhënave të organizuara rreth një funksioni get_snp. Kam krijuar edhe një faqe interneti për kolegët e mi pkgdown, kështu që ata mund të shohin lehtësisht shembuj dhe dokumentacion.

Parimi i 25 TB duke përdorur AWK dhe R

Memorie inteligjente

Çfarë kam mësuar: Nëse të dhënat tuaja janë të përgatitura mirë, ruajtja në memorie do të jetë e lehtë!

Meqenëse një nga flukset kryesore të punës aplikoi të njëjtin model analize në paketën SNP, vendosa të përdor lidhjen në avantazhin tim. Gjatë transmetimit të të dhënave përmes SNP, i gjithë informacioni nga grupi (bina) i bashkëngjitet objektit të kthyer. Kjo do të thotë, pyetjet e vjetra mund (në teori) të shpejtojnë përpunimin e pyetjeve të reja.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Kur ndërtova paketën, përdora shumë standarde për të krahasuar shpejtësinë kur përdor metoda të ndryshme. Unë rekomandoj të mos e neglizhoni këtë, sepse ndonjëherë rezultatet janë të papritura. Për shembull, dplyr::filter ishte shumë më i shpejtë se kapja e rreshtave duke përdorur filtrimin e bazuar në indeksim, dhe marrja e një kolone të vetme nga një kornizë e filtruar e të dhënave ishte shumë më e shpejtë sesa përdorimi i sintaksës së indeksimit.

Ju lutemi vini re se objekti prev_snp_results përmban çelësin snps_in_bin. Ky është një grup i të gjitha SNP-ve unike në një grup (kosh), që ju lejon të kontrolloni shpejt nëse keni tashmë të dhëna nga një pyetje e mëparshme. Gjithashtu e bën të lehtë kalimin e të gjitha SNP-ve në një grup (bin) me këtë kod:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Gjetjet

Tani ne mund (dhe kemi filluar të zbatojmë seriozisht) modele dhe skenarë që më parë ishin të paarritshëm për ne. Gjëja më e mirë është se kolegët e mi të laboratorit nuk duhet të mendojnë për ndonjë ndërlikim. Ata thjesht kanë një funksion që funksionon.

Dhe megjithëse paketa i kursen atyre detajet, unë u përpoqa ta bëja formatin e të dhënave mjaft të thjeshtë që ata të mund ta kuptonin nëse unë u zhduka papritur nesër...

Shpejtësia është rritur ndjeshëm. Zakonisht skanojmë fragmente të gjenomit me rëndësi funksionale. Më parë, ne nuk mund ta bënim këtë (doli të ishte shumë i shtrenjtë), por tani, falë strukturës së grupit (koshi) dhe ruajtjes së memories, një kërkesë për një SNP zgjat mesatarisht më pak se 0,1 sekonda, dhe përdorimi i të dhënave është kaq ulët se kostot për S3 janë kikirikë.

Përfundim

Ky artikull nuk është aspak një udhëzues. Zgjidhja doli të ishte individuale, dhe pothuajse me siguri jo optimale. Përkundrazi, është një udhëtim. Dua që të tjerët të kuptojnë se vendime të tilla nuk duken plotësisht të formuara në kokë, ato janë rezultat i provës dhe gabimit. Gjithashtu, nëse jeni duke kërkuar për një shkencëtar të të dhënave, mbani në mend se përdorimi i këtyre mjeteve në mënyrë efektive kërkon përvojë dhe përvoja kushton para. Jam e lumtur që kisha mjetet për të paguar, por shumë të tjerë që mund të bëjnë të njëjtën punë më mirë se unë nuk do të kenë kurrë mundësinë për mungesë parash të provojnë.

Mjetet e të dhënave të mëdha janë të gjithanshme. Nëse keni kohë, pothuajse me siguri mund të shkruani një zgjidhje më të shpejtë duke përdorur teknikat inteligjente të pastrimit, ruajtjes dhe nxjerrjes së të dhënave. Në fund të fundit, bëhet fjalë për një analizë kosto-përfitim.

Ajo që mësova:

  • nuk ka asnjë mënyrë të lirë për të analizuar 25 TB në të njëjtën kohë;
  • Kini kujdes me madhësinë e skedarëve tuaj të Parketit dhe organizimin e tyre;
  • Ndarjet në Spark duhet të jenë të balancuara;
  • Në përgjithësi, kurrë mos u përpiqni të bëni 2,5 milionë ndarje;
  • Renditja është ende e vështirë, siç është vendosja e Spark;
  • ndonjëherë të dhënat e veçanta kërkojnë zgjidhje të veçanta;
  • Grumbullimi i shkëndijës është i shpejtë, por ndarja është ende e shtrenjtë;
  • mos flini kur ju mësojnë bazat, dikush ndoshta e ka zgjidhur tashmë problemin tuaj në vitet 1980;
  • gnu parallel - kjo është një gjë magjike, të gjithë duhet ta përdorin atë;
  • Spark pëlqen të dhënat e pakompresuara dhe nuk i pëlqen kombinimi i ndarjeve;
  • Shkëndija ka shumë shpenzime kur zgjidh probleme të thjeshta;
  • Vargjet shoqëruese të AWK janë shumë efikase;
  • mund te kontaktoni stdin и stdout nga një skript R, dhe për këtë arsye përdorni atë në tubacion;
  • Falë zbatimit të rrugës inteligjente, S3 mund të përpunojë shumë skedarë;
  • Arsyeja kryesore për humbjen e kohës është optimizimi i parakohshëm i metodës suaj të ruajtjes;
  • mos u përpiqni të optimizoni detyrat me dorë, lëreni kompjuterin ta bëjë atë;
  • API duhet të jetë i thjeshtë për hir të lehtësisë dhe fleksibilitetit të përdorimit;
  • Nëse të dhënat tuaja janë të përgatitura mirë, ruajtja në memorie do të jetë e lehtë!

Burimi: www.habr.com

Shto një koment