Dosrannu 25TB gan ddefnyddio AWK ac R

Dosrannu 25TB gan ddefnyddio AWK ac R
Sut i ddarllen yr erthygl hon: Ymddiheuraf am fod y testun mor hir ac anhrefnus. Er mwyn arbed amser i chi, rydw i'n dechrau pob pennod gyda chyflwyniad “Beth Ddysgais i”, sy'n crynhoi hanfod y bennod mewn brawddeg neu ddwy.

“Dangoswch yr ateb i mi!” Os ydych chi eisiau gweld o ble y des i, yna ewch i'r bennod “Dod yn Fwy Dyfeisgar,” ond rwy'n meddwl ei bod hi'n fwy diddorol a defnyddiol darllen am fethiant.

Yn ddiweddar cefais y dasg o sefydlu proses ar gyfer prosesu nifer fawr o ddilyniannau DNA amrwd (sglodyn SNP yn dechnegol). Yr angen oedd cael data yn gyflym am leoliad genetig penodol (a elwir yn SNP) ar gyfer gwaith modelu dilynol a thasgau eraill. Gan ddefnyddio R ac AWK, roeddwn yn gallu glanhau a threfnu data mewn ffordd naturiol, gan gyflymu prosesu ymholiadau yn fawr. Nid oedd hyn yn hawdd i mi ac roedd angen nifer o iteriadau. Bydd yr erthygl hon yn eich helpu i osgoi rhai o'm camgymeriadau ac yn dangos i chi beth wnes i yn y pen draw.

Yn gyntaf, rhai esboniadau rhagarweiniol.

Data

Darparodd canolfan prosesu gwybodaeth genetig ein prifysgol ddata i ni ar ffurf TSV 25 TB. Derbyniais nhw wedi'u rhannu'n 5 pecyn, wedi'u cywasgu gan Gzip, pob un ohonynt yn cynnwys tua 240 o ffeiliau pedwar-gigabeit. Roedd pob rhes yn cynnwys data ar gyfer un PCE gan un unigolyn. Yn gyfan gwbl, trosglwyddwyd data ar ~2,5 miliwn o SNPs a ~60 mil o bobl. Yn ogystal â gwybodaeth SNP, roedd y ffeiliau'n cynnwys nifer o golofnau gyda rhifau'n adlewyrchu nodweddion amrywiol, megis dwyster darllen, amlder gwahanol alelau, ac ati. Roedd cyfanswm o tua 30 o golofnau gyda gwerthoedd unigryw.

Nod

Fel gydag unrhyw brosiect rheoli data, y peth pwysicaf oedd penderfynu sut y byddai'r data'n cael ei ddefnyddio. Yn yr achos hwn byddwn yn bennaf yn dewis modelau a llifoedd gwaith ar gyfer PCE yn seiliedig ar SNP. Hynny yw, dim ond data ar un PCE ar y tro y bydd ei angen arnom. Roedd yn rhaid i mi ddysgu sut i adalw'r holl gofnodion sy'n gysylltiedig ag un o'r 2,5 miliwn o SNPs mor hawdd, cyflym a rhad â phosibl.

Sut i beidio â gwneud hyn

I ddyfynnu cliché addas:

Wnes i ddim methu fil o weithiau, dwi newydd ddarganfod mil o ffyrdd i osgoi dosrannu criw o ddata mewn fformat sy'n gyfeillgar i ymholiad.

Ceisiwch yn gyntaf

Beth rydw i wedi'i ddysgu: Nid oes ffordd rad i ddosrannu 25 TB ar y tro.

Ar ôl dilyn y cwrs “Dulliau Uwch ar gyfer Prosesu Data Mawr” ym Mhrifysgol Vanderbilt, roeddwn yn siŵr bod y tric yn y bag. Mae'n debyg y bydd yn cymryd awr neu ddwy i sefydlu'r gweinydd Hive i redeg trwy'r holl ddata ac adrodd ar y canlyniad. Gan fod ein data yn cael ei storio yn AWS S3, defnyddiais y gwasanaeth Athena, sy'n eich galluogi i gymhwyso ymholiadau Hive SQL i ddata S3. Nid oes angen i chi sefydlu/codi clwstwr Hive, ac rydych chi hefyd yn talu am y data rydych chi'n chwilio amdano yn unig.

Ar ôl i mi ddangos fy nata i Athena a'i fformat, cynhaliais rai profion gydag ymholiadau fel hyn:

select * from intensityData limit 10;

A chafwyd canlyniadau strwythuredig yn gyflym. Yn barod.

Hyd nes i ni geisio defnyddio'r data yn ein gwaith...

Gofynnwyd i mi dynnu'r holl wybodaeth SNP allan i brofi'r model. Cynhaliais yr ymholiad:


select * from intensityData 
where snp = 'rs123456';

...a dechreuodd aros. Ar ôl wyth munud a mwy na 4 TB o ddata y gofynnwyd amdano, cefais y canlyniad. Costau Athena yn ôl swm y data a ganfuwyd, $5 y terabyte. Felly costiodd y cais sengl hwn $20 ac wyth munud o aros. I redeg y model ar yr holl ddata, bu'n rhaid aros 38 mlynedd a thalu $50 miliwn.Yn amlwg, nid oedd hyn yn addas i ni.

Roedd angen defnyddio Parquet...

Beth rydw i wedi'i ddysgu: Byddwch yn ofalus gyda maint eich ffeiliau Parquet a'u sefydliad.

Ceisiais drwsio'r sefyllfa yn gyntaf trwy drosi pob TSV i Ffeiliau parquet. Maent yn gyfleus ar gyfer gweithio gyda setiau data mawr oherwydd bod y wybodaeth ynddynt yn cael ei storio ar ffurf golofn: mae pob colofn yn gorwedd yn ei segment cof / disg ei hun, yn wahanol i ffeiliau testun, lle mae rhesi yn cynnwys elfennau o bob colofn. Ac os oes angen i chi ddod o hyd i rywbeth, yna darllenwch y golofn ofynnol. Yn ogystal, mae pob ffeil yn storio ystod o werthoedd mewn colofn, felly os nad yw'r gwerth yr ydych yn chwilio amdano yn ystod y golofn, ni fydd Spark yn gwastraffu amser yn sganio'r ffeil gyfan.

Cynhaliais dasg syml Glud AWS i drosi ein TSVs i Parquet a gollwng y ffeiliau newydd i Athena. Cymerodd tua 5 awr. Ond pan redais y cais, fe gymerodd tua'r un faint o amser ac ychydig yn llai o arian i'w gwblhau. Y ffaith yw bod Spark, wrth geisio gwneud y gorau o'r dasg, wedi dadbacio un darn TSV a'i roi yn ei dalp Parquet ei hun. Ac oherwydd bod pob talp yn ddigon mawr i gynnwys holl gofnodion llawer o bobl, roedd pob ffeil yn cynnwys yr holl SNPs, felly bu'n rhaid i Spark agor yr holl ffeiliau i dynnu'r wybodaeth yr oedd ei hangen.

Yn ddiddorol, nid yw math cywasgu diofyn (ac a argymhellir) Parquet, bachog, yn hollti. Felly, roedd pob ysgutor yn gaeth i'r dasg o ddadbacio a lawrlwytho'r set ddata lawn 3,5 GB.

Dosrannu 25TB gan ddefnyddio AWK ac R

Gadewch i ni ddeall y broblem

Beth rydw i wedi'i ddysgu: Mae didoli yn anodd, yn enwedig os dosberthir y data.

Roedd yn ymddangos i mi fy mod bellach yn deall hanfod y broblem. Dim ond yn ôl colofn SNP yr oedd angen i mi ddidoli'r data, nid fesul pobl. Yna bydd sawl SNP yn cael eu storio mewn talp data ar wahân, ac yna bydd swyddogaeth “smart” Parquet “ar agor dim ond os yw'r gwerth yn yr ystod” yn dangos ei hun yn ei holl ogoniant. Yn anffodus, roedd didoli trwy biliynau o resi wedi'u gwasgaru ar draws clwstwr yn dasg anodd.

Yn bendant nid yw AWS am roi ad-daliad oherwydd y rheswm "Rwy'n fyfyriwr sy'n tynnu sylw". Ar ôl i mi redeg didoli ar Amazon Glue, mae'n rhedeg am 2 ddiwrnod a damwain.

Beth am rannu?

Beth rydw i wedi'i ddysgu: Rhaid cydbwyso rhaniadau yn Spark.

Yna deuthum i fyny gyda'r syniad o rannu data mewn cromosomau. Mae yna 23 ohonyn nhw (a sawl un arall os ydych chi'n ystyried DNA mitocondriaidd a rhanbarthau heb eu mapio).
Bydd hyn yn caniatáu ichi rannu'r data yn ddarnau llai. Os ychwanegwch un llinell yn unig at swyddogaeth allforio Spark yn y sgript Gludwch partition_by = "chr", yna dylid rhannu'r data yn fwcedi.

Dosrannu 25TB gan ddefnyddio AWK ac R
Mae'r genom yn cynnwys nifer o ddarnau o'r enw cromosomau.

Yn anffodus, ni weithiodd. Mae gan gromosomau feintiau gwahanol, sy'n golygu symiau gwahanol o wybodaeth. Mae hyn yn golygu nad oedd y tasgau a anfonodd Spark at weithwyr yn gytbwys ac yn cael eu cwblhau'n araf oherwydd bod rhai nodau'n gorffen yn gynnar ac yn segur. Fodd bynnag, cwblhawyd y tasgau. Ond wrth ofyn am un SNP, fe achosodd yr anghydbwysedd broblemau eto. Mae cost prosesu SNPs ar gromosomau mwy (hynny yw, lle rydym am gael data) wedi gostwng tua ffactor o 10 yn unig. Llawer, ond dim digon.

Beth os ydym yn ei rannu'n rhannau hyd yn oed yn llai?

Beth rydw i wedi'i ddysgu: Peidiwch byth â cheisio gwneud 2,5 miliwn o barwydydd o gwbl.

Penderfynais fynd i gyd allan a rhannu pob SNP. Sicrhaodd hyn fod y rhaniadau o faint cyfartal. SYNIAD DRWG OEDD. Defnyddiais Glud ac ychwanegu llinell ddiniwed partition_by = 'snp'. Dechreuodd y dasg a dechreuodd ei chyflawni. Ddiwrnod yn ddiweddarach gwiriais a gwelais nad oedd unrhyw beth wedi'i ysgrifennu at S3 o hyd, felly lladdais y dasg. Mae'n edrych fel bod Glue yn ysgrifennu ffeiliau canolradd i leoliad cudd yn S3, llawer o ffeiliau, efallai cwpl o filiwn. O ganlyniad, costiodd fy nghamgymeriad fwy na mil o ddoleri ac nid oedd yn plesio fy mentor.

Rhaniad + didoli

Beth rydw i wedi'i ddysgu: Mae didoli yn dal yn anodd, fel y mae tiwnio Spark.

Roedd fy ymgais ddiwethaf i rannu'n cynnwys rhannu'r cromosomau ac yna didoli pob rhaniad. Mewn egwyddor, byddai hyn yn cyflymu pob ymholiad oherwydd bod yn rhaid i ddata dymunol yr SNP fod o fewn ychydig dalpiau Parquet o fewn ystod benodol. Yn anffodus, roedd didoli data rhanedig hyd yn oed yn dasg anodd. O ganlyniad, newidiais i EMR ar gyfer clwstwr arfer a defnyddio wyth achos pwerus (C5.4xl) a Sparklyr i greu llif gwaith mwy hyblyg ...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...fodd bynnag, nid oedd y dasg wedi'i chwblhau o hyd. Fe wnes i ei ffurfweddu mewn gwahanol ffyrdd: cynyddu'r dyraniad cof ar gyfer pob ysgutor ymholiad, defnyddio nodau gyda llawer iawn o gof, defnyddio newidynnau darlledu (newidynnau darlledu), ond bob tro roedd y rhain yn troi allan i fod yn hanner mesurau, ac yn raddol dechreuodd yr ysgutorion i fethu nes stopio popeth.

Rwy'n dod yn fwy creadigol

Beth rydw i wedi'i ddysgu: Weithiau mae angen atebion arbennig ar ddata arbennig.

Mae gan bob PCE werth safle. Dyma rif sy'n cyfateb i nifer y basau ar hyd ei gromosom. Mae hon yn ffordd braf a naturiol o drefnu ein data. Ar y dechrau roeddwn i eisiau rhannu yn ôl rhanbarthau pob cromosom. Er enghraifft, swyddi 1 - 2000, 2001 - 4000, ac ati. Ond y broblem yw nad yw SNPs wedi'u dosbarthu'n gyfartal ar draws y cromosomau, felly bydd maint y grwpiau yn amrywio'n fawr.

Dosrannu 25TB gan ddefnyddio AWK ac R

O ganlyniad, deuthum i ddadansoddiad o safleoedd yn gategorïau (rheng). Gan ddefnyddio'r data a lawrlwythwyd eisoes, rhedais gais i gael rhestr o SNPs unigryw, eu safleoedd a chromosomau. Yna fe wnes i ddidoli'r data o fewn pob cromosom a chasglu SNPs yn grwpiau (bin) o faint penodol. Gadewch i ni ddweud 1000 o SNPs yr un. Rhoddodd hyn berthynas yr SNP-i-grŵp-fesul-cromosom i mi.

Yn y diwedd, fe wnes i grwpiau (bin) o 75 PCE, bydd y rheswm yn cael ei esbonio isod.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Ceisiwch gyntaf gyda Spark

Beth rydw i wedi'i ddysgu: Mae agregu gwreichionen yn gyflym, ond mae rhaniad yn dal yn ddrud.

Roeddwn i eisiau darllen y ffrâm ddata fach hon (2,5 miliwn o resi) i Spark, ei chyfuno â'r data crai, ac yna ei rannu â'r golofn sydd newydd ei hychwanegu bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Defnyddiais i sdf_broadcast(), felly mae Spark yn gwybod y dylai anfon y ffrâm ddata i bob nod. Mae hyn yn ddefnyddiol os yw'r data'n fach o ran maint ac yn ofynnol ar gyfer pob tasg. Fel arall, mae Spark yn ceisio bod yn graff ac yn dosbarthu data yn ôl yr angen, a all achosi arafu.

Ac eto, nid oedd fy syniad yn gweithio: bu'r tasgau'n gweithio ers peth amser, cwblhawyd yr undeb, ac yna, fel yr ysgutorion a lansiwyd trwy rannu, dechreuon nhw fethu.

Ychwanegu AWK

Beth rydw i wedi'i ddysgu: Peidiwch â chysgu pan ddysgir y pethau sylfaenol i chi. Siawns bod rhywun eisoes wedi datrys eich problem yn ôl yn yr 1980au.

Hyd at y pwynt hwn, y rheswm dros fy holl fethiannau gyda Spark oedd y sborion data yn y clwstwr. Efallai y gellir gwella'r sefyllfa gyda chyn-driniaeth. Penderfynais geisio rhannu’r data testun crai yn golofnau o gromosomau, felly roeddwn yn gobeithio darparu data “rhag-rhanedig” i Spark.

Chwiliais ar StackOverflow am sut i rannu yn ôl gwerthoedd colofn a chanfod ateb mor wych. Gyda AWK gallwch rannu ffeil testun yn ôl gwerthoedd colofn trwy ei ysgrifennu mewn sgript yn hytrach nag anfon y canlyniadau i stdout.

Ysgrifennais sgript Bash i roi cynnig arni. Wedi lawrlwytho un o'r TSVs wedi'u pecynnu, yna ei ddadbacio gan ddefnyddio gzip ac anfonwyd i awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Fe weithiodd!

Llenwi'r creiddiau

Beth rydw i wedi'i ddysgu: gnu parallel - mae'n beth hudolus, dylai pawb ei ddefnyddio.

Roedd y gwahaniad yn eithaf araf a phan ddechreuais i htopi wirio'r defnydd o enghraifft EC2 pwerus (a drud), daeth i'r amlwg fy mod yn defnyddio un craidd yn unig a thua 200 MB o gof. Er mwyn datrys y broblem a pheidio â cholli llawer o arian, roedd yn rhaid i ni ddarganfod sut i gyfochrog â'r gwaith. Yn ffodus, mewn llyfr hollol anhygoel Gwyddor Data ar y Llinell Reoli Deuthum o hyd i bennod gan Jeron Janssens ar gyfochrog. Oddi yno dysgais i amdano gnu parallel, dull hyblyg iawn ar gyfer gweithredu multithreading yn Unix.

Dosrannu 25TB gan ddefnyddio AWK ac R
Pan ddechreuais y rhaniad gan ddefnyddio'r broses newydd, roedd popeth yn iawn, ond roedd yna dagfa o hyd - nid oedd lawrlwytho gwrthrychau S3 i ddisg yn gyflym iawn ac nid oedd yn gwbl gyfochrog. I drwsio hyn, fe wnes i hyn:

  1. Canfûm ei bod hi'n bosibl gweithredu'r cam lawrlwytho S3 yn uniongyrchol ar y gweill, gan ddileu storio canolradd ar ddisg yn llwyr. Mae hyn yn golygu y gallaf osgoi ysgrifennu data crai i ddisg a defnyddio storfa hyd yn oed yn llai, ac felly'n rhatach, ar AWS.
  2. Tîm aws configure set default.s3.max_concurrent_requests 50 cynyddu'n sylweddol nifer yr edafedd y mae AWS CLI yn eu defnyddio (yn ddiofyn mae 10).
  3. Newidiais i enghraifft EC2 wedi'i optimeiddio ar gyfer cyflymder rhwydwaith, gyda'r llythyren n yn yr enw. Rwyf wedi canfod bod colli pŵer prosesu wrth ddefnyddio n-achosion yn fwy na digolledu gan y cynnydd mewn cyflymder llwytho. Ar gyfer y rhan fwyaf o dasgau defnyddiais c5n.4xl.
  4. Wedi newid gzip ar pigz, mae hwn yn declyn gzip sy'n gallu gwneud pethau cŵl i gyfochrog â'r dasg heb ei chyfateb i ddechrau o ddatgywasgu ffeiliau (hyn helpodd y lleiaf).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Mae'r camau hyn yn cael eu cyfuno â'i gilydd i wneud i bopeth weithio'n gyflym iawn. Trwy gynyddu cyflymder llwytho i lawr a dileu ysgrifennu disg, gallwn nawr brosesu pecyn 5 terabyte mewn ychydig oriau yn unig.

Dylai'r trydariad hwn fod wedi sôn am 'TSV'. Ysywaeth.

Defnyddio data sydd newydd ei ddosrannu

Beth rydw i wedi'i ddysgu: Mae Spark yn hoffi data heb ei gywasgu ac nid yw'n hoffi cyfuno rhaniadau.

Nawr roedd y data yn S3 mewn fformat heb ei bacio (darllen: rhannu) a lled-archeb, a gallwn ddychwelyd i Spark eto. Roedd syrpreis yn fy aros: methais eto â chyflawni'r hyn yr oeddwn ei eisiau! Roedd yn anodd iawn dweud yn union sut y rhannwyd y data gan Spark. A hyd yn oed pan wnes i hyn, mae'n troi allan bod yna ormod o raniadau (95 mil), a phan fyddaf yn defnyddio coalesce lleihau eu nifer i derfynau rhesymol, dinistriodd hyn fy rhaniad. Rwy'n siŵr y gellir trwsio hyn, ond ar ôl ychydig ddyddiau o chwilio, ni allwn ddod o hyd i ateb. Yn y pen draw, fe wnes i orffen yr holl dasgau yn Spark, er iddo gymryd peth amser ac nid oedd fy ffeiliau Parquet hollt yn fach iawn (~200 KB). Fodd bynnag, roedd angen y data.

Dosrannu 25TB gan ddefnyddio AWK ac R
Rhy fach ac anwastad, gwych!

Profi ymholiadau lleol Spark

Beth rydw i wedi'i ddysgu: Mae gan Spark ormod o orbenion wrth ddatrys problemau syml.

Trwy lawrlwytho'r data mewn fformat clyfar, roeddwn i'n gallu profi'r cyflymder. Sefydlu sgript R i redeg gweinydd Spark lleol, ac yna llwytho ffrâm ddata Spark o'r storfa grŵp Parquet penodedig (bin). Ceisiais lwytho'r holl ddata ond ni allwn gael Sparklyr i adnabod y rhaniad.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Cymerodd y dienyddiad 29,415 eiliad. Llawer gwell, ond ddim yn rhy dda ar gyfer profi màs o unrhyw beth. Yn ogystal, ni allwn gyflymu pethau gyda caching oherwydd pan geisiais storio ffrâm ddata yn y cof, roedd Spark bob amser yn damwain, hyd yn oed pan ddyrannais fwy na 50 GB o gof i set ddata a oedd yn pwyso llai na 15.

Dychwelyd i AWK

Beth rydw i wedi'i ddysgu: Mae araeau cysylltiadol yn AWK yn effeithlon iawn.

Sylweddolais y gallwn gyflawni cyflymderau uwch. Cofiais hynny mewn hyfryd Tiwtorial AWK gan Bruce Barnett Darllenais am nodwedd cŵl o'r enw “araeau cysylltiadol" Yn y bôn, mae'r rhain yn barau gwerth allweddol, a oedd am ryw reswm yn cael eu galw'n wahanol yn AWK, ac felly rywsut ni wnes i feddwl llawer amdanyn nhw. Cheplyaka Rhufeinig cofio bod y term “araeau cysylltiadol” yn llawer hŷn na'r term “pâr gwerth allweddol”. Hyd yn oed os ydych chi chwiliwch am y gwerth allweddol yn Google Ngram, ni welwch y term hwn yno, ond fe welwch araeau cysylltiadol! Yn ogystal, mae'r “pâr gwerth allweddol” yn cael ei gysylltu amlaf â chronfeydd data, felly mae'n gwneud llawer mwy o synnwyr i'w gymharu â hashmap. Sylweddolais y gallwn ddefnyddio'r araeau cysylltiadol hyn i gysylltu fy SNPs â bwrdd biniau a data crai heb ddefnyddio Spark.

I wneud hyn, yn y sgript AWK defnyddiais y bloc BEGIN. Dyma ddarn o god sy'n cael ei weithredu cyn i'r llinell gyntaf o ddata gael ei throsglwyddo i brif gorff y sgript.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Tîm while(getline...) llwytho pob rhes o'r grŵp CSV (bin), gosod y golofn gyntaf (enw SNP) fel yr allwedd ar gyfer yr arae cysylltiadol bin a'r ail werth (grŵp) fel y gwerth. Yna yn y bloc { }, sy'n cael ei weithredu ar bob llinell o'r brif ffeil, anfonir pob llinell i'r ffeil allbwn, sy'n derbyn enw unigryw yn dibynnu ar ei grŵp (bin): ..._bin_"bin[$1]"_....

Newidynnau batch_num и chunk_id yn cyfateb i'r data a ddarparwyd gan y biblinell, gan osgoi cyflwr hil, a phob edefyn gweithredu yn rhedeg parallel, ysgrifennodd at ei ffeil unigryw ei hun.

Ers i mi wasgaru'r holl ddata crai i ffolderi ar gromosomau dros ben o fy arbrawf blaenorol gydag AWK, nawr gallwn i ysgrifennu sgript Bash arall i brosesu un cromosom ar y tro ac anfon data rhaniad dyfnach i S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Mae dwy adran i'r sgript parallel.

Yn yr adran gyntaf, darllenir data o bob ffeil sy'n cynnwys gwybodaeth am y cromosom a ddymunir, yna dosberthir y data hwn ar draws edafedd, sy'n dosbarthu'r ffeiliau i'r grwpiau priodol (bin). Er mwyn osgoi amodau hil pan fydd edafedd lluosog yn ysgrifennu i'r un ffeil, mae AWK yn pasio enwau'r ffeiliau i ysgrifennu data i wahanol leoedd, e.e. chr_10_bin_52_batch_2_aa.csv. O ganlyniad, mae llawer o ffeiliau bach yn cael eu creu ar y ddisg (ar gyfer hyn defnyddiais gyfrolau terabyte EBS).

Cludwr o'r ail adran parallel yn mynd drwy'r grwpiau (bin) ac yn cyfuno eu ffeiliau unigol yn CSV c catac yna'n eu hanfon i'w hallforio.

Darlledu yn R?

Beth rydw i wedi'i ddysgu: Gallwch gysylltu stdin и stdout o sgript R, ac felly defnyddiwch hi ar y gweill.

Efallai eich bod wedi sylwi ar y llinell hon yn eich sgript Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Mae'n trosi'r holl ffeiliau grŵp concatenated (bin) i'r sgript R isod. {} yn dechneg arbennig parallel, sy'n mewnosod unrhyw ddata y mae'n ei anfon i'r ffrwd benodedig yn uniongyrchol i'r gorchymyn ei hun. Opsiwn {#} yn darparu ID edau unigryw, a {%} cynrychioli rhif slot y swydd (ailadrodd, ond byth ar yr un pryd). Mae rhestr o'r holl opsiynau i'w gweld yn dogfennaeth.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Pan fydd newidyn file("stdin") trosglwyddo i readr::read_csv, mae'r data a gyfieithir i'r sgript R yn cael ei lwytho i mewn i ffrâm, sydd wedyn yn y ffurf .rds-ffeil gan ddefnyddio aws.s3 wedi'i ysgrifennu'n uniongyrchol i S3.

Mae RDS yn rhywbeth fel fersiwn iau o Parquet, heb ffrils storio siaradwr.

Ar ôl gorffen y sgript Bash ges i bwndel .rds-ffeiliau wedi'u lleoli yn S3, a oedd yn caniatáu i mi ddefnyddio cywasgu effeithlon a mathau adeiledig.

Er gwaethaf y defnydd o brêc R, gweithiodd popeth yn gyflym iawn. Nid yw'n syndod bod y rhannau o R sy'n darllen ac yn ysgrifennu data wedi'u optimeiddio'n fawr. Ar ôl profi ar un cromosom maint canolig, cwblhawyd y gwaith ar enghraifft C5n.4xl mewn tua dwy awr.

S3 Cyfyngiadau

Beth rydw i wedi'i ddysgu: Diolch i weithredu llwybr smart, gall S3 drin llawer o ffeiliau.

Roeddwn yn poeni a fyddai S3 yn gallu trin y ffeiliau niferus a drosglwyddwyd iddo. Gallwn i wneud yr enwau ffeil yn gwneud synnwyr, ond sut fyddai S3 yn edrych amdanynt?

Dosrannu 25TB gan ddefnyddio AWK ac R
Mae ffolderi yn S3 i'w dangos yn unig, mewn gwirionedd nid oes gan y system ddiddordeb yn y symbol /. O dudalen Cwestiynau Cyffredin S3.

Mae'n ymddangos bod S3 yn cynrychioli'r llwybr i ffeil benodol fel allwedd syml mewn math o dabl hash neu gronfa ddata sy'n seiliedig ar ddogfen. Gellir meddwl am fwced fel bwrdd, a gellir ystyried ffeiliau yn gofnodion yn y tabl hwnnw.

Gan fod cyflymder ac effeithlonrwydd yn bwysig i wneud elw yn Amazon, nid yw'n syndod bod y system llwybr allwedd-fel-ffeil hon wedi'i optimeiddio'n freaking. Ceisiais ddod o hyd i gydbwysedd: fel nad oedd yn rhaid i mi wneud llawer o geisiadau, ond bod y ceisiadau'n cael eu gweithredu'n gyflym. Mae'n troi allan ei bod yn well gwneud tua 20 mil o ffeiliau bin. Rwy'n credu, os byddwn yn parhau i wneud y gorau, y gallwn gyflawni cynnydd mewn cyflymder (er enghraifft, gwneud bwced arbennig ar gyfer data yn unig, gan leihau maint y tabl chwilio). Ond nid oedd amser nac arian ar gyfer arbrofion pellach.

Beth am draws-gydnawsedd?

Yr hyn a Ddysgais: Prif achos gwastraffu amser yw optimeiddio'ch dull storio yn gynamserol.

Ar y pwynt hwn, mae'n bwysig iawn gofyn i chi'ch hun: "Pam defnyddio fformat ffeil perchnogol?" Y rheswm yw cyflymder llwytho (cymerodd ffeiliau CSV gzipped 7 gwaith yn hirach i'w llwytho) a chydnawsedd â'n llifoedd gwaith. Efallai y byddaf yn ailystyried a all R lwytho ffeiliau Parquet (neu Arrow) yn hawdd heb y llwyth Spark. Mae pawb yn ein labordy yn defnyddio R, ac os oes angen i mi drosi'r data i fformat arall, mae'r data testun gwreiddiol gennyf o hyd, felly gallaf redeg y biblinell eto.

Rhannu gwaith

Beth rydw i wedi'i ddysgu: Peidiwch â cheisio gwneud y gorau o swyddi â llaw, gadewch i'r cyfrifiadur ei wneud.

Rwyf wedi dadfygio'r llif gwaith ar un cromosom, nawr mae angen i mi brosesu'r holl ddata arall.
Roeddwn i eisiau codi sawl achos EC2 ar gyfer trosi, ond ar yr un pryd roeddwn yn ofni cael llwyth anghytbwys iawn ar draws gwahanol swyddi prosesu (yn union fel yr oedd Spark yn dioddef o raniadau anghytbwys). Yn ogystal, nid oedd gennyf ddiddordeb mewn codi un enghraifft fesul cromosom, oherwydd ar gyfer cyfrifon AWS mae terfyn rhagosodedig o 10 achos.

Yna penderfynais ysgrifennu sgript yn R i wneud y gorau o swyddi prosesu.

Yn gyntaf, gofynnais i S3 gyfrifo faint o le storio yr oedd pob cromosom yn ei feddiannu.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Yna ysgrifennais swyddogaeth sy'n cymryd y maint cyfan, yn cymysgu trefn y cromosomau, yn eu rhannu'n grwpiau num_jobs ac yn dweud wrthych pa mor wahanol yw maint yr holl swyddi prosesu.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Yna rhedais trwy fil o sifftiau gan ddefnyddio purrr a dewis y gorau.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Felly, yn y pen draw, cefais set o dasgau a oedd yn debyg iawn o ran maint. Yna y cyfan oedd ar ôl oedd lapio fy sgript Bash flaenorol mewn dolen fawr for. Cymerodd yr optimeiddio hwn tua 10 munud i'w ysgrifennu. Ac mae hyn yn llawer llai nag y byddwn yn ei wario ar greu tasgau â llaw pe baent yn anghytbwys. Felly, credaf fy mod yn iawn gyda'r optimeiddio rhagarweiniol hwn.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Ar y diwedd, rwy'n ychwanegu'r gorchymyn cau i lawr:

sudo shutdown -h now

... a phopeth wedi gweithio allan! Gan ddefnyddio CLI AWS, codais enghreifftiau gan ddefnyddio'r opsiwn user_data rhoddodd sgriptiau Bash iddynt o'u tasgau i'w prosesu. Roeddent yn rhedeg ac yn cau i lawr yn awtomatig, felly nid oeddwn yn talu am bŵer prosesu ychwanegol.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Gadewch i ni bacio!

Beth rydw i wedi'i ddysgu: Dylai'r API fod yn syml er mwyn rhwyddineb a hyblygrwydd defnydd.

Yn olaf, cefais y data yn y lle a'r ffurf gywir. Y cyfan oedd ar ôl oedd symleiddio’r broses o ddefnyddio data cymaint â phosibl i’w gwneud yn haws i’m cydweithwyr. Roeddwn i eisiau gwneud API syml ar gyfer creu ceisiadau. Os yn y dyfodol byddaf yn penderfynu newid o .rds i ffeiliau Parquet, yna dylai hyn fod yn broblem i mi, nid i fy nghydweithwyr. Ar gyfer hyn penderfynais wneud pecyn R mewnol.

Adeiladu a dogfennu pecyn syml iawn sy'n cynnwys dim ond ychydig o swyddogaethau mynediad data wedi'u trefnu o amgylch swyddogaeth get_snp. Gwneuthum hefyd wefan ar gyfer fy nghydweithwyr pkgdown, fel y gallant weld enghreifftiau a dogfennaeth yn hawdd.

Dosrannu 25TB gan ddefnyddio AWK ac R

caching clyfar

Beth rydw i wedi'i ddysgu: Os yw'ch data wedi'i baratoi'n dda, bydd caching yn hawdd!

Gan fod un o'r prif lifau gwaith wedi cymhwyso'r un model dadansoddi i becyn PCE, penderfynais ddefnyddio binio er mantais i mi. Wrth drosglwyddo data trwy SNP, mae'r holl wybodaeth o'r grŵp (bin) ynghlwm wrth y gwrthrych a ddychwelwyd. Hynny yw, gall hen ymholiadau (mewn theori) gyflymu prosesu ymholiadau newydd.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Wrth adeiladu'r pecyn, rhedais lawer o feincnodau i gymharu cyflymder wrth ddefnyddio gwahanol ddulliau. Rwy'n argymell peidio ag esgeuluso hyn, oherwydd weithiau mae'r canlyniadau'n annisgwyl. Er enghraifft, dplyr::filter yn llawer cyflymach na chipio rhesi gan ddefnyddio hidlo seiliedig ar fynegeion, ac roedd adalw colofn sengl o ffrâm ddata wedi'i hidlo yn llawer cyflymach na defnyddio cystrawen mynegeio.

Sylwch fod y gwrthrych prev_snp_results yn cynnwys yr allwedd snps_in_bin. Mae hwn yn amrywiaeth o'r holl SNPs unigryw mewn grŵp (bin), sy'n eich galluogi i wirio'n gyflym a oes gennych ddata o ymholiad blaenorol yn barod. Mae hefyd yn ei gwneud hi'n hawdd dolennu trwy'r holl SNPs mewn grŵp (bin) gyda'r cod hwn:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Canfyddiadau

Nawr gallwn (ac rydym wedi dechrau o ddifrif) redeg modelau a senarios a oedd yn anhygyrch i ni yn flaenorol. Y peth gorau yw nad oes rhaid i fy nghydweithwyr labordy feddwl am unrhyw gymhlethdodau. Dim ond swyddogaeth sy'n gweithio sydd ganddyn nhw.

Ac er bod y pecyn yn arbed y manylion iddynt, ceisiais wneud y fformat data yn ddigon syml y gallent ei ddarganfod pe bawn yn diflannu'n sydyn yfory ...

Mae'r cyflymder wedi cynyddu'n sylweddol. Rydym fel arfer yn sganio darnau genom swyddogaethol arwyddocaol. Yn flaenorol, ni allem wneud hyn (trodd allan i fod yn rhy ddrud), ond nawr, diolch i strwythur y grŵp (bin) a'r caching, mae cais am un PCE yn cymryd llai na 0,1 eiliad ar gyfartaledd, ac mae'r defnydd o ddata felly isel fod y costau ar gyfer S3 yn bysgnau.

Casgliad

Nid yw'r erthygl hon yn ganllaw o gwbl. Trodd yr ateb allan i fod yn unigol, a bron yn sicr nid yw'n optimaidd. Yn hytrach, mae'n travelogue. Rwyf am i eraill ddeall nad yw penderfyniadau o'r fath yn ymddangos yn llawn yn y pen, maent yn ganlyniad i brawf a chamgymeriad. Hefyd, os ydych chi'n chwilio am wyddonydd data, cofiwch fod defnyddio'r offer hyn yn effeithiol yn gofyn am brofiad, ac mae profiad yn costio arian. Rwy'n hapus fy mod wedi cael y modd i dalu, ond ni fydd llawer o bobl eraill sy'n gallu gwneud yr un swydd yn well na fi byth yn cael y cyfle oherwydd diffyg arian i hyd yn oed geisio.

Mae offer data mawr yn amlbwrpas. Os oes gennych yr amser, gallwch bron yn sicr ysgrifennu datrysiad cyflymach gan ddefnyddio technegau glanhau, storio ac echdynnu data craff. Yn y pen draw mae'n dibynnu ar ddadansoddiad cost a budd.

Beth ddysgais i:

  • nid oes ffordd rad i ddosrannu 25 TB ar y tro;
  • byddwch yn ofalus gyda maint eich ffeiliau Parquet a'u trefniadaeth;
  • Rhaid cydbwyso rhaniadau yn Spark;
  • Yn gyffredinol, peidiwch byth â cheisio gwneud 2,5 miliwn o barwydydd;
  • Mae didoli yn dal yn anodd, fel y mae sefydlu Spark;
  • weithiau mae data arbennig yn gofyn am atebion arbennig;
  • Mae agregu gwreichionen yn gyflym, ond mae rhaniad yn dal yn ddrud;
  • peidiwch â chysgu pan fyddant yn dysgu'r pethau sylfaenol i chi, mae'n debyg bod rhywun eisoes wedi datrys eich problem yn ôl yn yr 1980au;
  • gnu parallel - peth hudol yw hwn, dylai pawb ei ddefnyddio;
  • Mae Spark yn hoffi data heb ei gywasgu ac nid yw'n hoffi cyfuno rhaniadau;
  • Mae gan Spark ormod o orbenion wrth ddatrys problemau syml;
  • Mae araeau cysylltiadol AWK yn effeithlon iawn;
  • gallwch gysylltu stdin и stdout o sgript R, ac felly ei ddefnyddio ar y gweill;
  • Diolch i weithredu llwybr smart, gall S3 brosesu llawer o ffeiliau;
  • Y prif reswm dros wastraffu amser yw optimeiddio'ch dull storio cyn pryd;
  • peidiwch â cheisio gwneud y gorau o dasgau â llaw, gadewch i'r cyfrifiadur ei wneud;
  • Dylai'r API fod yn syml er hwylustod a hyblygrwydd defnydd;
  • Os yw'ch data wedi'i baratoi'n dda, bydd caching yn hawdd!

Ffynhonnell: hab.com

Ychwanegu sylw