Parsing 25 TB with AWK and R

How to read this article: I apologize that the text turned out so long and chaotic. To save you time, I open each section with a "What I learned" note that sums up the point of the section in one or two sentences.

"Just show me the solution!" If you only want to see where I ended up, skip to the section "Getting more creative," but I think reading about the failures is more interesting and more useful.

I was recently tasked with setting up a pipeline for processing a large volume of raw DNA sequencing data (technically, a SNP chip). The goal was to be able to quickly retrieve the data for a given genetic location (called a SNP) for subsequent modeling and other tasks. Using R and AWK, I was able to clean and organize the data in a natural way that sped up queries enormously. This did not come easily and took many iterations. This article should help you avoid some of my mistakes and show you what I ended up with.

First, some background.

The data

Our university's genetics information processing center provided us with the data as 25 TB of TSV files. I received it split into 5 Gzip-compressed batches, each containing about 240 four-gigabyte files. Each row held the data for one SNP from one person. In total, the data covered ~2.5 million SNPs and ~60 thousand people. Besides the SNP information, the files contained numerous numeric columns describing various characteristics, such as read intensity, the frequency of different alleles, and so on. In all there were about 30 columns with unique values.

The goal

As with any data management project, the most important thing is to determine how the data will be used. In this case we will mostly be fitting models and running workflows SNP by SNP. That is, we only need the data for one SNP at a time. I had to learn how to retrieve all the records associated with any one of the 2.5 million SNPs as easily, quickly, and cheaply as possible.

How not to do this

To appropriate a fitting cliché:

I didn't fail a thousand times, I just discovered a thousand ways not to parse a pile of data into a query-friendly format.

First attempt

What I learned: There is no cheap way to parse 25 TB at once.

After taking the course "Advanced Methods for Big Data Processing" at Vanderbilt University, I was sure the job was in the bag. It would take maybe an hour or two to set up a Hive server to run through all the data and report the result. Since our data is stored in AWS S3, I used the Athena service, which lets you run Hive SQL queries against S3 data. You don't have to set up or spin up a Hive cluster, and you only pay for the data you scan.

After I showed Athena my data and its format, I ran some tests with queries like this:

select * from intensityData limit 10;

And quickly got well-structured results. All set.

Until we tried to use the data in our work...

I was asked to pull out all the data for one SNP to test a model on. I ran the query:


select * from intensityData 
where snp = 'rs123456';

...and started waiting. After eight minutes and more than 4 TB of data scanned, I got the result. Athena charges by the volume of data scanned, $5 per terabyte. So this single query cost $20 and eight minutes of waiting. To run the model on all 2.5 million SNPs, we would have had to wait 38 years and pay $50 million. Obviously, this was not going to work for us.
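The extrapolation above is simple arithmetic, and a small sketch makes it easy to re-run with other numbers. The figures are the rounded ones quoted in the text (about 4 TB scanned per query, $5 per TB, 8 minutes, ~2.5 million SNPs); the variable names are only illustrative.

```shell
#!/usr/bin/env bash
# Back-of-the-envelope cost of running the single-SNP Athena query once per SNP.
export LC_ALL=C            # keep "." as the decimal separator in awk output

n_snps=2500000             # ~2.5 million SNPs
tb_scanned=4               # ~4 TB scanned by one query
usd_per_tb=5               # price per TB scanned quoted in the text
minutes_per_query=8

cost_per_query=$(( tb_scanned * usd_per_tb ))
total_cost_usd=$(( n_snps * cost_per_query ))
total_minutes=$(( n_snps * minutes_per_query ))
# Convert minutes to years (365-day years) with awk, since bash only does integers.
total_years=$(awk -v m="$total_minutes" 'BEGIN { printf "%.1f", m / (60 * 24 * 365) }')

echo "cost per query: ${cost_per_query} USD"
echo "total cost:     ${total_cost_usd} USD"
echo "total time:     ${total_years} years"
```

Running the queries one after another is the assumption behind the 38-year figure; parallel queries would shorten the wait but not the bill.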

Should have used Parquet...

What I learned: Be careful with the size of your Parquet files and how they are organized.

My first attempt to fix the situation was to convert all the TSVs to Parquet files. Parquet is convenient for large datasets because the data is stored in columnar form: each column lives in its own memory/disk segment, unlike text files, where each row contains the elements of every column. If you need to find something, you only read the required column. In addition, each file stores the range of values in a column, so if the value you are looking for is outside that range, Spark won't waste time scanning the whole file.
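The "skip the file if the value is out of range" idea can be illustrated without Parquet at all. The sketch below is not how Parquet or Spark implement it; it only mimics the decision using a hypothetical manifest of file names with the minimum and maximum value each file holds.

```shell
#!/usr/bin/env bash
# Toy illustration of range-based pruning: only files whose [min, max] range
# covers the target value need to be opened. File names and ranges are made up.
manifest='part-0.parquet,1,1000
part-1.parquet,1001,2000
part-2.parquet,2001,3000'

target=1500

# Keep the files whose stored range contains the target value.
files_to_read=$(printf '%s\n' "$manifest" |
  awk -F, -v t="$target" '$2 <= t && t <= $3 { print $1 }')

echo "files to read: $files_to_read"
```

The pruning only pays off when each file covers a narrow range, which is exactly what the rest of this story keeps running into: if every file spans every SNP, nothing can be skipped.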

I ran a simple AWS Glue job to convert our TSVs to Parquet and dropped the new files into Athena. It took about 5 hours. But when I ran the query, it took about the same time and only slightly less money to complete. The thing is that Spark, trying to optimize the job, simply unpacked one TSV chunk and put it into its own Parquet chunk. And because each chunk was large enough to hold the complete records of many people, every file contained all the SNPs, so Spark had to open all the files to extract the information it needed.

Interestingly, Parquet's default (and recommended) compression type, snappy, is not splittable. So each executor was stuck with the task of unpacking and loading a full 3.5 GB dataset.


Understanding the problem

What I learned: Sorting is hard, especially when the data is distributed.

It seemed to me that I now understood the essence of the problem. I just needed to sort the data by the SNP column rather than by person. Then a few SNPs would be stored in each separate data chunk, and Parquet's "smart" feature of "open only if the value is in range" would show itself in all its glory. Unfortunately, sorting billions of rows scattered across a cluster proved to be a difficult task.

AWS definitely does not want to issue a refund on the grounds of "I'm a distracted student." After I launched the sort on Amazon Glue, it ran for 2 days and crashed.

What about partitioning?

What I learned: Partitions in Spark need to be balanced.

Then I came up with the idea of partitioning the data by chromosome. There are 23 of them (and a few more if you count mitochondrial DNA and unmapped regions). This would let me split the data into smaller chunks. If you add just one line, partition_by = "chr", to the Spark export function in the Glue script, the data should be sorted into buckets.

The genome is made up of numerous pieces called chromosomes.

Unfortunately, it didn't work. Chromosomes have different sizes, which means different amounts of data. As a result, the tasks Spark sent to the workers were unbalanced and completed slowly, because some nodes finished early and sat idle. The jobs did complete, however. But when querying a single SNP, the imbalance again caused problems. The cost of processing SNPs on the larger chromosomes (which is exactly where we want to get data) dropped only by a factor of about 10. A lot, but not enough.

What if we split it into even smaller partitions?

What I learned: Never, ever try to make 2.5 million partitions.

I decided to go all out and partition by each SNP. This guaranteed partitions of equal size. IT WAS A BAD IDEA. I used Glue and added the innocent line partition_by = 'snp'. The job started and began executing. A day later I checked and saw that nothing had been written to S3 yet, so I killed the job. It looks like Glue was writing intermediate files to a hidden location in S3, lots of files, perhaps a couple of million. As a result, my mistake cost more than a thousand dollars and did not please my mentor.

Partitioning + sorting

What I learned: Sorting is still hard, and so is tuning Spark.

My last partitioning attempt was to partition by chromosome and then sort each partition. In theory, this would speed up each query, because the desired SNP data would have to sit within a few Parquet chunks inside a given range. Unfortunately, sorting even partitioned data turned out to be a difficult task. As a result, I switched to EMR for a custom cluster and used eight powerful instances (C5.4xl) and Sparklyr to build a more flexible workflow...

# Sparklyr snippet to partition by chr and sort w/in partition
raw_data %>%
  group_by(chr) %>%
  arrange(Position) %>% 
  spark_write_parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...However, the job still did not finish. I tuned it in many ways: increased the memory allocated to each query executor, used nodes with more memory, used broadcast variables, but each time these turned out to be half measures, and gradually the executors began to fail until everything ground to a halt.

Getting more creative

What I learned: Sometimes special data requires special solutions.

Every SNP has a position value. This is a number corresponding to how many bases along its chromosome it lies. It is a good, natural way to organize our data. At first I wanted to partition by regions of each chromosome. For example, positions 1 - 2000, 2001 - 4000, and so on. But the problem is that SNPs are not evenly distributed across the chromosomes, so the group sizes would vary greatly.


As a result, I arrived at binning by rank of position. Using the data already loaded, I ran a query to get a list of unique SNPs, their positions, and chromosomes. Then I sorted the data within each chromosome and collected the SNPs into groups (bins) of a given size, say 1000 SNPs each. This gave me a SNP-to-bin-per-chromosome mapping.

In the end, I used bins of 75 SNPs; the reason is explained below.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n(),
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()
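For readers who prefer to stay on the command line, the same rank-based binning can be sketched with sort and awk. This is an illustration on a hypothetical six-row file (columns: SNP name, chromosome, position), not the code that was actually run; it mirrors the floor(rank / snps_per_bin) formula above, so, as in the R version, bin 0 of each chromosome ends up one SNP short.

```shell
#!/usr/bin/env bash
# Rank-based binning: sort by chromosome and position, then bin by rank within chromosome.
export LC_ALL=C
work_dir=$(mktemp -d)      # hypothetical scratch directory
snps_per_bin=2             # tiny bin size so the toy data spans several bins

# Toy input: snp_name,chr,position (made-up values)
printf 'rsA,1,500\nrsB,1,100\nrsC,1,900\nrsD,2,50\nrsE,1,300\nrsF,2,10\n' \
  > "$work_dir/unique_snps.csv"

sort -t, -k2,2 -k3,3n "$work_dir/unique_snps.csv" |
awk -F, -v OFS=, -v per_bin="$snps_per_bin" '
  NR == 1 || $2 != prev_chr { rank = 0; prev_chr = $2 }   # restart rank per chromosome
  { rank++; print $1, $2, int(rank / per_bin) }           # snp_name,chr,bin
' > "$work_dir/snp_to_bin.csv"

cat "$work_dir/snp_to_bin.csv"
```

Because bins are defined by rank rather than by position range, every bin holds roughly the same number of SNPs no matter how unevenly the SNPs are spread along the chromosome.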

First attempt with Spark

What I learned: Spark aggregation is fast, but partitioning is still expensive.

I wanted to read this small (2.5 million rows) data frame into Spark, join it with the raw data, and then partition by the newly added bin column.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  spark_write_parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

I used sdf_broadcast() so that Spark knows it should send the data frame to all nodes. This is useful when the data is small and required by all tasks. Otherwise, Spark tries to be clever and distributes the data as needed, which can cause slowdowns.

And again, my idea didn't work: the tasks ran for a while, completed the join, and then, like the executors launched by partitioning, started to fail.

Adding AWK

What I learned: Don't sleep on the basics. Surely someone already solved your problem back in the 1980s.

Up to this point, the reason for all my failures with Spark was the jumble of data in the cluster. Perhaps the situation could be improved with preprocessing. I decided to try splitting the raw text data by the chromosome column, hoping to provide Spark with "pre-partitioned" data.

I searched StackOverflow for how to split by column values and found a great answer. With AWK you can split a text file by column values by writing to files from within the script rather than sending the results to stdout.

I wrote a Bash script to try it out. I downloaded one of the packed TSVs, then unpacked it with gzip and piped it to awk.

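# ",..." stands for the elided columns; the output file is named after column 15 (the chromosome)
gzip -dc path/to/chunk/file.gz |
awk -F '\t' \
'{ print $1",..."$30 > ("chunked/chr" $15 ".csv") }'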

It worked!
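To see the trick in isolation, here is a minimal, self-contained sketch on a made-up three-column TSV (SNP name, chromosome, value) instead of the real 30-column files. The file names and the scratch directory are assumptions for the demo only.

```shell
#!/usr/bin/env bash
# Toy reproduction of the AWK split: route each row to a file named after one column.
work_dir=$(mktemp -d)      # hypothetical scratch directory
mkdir -p "$work_dir/chunked"

# Three-column stand-in for the real TSV: SNP name, chromosome, value.
printf 'rs1\t1\t0.10\nrs2\t2\t0.20\nrs3\t1\t0.30\n' > "$work_dir/sample.tsv"

# Print each row as CSV into chunked/chr<chromosome>.csv instead of stdout.
awk -F '\t' -v out_dir="$work_dir/chunked" '
  { print $1 "," $2 "," $3 > (out_dir "/chr" $2 ".csv") }
' "$work_dir/sample.tsv"

chr1_rows=$(wc -l < "$work_dir/chunked/chr1.csv" | tr -d ' ')
chr2_rows=$(wc -l < "$work_dir/chunked/chr2.csv" | tr -d ' ')
echo "chr1 rows: $chr1_rows, chr2 rows: $chr2_rows"
```

AWK keeps each output file open until the program exits, so repeated writes to the same name append to it within a single run. With only a couple of dozen chromosomes the number of open files stays small.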

Filling the cores

What I learned: gnu parallel is a magical thing, and everyone should use it.

The splitting was rather slow, and when I launched htop to check the usage of the powerful (and expensive) EC2 instance, it turned out I was using just one core and about 200 MB of memory. To solve the problem and not lose a lot of money, I had to figure out how to parallelize the work. Fortunately, in the absolutely amazing book Data Science at the Command Line by Jeroen Janssens, I found a chapter on parallelization. From it I learned about gnu parallel, a very flexible way to run multithreaded work in Unix.

When I started the splitting with the new process, everything was fine, but there was still a bottleneck: downloading S3 objects to disk was not very fast and not fully parallelized. To fix this, I did the following:

  1. I found out that you can run the S3 download stage directly in the pipeline, completely eliminating intermediate storage on disk. This meant I could avoid writing raw data to disk and use even smaller, and therefore cheaper, storage on AWS.
  2. The command aws configure set default.s3.max_concurrent_requests 50 greatly increased the number of threads the AWS CLI uses (the default is 10).
  3. I switched to an EC2 instance optimized for network speed, with the letter n in the name. I found that the loss of compute power when using n-instances is more than compensated by the gain in download speed. For most tasks I used c5n.4xl.
  4. I replaced gzip with pigz, a gzip tool that can do cool things to parallelize the originally non-parallelized task of decompressing files (this helped the least).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe \
        "awk -F '\t' '{ print \$1\",...\"\$30 > (\"chunked/{#}_chr\" \$15 \".csv\") }'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

These steps combined to make everything run very fast. By increasing download speed and eliminating disk writes, I could now process a 5-terabyte batch in just a few hours.

This tweet should have mentioned 'TSV'. Alas.

Using the newly parsed data

What I learned: Spark likes uncompressed data and does not like combining partitions.

Now the data was in S3 in an unpacked (read: shared) and semi-ordered format, and I could go back to Spark again. A surprise awaited me: I again failed to get what I wanted! It was very hard to tell Spark exactly how the data was partitioned. And even when I did, it turned out there were too many partitions (95 thousand), and when I used coalesce to reduce their number to reasonable limits, it destroyed my partitioning. I'm sure this can be fixed, but after a couple of days of searching I couldn't find a solution. In the end I did finish all the tasks in Spark, although it took a while and my split Parquet files were not very small (~200 KB). However, the data was where it needed to be.

Too small and uneven, wonderful!

Testing local Spark queries

What I learned: Spark has too much overhead for solving simple problems.

With the data loaded in a smart format, I could test the speed. I set up an R script to run a local Spark server and then loaded a Spark data frame from the specified Parquet group storage (bin). I tried to load all the data but couldn't get Sparklyr to recognize the partitioning.

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  spark_read_parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

The execution took 29.415 seconds. Much better, but not great for mass testing of anything. In addition, I couldn't speed things up with caching, because when I tried to cache the data frame in memory, Spark always crashed, even when I allocated more than 50 GB of memory to a dataset that weighed less than 15 GB.

Back to AWK

What I learned: Associative arrays in AWK are very efficient.

I knew I could achieve higher speeds. I remembered that in Bruce Barnett's wonderful AWK tutorial I had read about a cool feature called "associative arrays." Essentially these are key-value pairs, which for some reason are called something different in AWK, and so I never gave them much thought. Roman Cheplyaka pointed out that the term "associative array" is much older than the term "key-value pair." In fact, if you look up key-value in Google Ngram, you won't see the term there, but you will find associative arrays! In addition, "key-value pair" is most often associated with databases, so it makes more sense to compare it with a hashmap. I realized I could use these associative arrays to join my SNPs with the bin table and the raw data without using Spark.

To do this, I used a BEGIN block in the AWK script. This is a piece of code that is executed before the first line of data is passed to the main body of the script.

join_data.awk
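BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  # Load the SNP-to-bin lookup table; "> 0" ends the loop on EOF or on a read error
  while((getline < "snp_to_bin.csv") > 0) {bin[$1] = $2}
}
{
  # Route each data row to a file named after its chromosome, bin, batch, and chunk
  print $0 > ("chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv")
}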

The while(getline...) command loads all the rows from the bin CSV, setting the first column (the SNP name) as the key of the associative array bin and the second value (the bin) as the value. Then, in the { } block, which is executed for every line of the main file, each line is sent to an output file that gets a unique name depending on its bin: ..._bin_"bin[$1]"_....
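The lookup-then-route pattern is easy to try on toy data. The sketch below is an assumption-laden miniature: the two input files, the chromosome number 13, and the worker suffix aa are all made up, and the lookup path and output directory are passed in with -v rather than hard-coded.

```shell
#!/usr/bin/env bash
# Miniature of the AWK "join": load a SNP-to-bin table into an associative array,
# then write every data row to a file named after that SNP's bin.
work_dir=$(mktemp -d)      # hypothetical scratch directory
mkdir -p "$work_dir/chunked"

printf 'rs1,0\nrs2,0\nrs3,1\n' > "$work_dir/snp_to_bin.csv"                         # snp,bin
printf 'rs1,personA,0.1\nrs3,personA,0.3\nrs2,personB,0.2\n' > "$work_dir/raw.csv"  # snp,person,value

awk -v chr=13 -v worker=aa \
    -v lookup="$work_dir/snp_to_bin.csv" -v out_dir="$work_dir/chunked" '
BEGIN {
  FS = ","
  # Runs once, before any data row: fill bin[snp] from the lookup file.
  while ((getline < lookup) > 0) bin[$1] = $2
  close(lookup)
}
{
  # Runs for every data row: the hash lookup bin[$1] picks the output file.
  print $0 > (out_dir "/chr_" chr "_bin_" bin[$1] "_" worker ".csv")
}' "$work_dir/raw.csv"

bin0_rows=$(wc -l < "$work_dir/chunked/chr_13_bin_0_aa.csv" | tr -d ' ')
bin1_rows=$(wc -l < "$work_dir/chunked/chr_13_bin_1_aa.csv" | tr -d ' ')
echo "bin 0 rows: $bin0_rows, bin 1 rows: $bin1_rows"
```

Giving each worker its own suffix in the file name is what lets several AWK processes run side by side without ever writing to the same file.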

The variables batch_num and chunk_id matched the data fed in by the pipeline, which avoided a race condition: each thread of execution launched by parallel wrote to its own unique file.

Since I had scattered all the raw data into chromosome folders left over from my previous AWK experiment, I could now write another Bash script to process one chromosome at a time and send the more deeply partitioned data to S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=\""$DESIRED_CHR"\" -v chunk=\"{}\" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

The script has two parallel sections.

In the first section, data is read from all the files containing information on the desired chromosome, then that data is distributed across threads, which sort the files into the appropriate bins. To avoid race conditions when multiple threads write to the same file, AWK is passed the file names so that it writes data to different places, e.g. chr_10_bin_52_batch_2_aa.csv. As a result, a lot of small files are created on disk (for this I used terabyte EBS volumes).

The pipeline in the second parallel section goes through the bins and combines their individual files into a single CSV with cat, and then sends them off for export.

Piping to R?

What I learned: You can access stdin and stdout from an R script, and therefore use it in a pipeline.

You may have noticed this line in the Bash script: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... It pipes all the concatenated bin files into the R script below. {} is a special parallel technique that inserts whatever data it sends to the given stream directly into the command itself. The {#} option provides a unique job ID, and {%} represents the job slot number (repeated, but never simultaneously). A list of all the options can be found in the documentation.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

When file("stdin") is passed to readr::read_csv, the data piped into the R script is loaded into a data frame, which is then written directly to S3 as an .rds file using aws.s3.

RDS is something like a junior version of Parquet, without the frills of columnar storage.

After the Bash script finished I had a bunch of .rds files sitting in S3, which let me use efficient compression and built-in types.

Despite the use of notoriously slow R, everything ran very fast. Not surprisingly, the parts of R that read and write data are highly optimized. After testing on one medium-sized chromosome, the job completed on a C5n.4xl instance in about two hours.

S3 limits

What I learned: Thanks to smart path implementation, S3 can handle lots of files.

I was worried about whether S3 could handle the many files being transferred to it. I could make the file names meaningful, but how would S3 look them up?

Folders in S3 are just for show; the system actually doesn't care about the / character. From the S3 FAQ page.

It appears that S3 represents the path to a particular file as a simple key in a kind of hash table or document-based database. A bucket can be thought of as a table, and files can be thought of as records in that table.

Since speed and efficiency are important to how Amazon makes money, it's no surprise that this key-as-a-file-path system is freaking optimized. I tried to find a balance: I didn't want to make a lot of get requests, but I wanted each request to execute quickly. It turned out that it was best to make about 20 thousand bin files. I think if we kept optimizing, we could achieve a further speedup (for example, by making a dedicated bucket just for the data, thereby reducing the size of the lookup table). But there was no time or money for further experiments.

What about cross compatibility?

What I learned: The number one cause of wasted time is optimizing your storage method prematurely.

At this point, it's very important to ask yourself: "Why use a proprietary file format?" The reason lies in loading speed (gzipped CSV files took 7 times longer to load) and compatibility with our workflows. I may reconsider if R can easily load Parquet (or Arrow) files without the Spark overhead. Everyone in our lab uses R, and if I need to convert the data to another format, I still have the original text data, so I can just run the pipeline again.

Dividing up the work

What I learned: Don't try to optimize jobs manually, let the computer do it.

I had debugged the workflow on one chromosome; now I needed to process all the other data. I wanted to spin up several EC2 instances for the conversion, but at the same time I was afraid of getting a very unbalanced load across the different processing jobs (just as Spark suffered from unbalanced partitions). In addition, I was not keen on raising one instance per chromosome, because AWS accounts have a default limit of 10 instances.

Then I decided to write an R script to optimize the processing jobs.

First, I asked S3 to calculate how much storage space each chromosome occupied.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}\\.csv') %>%
             str_remove_all('chr|\\.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Then I wrote a function that takes the total sizes, shuffles the order of the chromosomes, splits them into num_jobs groups, and tells you how different the sizes of all the processing jobs are.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/num_jobs

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(data = -sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Then I ran through a thousand shuffles using purrr and picked the best one.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

So I ended up with a set of jobs very similar in size. Then all that was left was to wrap my previous Bash script in a big for loop. This optimization took about 10 minutes to write. And that is much less than I would have spent manually creating jobs if they were unbalanced. Therefore, I think I was right with this preliminary optimization.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
done

At the end I add the shutdown command:

sudo shutdown -h now

...and everything worked! Using the AWS CLI, I raised instances and, via the user_data option, gave them the Bash scripts of their jobs to process. They ran and shut down automatically, so I wasn't paying for extra processing power.

aws ec2 run-instances ... \
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" \
--user-data file://<<job_script_loc>>

Let's package!

What I learned: The API should be simple for the sake of ease and flexibility of use.

Finally I had the data in the right place and form. All that remained was to simplify the process of using the data as much as possible to make things easier for my colleagues. I wanted to make a simple API for creating requests. If in the future I decide to switch from .rds to Parquet files, that should be a problem for me, not for my colleagues. For this I decided to make an internal R package.

I built and documented a very simple package containing just a few data access functions organized around a get_snp function. I also made a pkgdown website for my colleagues, so they can easily see examples and documentation.


Smart caching

What I learned: If your data is well prepared, caching will be easy!

Since one of the main workflows applied the same analysis model to a batch of SNPs, I decided to use binning to my advantage. When transmitting data for a SNP, all the information from the group (bin) is attached to the returned object. That is, old queries can (in theory) speed up the processing of new queries.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Thaum tsim lub pob, kuv tau khiav ntau qhov ntsuas los sib piv ceev thaum siv ntau txoj kev. Kuv pom zoo kom tsis txhob hnov ​​​​qab txog qhov no, vim tias qee zaum cov txiaj ntsig tsis tau xav txog. Piv txwv li, dplyr::filter tau nrawm dua li kev ntes kab uas siv indexing-raws li filtering, thiab retrieving ib kem los ntawm cov ntaub ntawv lim tau sai dua li siv indexing syntax.

Note that the prev_snp_results object contains the key snps_in_bin. This is an array of all the unique SNPs in a bin, which lets you quickly check whether you already have the data from a previous query. It also makes it easy to loop through all the SNPs in a bin with this code:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}
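
The caching idea can be tried end to end without S3. The following is a minimal sketch under stated assumptions, not the package's actual code: bin_store is a hypothetical in-memory stand-in for the bucket, snp_to_bin is a hypothetical lookup table, and fetch_bin and get_snp_sketch are illustrative names. A counter records how many simulated downloads happen.

```r
# Hypothetical in-memory stand-in for the S3 bucket: bin id -> data frame.
bin_store <- list(
  bin_1 = data.frame(snp_name = rep(c("rs1", "rs2"), each = 3),
                     value = 1:6, stringsAsFactors = FALSE),
  bin_2 = data.frame(snp_name = rep(c("rs3", "rs4"), each = 3),
                     value = 7:12, stringsAsFactors = FALSE)
)

# Hypothetical lookup table: which bin each SNP lives in.
snp_to_bin <- c(rs1 = "bin_1", rs2 = "bin_1", rs3 = "bin_2", rs4 = "bin_2")

downloads <- 0  # counts simulated downloads so cache hits are visible

fetch_bin <- function(bin_id) {
  downloads <<- downloads + 1
  bin_store[[bin_id]]
}

get_snp_sketch <- function(desired_snp, prev_snp_results = NULL) {
  # Reuse the previous result if its bin already contains the desired SNP.
  already_have_snp <- !is.null(prev_snp_results) &&
    desired_snp %in% prev_snp_results$snps_in_bin

  if (already_have_snp) {
    snp_results <- prev_snp_results
  } else {
    bin_data <- fetch_bin(snp_to_bin[[desired_snp]])
    snp_results <- list(
      bin_data    = bin_data,
      snps_in_bin = unique(bin_data$snp_name)
    )
  }

  # Extract just the rows for the requested SNP from the (cached) bin.
  keep <- snp_results$bin_data$snp_name == desired_snp
  snp_results$snp_data <- snp_results$bin_data[keep, ]
  snp_results
}

res <- get_snp_sketch("rs1")               # first download
for (current_snp in res$snps_in_bin) {
  res <- get_snp_sketch(current_snp, res)  # served from the cached bin
}
res <- get_snp_sketch("rs3", res)          # different bin: second download
downloads                                  # 2
```

Walking through every SNP in the first bin costs a single download; only the request for a SNP in another bin triggers a second one.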

Results

We can now run (and have started running in earnest) models and scenarios that were previously out of reach for us. The best part is that my lab colleagues don't have to think about any of the complexity. They just have a function that works.

And although the package spares them the details, I tried to keep the data format simple enough that they could figure it out if I suddenly disappeared tomorrow...

Speed has increased noticeably. We usually scan functionally significant fragments of the genome. Previously we could not do this (it was too expensive), but now, thanks to the bin structure and caching, a request for a single SNP takes less than 0.1 seconds on average, and data usage is so low that the S3 costs are peanuts.

Conclusion

This article is not a guide at all. The solution turned out to be specific to my case, and almost certainly not optimal. Rather, it is a travelogue. I want others to understand that solutions like this do not appear fully formed in someone's head; they are the result of trial and error. Also, if you are looking to hire a data scientist, keep in mind that using these tools effectively takes experience, and experience costs money. I am glad I had the means to pay for it, but many people who could do the same job better than I did will never get the chance, for lack of money to even try.

Big data tools are generalists. If you have the time, you can almost certainly write a faster solution using smart data cleaning, storage, and extraction techniques. In the end it comes down to a cost-benefit analysis.

What I learned:

  • There is no cheap way to parse 25 TB at once;
  • Be careful with the size of your Parquet files and their organization;
  • Partitions in Spark must be balanced;
  • In general, never try to make 2.5 million partitions;
  • Sorting is still hard, as is setting up Spark;
  • Sometimes special data requires special solutions;
  • Spark aggregation is fast, but partitioning is still expensive;
  • Don't sleep when they teach you the basics; someone probably already solved your problem back in the 1980s;
  • GNU parallel is a magical thing; everyone should use it;
  • Spark likes uncompressed data and does not like combining partitions;
  • Spark has a lot of overhead when solving simple problems;
  • AWK's associative arrays are very efficient;
  • You can access stdin and stdout from an R script, and therefore use it in a pipeline;
  • Thanks to its smart path implementation, S3 can handle many files;
  • The main reason for wasted time is prematurely optimizing your storage method;
  • Don't try to optimize tasks manually; let the computer do it;
  • An API should be simple, for the sake of ease and flexibility of use;
  • If your data is well prepared, caching will be easy!
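
As an illustration of the point about stdin and stdout, here is a minimal sketch of an R function that filters a tab-separated stream, assuming the SNP name is the first field. The function name filter_stream, the field layout, and the file names in the comments are hypothetical. The demo uses text connections so it runs anywhere; the comments show how the same function could be wired to a real pipe.

```r
# Hypothetical filter: keep only tab-separated lines whose first field
# equals `wanted_snp`. `input` and `output` are ordinary R connections.
filter_stream <- function(input, output, wanted_snp) {
  # Read in chunks so arbitrarily long streams fit in memory.
  while (length(lines <- readLines(input, n = 1000)) > 0) {
    fields <- strsplit(lines, "\t", fixed = TRUE)
    first  <- vapply(fields,
                     function(x) if (length(x) > 0) x[[1]] else "",
                     character(1))
    writeLines(lines[first == wanted_snp], output)
  }
  invisible(NULL)
}

# Inside a script run with Rscript, the connections would be the process
# streams. Note that file("stdin") refers to the process's standard input,
# whereas stdin() refers to the R console:
#
#   input <- file("stdin", open = "r")
#   filter_stream(input, stdout(), wanted_snp = "rs2")
#   close(input)
#
# which could then sit in a pipeline such as (file names are placeholders):
#
#   zcat chunk.tsv.gz | Rscript filter_snp.R > rs2.tsv

# Demo with in-memory connections instead of a real pipe.
demo_in  <- textConnection(c("rs1\t0.1", "rs2\t0.2", "rs2\t0.3"))
demo_out <- textConnection("kept", open = "w", local = FALSE)
filter_stream(demo_in, demo_out, wanted_snp = "rs2")
close(demo_in)
close(demo_out)
print(kept)
```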

Source: www.hab.com