Að flokka 25TB með AWK og R

Að flokka 25TB með AWK og R
Hvernig á að lesa þessa grein: Ég biðst afsökunar á því að textinn er svona langur og óreiðukenndur. Til að spara þér tíma byrja ég hvern kafla á „Það sem ég lærði“ inngang, sem dregur saman kjarna kaflans í einni eða tveimur setningum.

"Sýndu mér bara lausnina!" Ef þú vilt bara sjá hvaðan ég kom, slepptu þá yfir í kaflann „Að verða frumlegri,“ en ég held að það sé áhugaverðara og gagnlegra að lesa um mistök.

Mér var nýlega falið að setja upp ferli til að vinna úr miklu magni af hráum DNA röðum (tæknilega séð SNP flís). Þörfin var að fá fljótt gögn um tiltekna erfðafræðilega staðsetningu (kallað SNP) fyrir síðari líkanagerð og önnur verkefni. Með því að nota R og AWK gat ég hreinsað og skipulagt gögn á náttúrulegan hátt, sem flýtti mjög fyrir úrvinnslu fyrirspurna. Þetta var ekki auðvelt fyrir mig og þurfti margar endurtekningar. Þessi grein mun hjálpa þér að forðast sum mistök mín og sýna þér hvað ég endaði með.

Í fyrsta lagi nokkrar inngangsskýringar.

Gögn

Erfðaupplýsingamiðstöð háskólans okkar útvegaði okkur gögn í formi 25 TB TSV. Ég fékk þeim skipt í 5 Gzip-þjappaða pakka, sem hver innihélt um 240 fjögurra gígabæta skrár. Hver röð innihélt gögn fyrir einn SNP frá einum einstaklingi. Alls voru send gögn um ~2,5 milljónir SNP og ~60 þúsund manns. Auk SNP-upplýsinga innihéldu skrárnar fjölmarga dálka með tölum sem endurspegla ýmsa eiginleika, svo sem lesstyrk, tíðni mismunandi samsæta osfrv. Alls voru um 30 dálkar með einstökum gildum.

Markmið

Eins og með öll gagnastjórnunarverkefni var mikilvægast að ákveða hvernig gögnin yrðu notuð. Í þessu tilfelli við munum aðallega velja líkön og verkflæði fyrir SNP byggt á SNP. Það er, við munum aðeins þurfa gögn um einn SNP í einu. Ég þurfti að læra hvernig á að sækja allar færslur sem tengjast einum af 2,5 milljónum SNP eins auðveldlega, fljótt og ódýrt og mögulegt er.

Hvernig á ekki að gera þetta

Til að vitna í viðeigandi klisju:

Ég mistókst ekki þúsund sinnum, ég uppgötvaði bara þúsund leiðir til að forðast að flokka fullt af gögnum á fyrirspurnavænu sniði.

Fyrsta tilraun

Hvað hef ég lært: Það er engin ódýr leið til að flokka 25 TB í einu.

Eftir að hafa tekið námskeiðið „Advanced Methods for Big Data Processing“ í Vanderbilt háskólanum var ég viss um að bragðið væri í pokanum. Það mun líklega taka klukkutíma eða tvo að setja upp Hive netþjóninn til að keyra í gegnum öll gögnin og tilkynna niðurstöðuna. Þar sem gögnin okkar eru geymd í AWS S3 notaði ég þjónustuna Athena, sem gerir þér kleift að beita Hive SQL fyrirspurnum á S3 gögn. Þú þarft ekki að setja upp / hækka Hive þyrping, og þú borgar líka aðeins fyrir gögnin sem þú ert að leita að.

Eftir að ég sýndi Aþenu gögnin mín og snið þeirra, keyrði ég nokkur próf með fyrirspurnum eins og þessari:

select * from intensityData limit 10;

Og fékk fljótt vel uppbyggðar niðurstöður. Tilbúið.

Þangað til við reyndum að nota gögnin í vinnu okkar...

Ég var beðinn um að draga út allar SNP upplýsingar til að prófa líkanið á. Ég rak fyrirspurnina:


select * from intensityData 
where snp = 'rs123456';

...og fór að bíða. Eftir átta mínútur og meira en 4 TB af umbeðnum gögnum fékk ég niðurstöðuna. Athena rukkar eftir magni gagna sem finnast, $5 á terabætið. Svo þessi eina beiðni kostaði $20 og átta mínútna bið. Til að keyra líkanið á öllum gögnum þurftum við að bíða í 38 ár og borga 50 milljónir dollara. Augljóslega hentaði þetta okkur ekki.

Það þurfti að nota parket...

Hvað hef ég lært: Vertu varkár með stærð Parket skránna og skipulag þeirra.

Ég reyndi fyrst að laga ástandið með því að breyta öllum TSV í Parket skrár. Þau eru þægileg til að vinna með stór gagnasöfn þar sem upplýsingarnar í þeim eru geymdar í dálkaformi: hver dálkur liggur í eigin minni/diskhluta, öfugt við textaskrár, þar sem línur innihalda þætti hvers dálks. Og ef þú þarft að finna eitthvað, þá skaltu bara lesa nauðsynlegan dálk. Að auki geymir hver skrá fjölda gilda í dálki, þannig að ef gildið sem þú ert að leita að er ekki innan dálksins mun Spark ekki eyða tíma í að skanna alla skrána.

Ég hljóp einfalt verkefni AWS lím til að breyta TSV-tækjunum okkar í Parket og slepptu nýju skránum í Athena. Það tók um 5 klst. En þegar ég rak beiðnina tók það um það bil sama tíma og aðeins minni peninga að klára. Staðreyndin er sú að Spark, sem reyndi að hagræða verkefninu, pakkaði einfaldlega upp einum TSV bita og setti hann í sinn eigin Parket bita. Og vegna þess að hver klumpur var nógu stór til að geyma allar skrár margra, innihélt hver skrá öll SNP, svo Spark varð að opna allar skrárnar til að draga út þær upplýsingar sem hún þurfti.

Athyglisvert er að sjálfgefin (og ráðlögð) þjöppunargerð Parket, snappy, er ekki skiptanleg. Þess vegna var hver framkvæmdaraðili fastur við það verkefni að taka upp og hlaða niður öllu 3,5 GB gagnasafninu.

Að flokka 25TB með AWK og R

Við skulum skilja vandamálið

Hvað hef ég lært: Flokkun er erfið, sérstaklega ef gögnunum er dreift.

Mér fannst ég nú skilja kjarna vandans. Ég þurfti aðeins að flokka gögnin eftir SNP dálki, ekki eftir fólki. Þá verða nokkrir SNPs geymdir í sérstökum gagnaklumpi og þá mun „snjall“ aðgerð Parket „opna aðeins ef gildið er á bilinu“ sýna sig í allri sinni dýrð. Því miður reyndist það erfitt verkefni að raða í gegnum milljarða raða á víð og dreif um þyrping.

AWS vill örugglega ekki gefa út endurgreiðslu vegna „Ég er annars hugar námsmaður“ ástæðu. Eftir að ég keyrði flokkun á Amazon Glue keyrði það í 2 daga og hrundi.

Hvað með skiptinguna?

Hvað hef ég lært: Skilrúm í Spark verða að vera í jafnvægi.

Svo datt mér í hug að skipta gögnum í litninga. Þeir eru 23 (og nokkrir fleiri ef tekið er tillit til DNA hvatbera og ókortlögð svæði).
Þetta gerir þér kleift að skipta gögnunum í smærri klumpur. Ef þú bætir aðeins einni línu við Spark útflutningsaðgerðina í Glue scriptinu partition_by = "chr", þá ætti að skipta gögnunum í fötu.

Að flokka 25TB með AWK og R
Erfðamengið samanstendur af fjölmörgum brotum sem kallast litningar.

Því miður tókst það ekki. Litningar hafa mismunandi stærð, sem þýðir mismikið magn upplýsinga. Þetta þýðir að verkefnin sem Spark sendi til starfsmanna voru ekki í jafnvægi og kláruðust hægt vegna þess að sumir hnútar luku snemma og voru aðgerðalausir. Verkefnin voru hins vegar unnin. En þegar beðið var um einn SNP olli ójafnvægið aftur vandamálum. Kostnaður við að vinna SNP á stærri litningum (þ.e. þar sem við viljum fá gögn) hefur aðeins lækkað um það bil 10. Mikið, en ekki nóg.

Hvað ef við skiptum því í enn smærri hluta?

Hvað hef ég lært: Reyndu aldrei að gera 2,5 milljón skipting yfirleitt.

Ég ákvað að fara út og skipta hverri SNP í sundur. Þetta tryggði að skilrúmin voru jafnstór. ÞAÐ VAR SLEGT HUGMYND. Ég notaði Glue og bætti við saklausri línu partition_by = 'snp'. Verkefnið byrjaði og byrjaði að framkvæma. Degi síðar athugaði ég og sá að það var enn ekkert skrifað á S3, svo ég drap á verkefninu. Það lítur út fyrir að Glue hafi verið að skrifa milliskrár á falinn stað í S3, fullt af skrám, kannski nokkrar milljónir. Þess vegna kostuðu mistök mín meira en þúsund dollara og þóknuðu leiðbeinanda mínum ekki.

Skipting + flokkun

Hvað hef ég lært: Flokkun er enn erfið, eins og að stilla Spark.

Síðasta tilraun mín til að skipta í sundur fól í sér að ég skipti litningunum og flokkaði síðan hverja skiptingu. Fræðilega séð myndi þetta flýta fyrir hverri fyrirspurn vegna þess að æskileg SNP gögn þurftu að vera innan nokkurra parketbita innan tiltekins sviðs. Því miður reyndist það erfitt verkefni að flokka jafnvel skipt gögn. Fyrir vikið skipti ég yfir í EMR fyrir sérsniðna klasa og notaði átta öflug tilvik (C5.4xl) og Sparklyr til að búa til sveigjanlegra vinnuflæði...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...verkinu var þó enn ekki lokið. Ég stillti það á mismunandi vegu: jók minnisúthlutun fyrir hvern fyrirspurnarframkvæmda, notaði hnúta með miklu minni, notaði útsendingarbreytur (útsendingarbreytur), en í hvert skipti reyndust þetta vera hálfgert mál og smám saman byrjuðu framkvæmdaraðilarnir að mistakast þar til allt stoppaði.

Ég er að verða skapandi

Hvað hef ég lært: Stundum krefjast sérstakra gagna sérlausna.

Hver SNP hefur stöðugildi. Þetta er tala sem samsvarar fjölda basa meðfram litningi hans. Þetta er fín og eðlileg leið til að skipuleggja gögnin okkar. Í fyrstu vildi ég skipta eftir svæðum á hverjum litningi. Til dæmis, stöður 1 - 2000, 2001 - 4000 osfrv. En vandamálið er að SNPs dreifast ekki jafnt yfir litningana, þannig að hópastærðir verða því mjög mismunandi.

Að flokka 25TB með AWK og R

Í kjölfarið komst ég að því að skiptast á stöður í flokka (stöðu). Með því að nota gögnin sem þegar var hlaðið niður, sendi ég beiðni um að fá lista yfir einstaka SNP, stöðu þeirra og litninga. Síðan flokkaði ég gögnin innan hvers litninga og safnaði SNP í hópa (bin) af ákveðinni stærð. Segjum 1000 SNPs hver. Þetta gaf mér SNP-til-hóp-á-litninga sambandið.

Að lokum bjó ég til hópa (bin) af 75 SNP, ástæðan verður útskýrð hér að neðan.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Reyndu fyrst með Spark

Hvað hef ég lært: Neistasöfnun er hröð, en skipting er samt dýr.

Mig langaði að lesa þennan litla (2,5 milljón raðir) gagnaramma inn í Spark, sameina hann við hrá gögnin og skipta honum síðan í dálkinn sem nýlega var bætt við. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

ég notaði sdf_broadcast(), svo Spark veit að það ætti að senda gagnarammann til allra hnúta. Þetta er gagnlegt ef gögnin eru lítil að stærð og nauðsynleg fyrir öll verkefni. Annars reynir Spark að vera klár og dreifir gögnum eftir þörfum, sem getur valdið hægagangi.

Og aftur, hugmynd mín gekk ekki upp: verkefnin virkuðu í nokkurn tíma, kláruðu sambandið og síðan, eins og executors sem settir voru af stað með skiptingu, fóru þau að mistakast.

Bætir við AWK

Hvað hef ég lært: Ekki sofa þegar þú ert að kenna grunnatriðin. Vissulega hefur einhver þegar leyst vandamálið þitt á níunda áratugnum.

Hingað til var ástæðan fyrir öllum mistökum mínum með Spark ruglið af gögnum í þyrpingunni. Kannski er hægt að bæta ástandið með formeðferð. Ég ákvað að prófa að skipta hráum textagögnum í dálka af litningum í von um að útvega Spark „forskipt“ gögn.

Ég leitaði á StackOverflow að því hvernig á að skipta eftir dálkagildum og fann svo frábært svar. Með AWK geturðu skipt textaskrá eftir dálkagildum með því að skrifa hana í skriftu frekar en að senda niðurstöðurnar til stdout.

Ég skrifaði Bash handrit til að prófa það. Hlaðið niður einum af pakkuðu TSV-tækjunum og tók það síðan upp með því að nota gzip og sent til awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Það virkaði!

Að fylla kjarnana

Hvað hef ég lært: gnu parallel - það er töfrandi hlutur, allir ættu að nota það.

Aðskilnaðurinn var frekar hægur og þegar ég byrjaði htoptil að athuga notkun á öflugu (og dýru) EC2 tilviki, kom í ljós að ég notaði aðeins einn kjarna og um 200 MB af minni. Til að leysa vandann og tapa ekki miklum peningum urðum við að finna út hvernig við gætum samhliða vinnunni. Sem betur fer í alveg mögnuðu bók Gagnafræði við stjórnlínuna Ég fann kafla eftir Jeron Janssens um hliðstæðutengingu. Af því lærði ég um gnu parallel, mjög sveigjanleg aðferð til að útfæra multithreading í Unix.

Að flokka 25TB með AWK og R
Þegar ég byrjaði skiptinguna með því að nota nýja ferlið var allt í lagi, en það var samt flöskuháls - að hlaða niður S3 hlutum á diskinn var ekki mjög hratt og ekki að fullu samhliða. Til að laga þetta gerði ég þetta:

  1. Ég komst að því að það er hægt að innleiða S3 niðurhalsstigið beint í leiðslunni og útiloka algjörlega milligeymslu á disknum. Þetta þýðir að ég get forðast að skrifa hrá gögn á diskinn og notað enn minni, og þar af leiðandi ódýrari, geymslu á AWS.
  2. lið aws configure set default.s3.max_concurrent_requests 50 fjölgaði til muna fjölda þráða sem AWS CLI notar (sjálfgefið eru 10).
  3. Ég skipti yfir í EC2 tilvik sem var fínstillt fyrir nethraða, með bókstafnum n í nafninu. Ég hef komist að því að tap á vinnsluorku við notkun n-tilvika er meira en bætt upp með aukningu á hleðsluhraða. Fyrir flest verkefni notaði ég c5n.4xl.
  4. Breytt gzip á pigz, þetta er gzip tól sem getur gert flotta hluti til að samsíða upphaflega ósamhliða verkefninu að afþjappa skrár (þetta hjálpaði minnst).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Þessi skref eru sameinuð hvert við annað til að allt virki mjög hratt. Með því að auka niðurhalshraða og útrýma skrifum á disknum gæti ég nú unnið úr 5 terabæta pakka á örfáum klukkustundum.

Þetta tíst hefði átt að nefna „TSV“. Því miður.

Notkun nýgreindra gagna

Hvað hef ég lært: Spark líkar við óþjöppuð gögn og líkar ekki við að sameina skipting.

Nú voru gögnin í S3 í ópakkuðu (lesist: deilt) og hálfskipulögðu sniði og ég gat farið aftur í Spark. Óvænt beið mín: Mér tókst aftur ekki að ná því sem ég vildi! Það var mjög erfitt að segja Spark nákvæmlega hvernig gögnunum var skipt. Og jafnvel þegar ég gerði þetta, kom í ljós að það voru of mörg skipting (95 þúsund), og þegar ég notaði coalesce minnkaði fjölda þeirra niður í hæfileg mörk, þetta eyðilagði skiptinguna mína. Ég er viss um að þetta er hægt að laga, en eftir nokkra daga leit fann ég enga lausn. Ég kláraði á endanum öll verkefnin í Spark, þó það hafi tekið smá tíma og skiptu Parket skrárnar mínar voru ekki mjög litlar (~200 KB). Hins vegar voru gögnin þar sem þeirra var þörf.

Að flokka 25TB með AWK og R
Of lítil og ójöfn, yndisleg!

Prófar staðbundnar Spark fyrirspurnir

Hvað hef ég lært: Spark hefur of mikið kostnað við að leysa einföld vandamál.

Með því að hala niður gögnunum á snjöllu formi gat ég prófað hraðann. Settu upp R forskrift til að keyra staðbundinn Spark netþjón og hlaðið síðan Spark gagnaramma úr tilgreindri Parket hópgeymslu (bin). Ég reyndi að hlaða öllum gögnum en gat ekki fengið Sparklyr til að þekkja skiptinguna.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Aftakan tók 29,415 sekúndur. Miklu betra, en ekki of gott fyrir fjöldaprófun á neinu. Að auki gat ég ekki hraðað hlutunum með skyndiminni vegna þess að þegar ég reyndi að vista gagnaramma í minni, hrundi Spark alltaf, jafnvel þegar ég úthlutaði meira en 50 GB af minni til gagnasafns sem vó minna en 15.

Fara aftur í AWK

Hvað hef ég lært: Sambandsfylki í AWK eru mjög skilvirk.

Ég áttaði mig á því að ég gæti náð meiri hraða. Ég minntist þess á dásamlegan hátt AWK kennsluefni eftir Bruce Barnett Ég las um flottan eiginleika sem heitir "tengd fylki" Í meginatriðum eru þetta lykilgildi pör, sem af einhverjum ástæðum voru kölluð öðruvísi í AWK, og þess vegna hugsaði ég einhvern veginn ekki mikið um þau. Roman Cheplyaka minntist á að hugtakið „samsett fylki“ er miklu eldra en hugtakið „lykilgildi par“. Jafnvel þótt þú flettu upp lykilgildi í Google Ngram, þú munt ekki sjá þetta hugtak þar, en þú munt finna tengda fylki! Að auki er „lykilgildi parið“ oftast tengt gagnagrunnum, svo það er miklu skynsamlegra að bera það saman við hashmap. Ég áttaði mig á því að ég gæti notað þessar tengdu fylki til að tengja SNPs mína við bin töflu og hrá gögn án þess að nota Spark.

Til að gera þetta notaði ég blokkina í AWK handritinu BEGIN. Þetta er stykki af kóða sem er keyrt áður en fyrsta gagnalínan er send til meginhluta skriftunnar.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Team while(getline...) hlaðið allar línur úr CSV hópnum (bin), stilltu fyrsta dálkinn (SNP nafn) sem lykil fyrir tengifylki bin og annað gildi (hópur) sem gildi. Síðan í blokkinni { }, sem er keyrt á öllum línum aðalskrárinnar, hver lína er send í úttaksskrána, sem fær einstakt nafn eftir hópnum (bin): ..._bin_"bin[$1]"_....

Variables batch_num и chunk_id passaði við gögnin sem leiðslan gaf, forðast keppnisástand og hvern framkvæmdarþráð í gangi parallel, skrifaði í sína eigin einstöku skrá.

Þar sem ég dreifði öllum hráum gögnum í möppur á litningum sem eftir voru af fyrri tilraun minni með AWK, gæti ég nú skrifað annað Bash forskrift til að vinna úr einum litningi í einu og sent dýpri skipt gögn til S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Handritið hefur tvo hluta parallel.

Í fyrsta hlutanum eru gögn lesin úr öllum skrám sem innihalda upplýsingar um þann litning sem óskað er eftir, síðan er þessum gögnum dreift yfir þræði sem dreifa skránum í viðeigandi hópa (bin). Til að forðast keppnisaðstæður þegar margir þræðir skrifa í sömu skrána, sendir AWK skráarnöfnin til að skrifa gögn á mismunandi staði, t.d. chr_10_bin_52_batch_2_aa.csv. Þess vegna eru margar litlar skrár búnar til á disknum (til þess notaði ég terabæta EBS bindi).

Færiband úr öðrum hluta parallel fer í gegnum hópana (bin) og sameinar einstakar skrár þeirra í algengar CSV c catog sendir þá til útflutnings.

Útsending í R?

Hvað hef ég lært: Þú getur haft samband stdin и stdout úr R handriti, og þess vegna nota það í leiðslunni.

Þú gætir hafa tekið eftir þessari línu í Bash handritinu þínu: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Það þýðir allar samtengdar hópskrár (bin) yfir í R forskriftina hér að neðan. {} er sérstök tækni parallel, sem setur öll gögn sem það sendir í tilgreindan straum beint inn í skipunina sjálfa. Valkostur {#} veitir einstakt þráðaauðkenni, og {%} táknar númer verks (endurtekið, en aldrei samtímis). Lista yfir alla valkosti er að finna í skjöl.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Þegar breyt file("stdin") send til readr::read_csv, gögnin sem þýdd eru yfir í R forskriftina eru hlaðin inn í ramma, sem er síðan í formi .rds-skrá með því að nota aws.s3 skrifað beint á S3.

RDS er eitthvað eins og yngri útgáfa af Parketi, án þess að efla hátalarageymslu.

Eftir að hafa klárað Bash handritið fékk ég búnt .rds-skrár staðsettar í S3, sem gerði mér kleift að nota skilvirka þjöppun og innbyggðar tegundir.

Þrátt fyrir notkun á bremsu R virkaði allt mjög hratt. Það kemur ekki á óvart að þeir hlutar R sem lesa og skrifa gögn eru mjög fínstilltir. Eftir prófun á einum meðalstórum litningi lauk verkinu á C5n.4xl dæmi á um tveimur klukkustundum.

S3 Takmarkanir

Hvað hef ég lært: Þökk sé snjallri útfærslu getur S3 séð um margar skrár.

Ég hafði áhyggjur af því hvort S3 myndi geta séð um margar skrár sem voru fluttar á hann. Ég gæti gert skráarnöfnin skynsamleg, en hvernig myndi S3 leita að þeim?

Að flokka 25TB með AWK og R
Möppur í S3 eru bara til sýnis, reyndar hefur kerfið ekki áhuga á tákninu /. Frá S3 FAQ síðunni.

Svo virðist sem S3 táknar slóðina að tiltekinni skrá sem einfaldan lykil í eins konar kjötkássatöflu eða gagnagrunni sem byggir á skjali. Líta má á fötu sem töflu og skrár geta talist færslur í þeirri töflu.

Þar sem hraði og skilvirkni eru mikilvæg til að græða hjá Amazon, kemur það ekki á óvart að þetta lykil-sem-skrá-slóð kerfi er ógnvekjandi bjartsýni. Ég reyndi að finna jafnvægi: svo að ég þyrfti ekki að gera mikið af get-beiðnum, en að beiðnirnar voru framkvæmdar hratt. Í ljós kom að best er að gera um 20 þúsund bin skrár. Ég held að ef við höldum áfram að hagræða getum við náð auknum hraða (til dæmis með því að búa til sérstaka fötu bara fyrir gögn og þannig minnkað uppflettitöfluna). En það var enginn tími eða peningar fyrir frekari tilraunir.

Hvað með krosssamhæfni?

Það sem ég lærði: Orsök númer eitt fyrir tímasóun er að fínstilla geymsluaðferðina þína of snemma.

Á þessum tímapunkti er mjög mikilvægt að spyrja sjálfan sig: "Af hverju að nota sérstakt skráarsnið?" Ástæðan liggur í hleðsluhraða (gzipped CSV skrár tók 7 sinnum lengri tíma að hlaða) og samhæfni við vinnuflæði okkar. Ég gæti endurskoðað hvort R geti auðveldlega hlaðið Parket (eða Arrow) skrár án Spark hleðslunnar. Allir í rannsóknarstofunni okkar nota R, og ef ég þarf að breyta gögnunum í annað snið, þá er ég enn með upprunalegu textagögnin, svo ég get bara keyrt leiðsluna aftur.

Verkaskipting

Hvað hef ég lært: Ekki reyna að fínstilla störf handvirkt, láttu tölvuna gera það.

Ég er búinn að kemba verkflæðið á einum litningi, nú þarf ég að vinna úr öllum hinum gögnunum.
Mig langaði að hækka nokkur EC2 tilvik fyrir umbreytingu, en á sama tíma var ég hræddur um að fá mjög ójafnvægið álag yfir mismunandi vinnslustörf (alveg eins og Spark þjáðist af ójafnvægi skiptinganna). Þar að auki hafði ég ekki áhuga á að hækka eitt tilvik á hvern litning, því fyrir AWS reikninga er sjálfgefið takmörk upp á 10 tilvik.

Þá ákvað ég að skrifa handrit í R til að hámarka vinnslustörf.

Fyrst bað ég S3 að reikna út hversu mikið geymslupláss hver litningur tók.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Svo skrifaði ég fall sem tekur heildarstærðina, stokkar upp röð litninganna, skiptir þeim í hópa num_jobs og segir þér hversu mismunandi stærðir eru á öllum vinnslustörfum.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Svo fór ég í gegnum þúsund uppstokkanir með purrr og valdi það besta.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Svo ég endaði með verkefnasett sem var mjög svipað að stærð. Þá var ekki annað eftir en að vefja fyrra Bash-handritið mitt inn í stóra lykkju for. Þessa hagræðingu tók um 10 mínútur að skrifa. Og þetta er miklu minna en ég myndi eyða í að búa til verkefni handvirkt ef þau væru í ójafnvægi. Þess vegna held ég að ég hafi haft rétt fyrir mér með þessa bráðabirgðahagræðingu.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Í lokin bæti ég við lokunarskipuninni:

sudo shutdown -h now

... og allt gekk upp! Með því að nota AWS CLI vakti ég upp tilvik með því að nota valkostinn user_data gaf þeim Bash handrit af verkefnum sínum til úrvinnslu. Þeir keyrðu og slökktu sjálfkrafa, þannig að ég var ekki að borga fyrir auka vinnsluorku.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Við skulum pakka!

Hvað hef ég lært: API ætti að vera einfalt til að auðvelda notkun og sveigjanleika.

Loksins fékk ég gögnin á réttan stað og form. Það eina sem var eftir var að einfalda ferlið við að nota gögn eins mikið og hægt er til að auðvelda samstarfsfólki mínu. Mig langaði að búa til einfalt API til að búa til beiðnir. Ef ég ákveð í framtíðinni að skipta frá .rds til Parket skrár, þá ætti þetta að vera vandamál fyrir mig, ekki fyrir samstarfsfólk mitt. Fyrir þetta ákvað ég að gera innri R pakka.

Búðu til og skjalfestu mjög einfaldan pakka sem inniheldur aðeins nokkrar gagnaaðgangsaðgerðir skipulagðar í kringum aðgerð get_snp. Ég gerði líka vefsíðu fyrir samstarfsfólk mitt pakka niður, svo þeir geti auðveldlega séð dæmi og skjöl.

Að flokka 25TB með AWK og R

Snjall skyndiminni

Hvað hef ég lært: Ef gögnin þín eru vel undirbúin, verður skyndiminni auðvelt!

Þar sem eitt helsta verkflæðið notaði sama greiningarlíkanið á SNP pakkann ákvað ég að nota binning mér til framdráttar. Þegar gögn eru send í gegnum SNP eru allar upplýsingar úr hópnum (bin) festar við hlutinn sem skilað er. Það er að segja að gamlar fyrirspurnir geta (fræðilega séð) flýtt fyrir úrvinnslu nýrra fyrirspurna.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Þegar ég smíðaði pakkann rak ég mörg viðmið til að bera saman hraða þegar ég notaði mismunandi aðferðir. Ég mæli með því að vanrækja þetta ekki, því stundum eru niðurstöðurnar óvæntar. Til dæmis, dplyr::filter var miklu hraðari en að fanga raðir með því að nota flokkun sem byggir á síun, og að sækja einn dálk úr síuðum gagnaramma var mun hraðari en að nota setningafræði flokkunar.

Vinsamlegast athugaðu að hluturinn prev_snp_results inniheldur lykilinn snps_in_bin. Þetta er fylki allra einstaka SNPs í hópi (bin), sem gerir þér kleift að athuga fljótt hvort þú hafir nú þegar gögn frá fyrri fyrirspurn. Það gerir það einnig auðvelt að hringja í gegnum öll SNP í hópi (bin) með þessum kóða:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Niðurstöður

Nú getum við (og erum farin að af alvöru) keyrt líkön og sviðsmyndir sem áður voru óaðgengilegar fyrir okkur. Það besta er að samstarfsmenn mínir á rannsóknarstofu þurfa ekki að hugsa um neina fylgikvilla. Þeir hafa bara aðgerð sem virkar.

Og þó að pakkinn hlífi þeim við smáatriðum, reyndi ég að gera gagnasniðið nógu einfalt til að þeir gætu fundið það út ef ég myndi skyndilega hverfa á morgun...

Hraðinn hefur aukist verulega. Við skönnum venjulega virkni mikilvæga erfðamengisbrota. Áður gátum við ekki gert þetta (það reyndist vera of dýrt), en núna, þökk sé hópuppbyggingunni og skyndiminni, tekur beiðni um einn SNP að meðaltali innan við 0,1 sekúndu og gagnanotkunin er svo mikil. lágt að kostnaður fyrir S3 sé jarðhnetur.

Ályktun

Þessi grein er alls ekki leiðarvísir. Lausnin reyndist einstaklingsbundin og nær örugglega ekki ákjósanleg. Frekar er þetta ferðasaga. Ég vil að aðrir skilji að slíkar ákvarðanir virðast ekki fullmótaðar í hausnum, þær eru afleiðing af tilraunum og mistökum. Einnig, ef þú ert að leita að gagnafræðingi, hafðu í huga að notkun þessara verkfæra krefst í raun reynslu og reynsla kostar peninga. Ég er ánægður með að hafa haft burði til að borga, en margir aðrir sem geta sinnt sama starfi betur en ég munu aldrei fá tækifæri vegna peningaleysis til að reyna jafnvel.

Stór gagnaverkfæri eru fjölhæf. Ef þú hefur tíma geturðu næstum örugglega skrifað hraðari lausn með því að nota snjalla gagnahreinsun, geymslu og útdráttartækni. Að lokum kemur það niður á kostnaðar-ábatagreiningu.

Það sem ég lærði:

  • það er engin ódýr leið til að flokka 25 TB í einu;
  • vertu varkár með stærð Parket skrárnar þínar og skipulag þeirra;
  • Skilrúm í Spark verða að vera í jafnvægi;
  • Almennt, aldrei reyna að gera 2,5 milljón skipting;
  • Flokkun er enn erfið, sem og uppsetning Spark;
  • stundum krefjast sérstakra gagna sérlausna;
  • Neistasöfnun er hröð, en skipting er samt dýr;
  • ekki sofa þegar þeir kenna þér grunnatriðin, einhver hefur líklega þegar leyst vandamálið þitt aftur á níunda áratugnum;
  • gnu parallel - þetta er töfrandi hlutur, allir ættu að nota það;
  • Spark líkar við óþjöppuð gögn og líkar ekki við að sameina skipting;
  • Spark hefur of mikið kostnað við að leysa einföld vandamál;
  • Sambandsfylki AWK eru mjög skilvirk;
  • þú getur haft samband stdin и stdout úr R handriti, og þess vegna nota það í leiðslunni;
  • Þökk sé snjallslóðaútfærslu getur S3 unnið úr mörgum skrám;
  • Aðalástæðan fyrir því að sóa tíma er ótímabært að fínstilla geymsluaðferðina þína;
  • ekki reyna að fínstilla verkefni handvirkt, láttu tölvuna gera það;
  • API ætti að vera einfalt til að auðvelda og sveigjanleika í notkun;
  • Ef gögnin þín eru vel undirbúin verður skyndiminni auðvelt!

Heimild: www.habr.com

Bæta við athugasemd