Nyochaa 25TB site na iji AWK na R

Nyochaa 25TB site na iji AWK na R
Otu esi agụ akụkọ a: A na m arịọ mgbaghara maka ederede dị ogologo na ọgba aghara. Iji chekwaa oge, m na-eji okwu mmeghe “Ihe M Mụtara” na-amalite isiakwụkwọ nke ọ bụla, bụ́ nke na-achịkọta isi ihe dị n’isiakwụkwọ ahụ n’otu ahịrịokwu ma ọ bụ abụọ.

"Naanị gosi m ngwọta ya!" Ọ bụrụ na ịchọrọ ịhụ ebe m si bịa, gaa na isiakwụkwọ "Ịghọ ndị ọzọ na-emepụta ihe," ma echere m na ọ bụ ihe na-adọrọ mmasị ma bara uru ịgụ banyere ọdịda.

Enyere m ọrụ n'oge na-adịbeghị anya ka m guzobe usoro maka ịhazi nnukwu usoro DNA nke raw (na teknụzụ SNP). Mkpa ọ dị bụ ịnweta data ngwa ngwa gbasara ọnọdụ mkpụrụ ndụ ihe nketa enyere (nke a na-akpọ SNP) maka ịmegharị ihe na-esote yana ọrụ ndị ọzọ. Iji R na AWK, enwere m ike ihicha na hazie data n'ụzọ okike, na-eme ka nhazi ajụjụ dị ngwa ngwa. Nke a adịghịrị m mfe ma chọọ ọtụtụ ugboro ugboro. Edemede a ga-enyere gị aka ịzere ụfọdụ mmejọ m wee gosi gị ihe m mechara mee.

Nke mbụ, ụfọdụ nkọwa mmalite.

data

Ebe nhazi ozi gbasara mkpụrụ ndụ ihe nketa mahadum anyị nyere anyị data n'ụdị 25 TB TSV. Enwetara m ha kewara n'ime ngwungwu 5 Gzip-mpịakọta, nke ọ bụla nwere ihe dị ka faịlụ 240 dị gigabyte anọ. Ahịrị ọ bụla nwere data maka otu SNP sitere na otu onye. Na mkpokọta, data na ~ 2,5 nde SNP na ~ 60 puku mmadụ bufere. Na mgbakwunye na ozi SNP, faịlụ ndị ahụ nwere ọtụtụ ogidi nwere ọnụọgụ na-egosipụta njirimara dị iche iche, dị ka ike ịgụ akwụkwọ, ugboro ole alleles dị iche iche, wdg. Na mkpokọta enwere ihe dịka ogidi iri atọ nwere ụkpụrụ pụrụ iche.

Mgbalị

Dị ka ọ dị n'ọrụ njikwa data ọ bụla, ihe kacha mkpa bụ ikpebi otu a ga-esi jiri data ahụ. N'okwu a anyị ga-ahọrọkarị ụdị na usoro ọrụ maka SNP dabere na SNP. Ya bụ, anyị ga-achọ naanị data na otu SNP n'otu oge. Ekwesịrị m ịmụta ka m ga-esi weghachite ndekọ niile metụtara otu n'ime nde 2,5 SNP dị mfe, ngwa ngwa na ọnụ ala dị ka o kwere mee.

Kedu ka esi eme nke a

Iji kwupụta cliché dabara adaba:

Edaghị m otu puku ugboro, achọpụtara m otu puku ụzọ m ga-esi zere ịkpachapụta ụyọkọ data n'ụdị omume enyi.

Buru ụzọ nwaa

Gịnị ka m mụtara: Enweghị ụzọ dị ọnụ ala iji kpachapụta TB 25 n'otu oge.

N'ịbụ onye gụchara usoro "Advanced Methods for Big Data Processing" na Mahadum Vanderbilt, ejiri m n'aka na aghụghọ ahụ dị na akpa ahụ. Ọ ga-ewe otu awa ma ọ bụ abụọ iji guzobe ihe nkesa Hive ka ọ gafee data niile wee kọọ nsonaazụ ya. Ebe ọ bụ na echekwara data anyị na AWS S3, ejiri m ọrụ ahụ Athena, nke na-enye gị ohere itinye ajụjụ Hive SQL na data S3. Ịkwesighi ịtọlite/ebulite ụyọkọ Hive, ị na-akwụkwa ụgwọ naanị maka data ị na-achọ.

Mgbe m gosichara Athena data m na usoro ya, ejiri m ajụjụ mee nyocha ụfọdụ:

select * from intensityData limit 10;

Na ngwa ngwa natara nke ọma ahaziri pụta. Njikere.

Ruo mgbe anyị gbalịrị iji data na ọrụ anyị ...

A gwara m ka m wepụ ozi SNP niile iji nwalee ihe nlereanya ahụ. M gbara ajụjụ a:


select * from intensityData 
where snp = 'rs123456';

... wee malite ichere. Mgbe nkeji asatọ na karịa 4 TB nke data achọrọ, enwetara m nsonaazụ. Athena na-ana site na olu data achọtara, $5 kwa terabyte. Ya mere otu arịrịọ a na-eri $20 na nkeji asatọ nke ichere. Iji mee ihe nlereanya ahụ na data niile, anyị ga-echere afọ 38 ma kwụọ nde $ 50. N'ụzọ doro anya, nke a adịghị mma maka anyị.

Ọ dị mkpa iji parquet ...

Gịnị ka m mụtara: Kpachara anya na nha faịlụ Parquet gị yana nzukọ ha.

M gbalịrị idozi ọnọdụ ahụ site n'ịtụgharị TSV niile Parquet faịlụ. Ha na-adaba adaba maka ịrụ ọrụ na nnukwu data data n'ihi na a na-echekwa ozi dị na ha n'ụdị columnar: kọlụm ọ bụla dị na ebe nchekwa / diski nke ya, n'ụzọ dị iche na faịlụ ederede, nke ahịrị nwere ihe dị na kọlụm ọ bụla. Ma ọ bụrụ na ịchọrọ ịchọta ihe, wee gụọ naanị kọlụm achọrọ. Na mgbakwunye, faịlụ ọ bụla na-echekwa ụkpụrụ dị iche iche na kọlụm, yabụ ọ bụrụ na uru ị na-achọ adịghị na kọlụm kọlụm, Spark agaghị egbu oge iji nyochaa faịlụ niile.

M rụrụ ọrụ dị mfe AWS mama iji tọghata TSV anyị na Parquet wee tụba faịlụ ọhụrụ n'ime Athena. O were ihe dị ka awa ise. Ma mgbe m gbapụrụ arịrịọ ahụ, o were ihe dị ka otu oge na obere ego iji mechaa. Nke bụ eziokwu bụ na Spark, na-anwa ịkwalite ọrụ ahụ, wepụta otu TSV chunk ma tinye ya na chunk Parquet nke ya. Na n'ihi na nchikota nke ọ bụla buru ibu iji nwee ndekọ dum nke ọtụtụ mmadụ, faịlụ ọ bụla nwere SNP niile, ya mere Spark ga-emepe faịlụ niile iji wepụ ozi ọ chọrọ.

N'ụzọ na-akpali mmasị, ụdị mkpakọ nke ndabara (na akwadoro) Parquet, nke na-adịghị mma, anaghị ekewa. Ya mere, onye ọ bụla na-arụ ọrụ na-arapara n'ahụ ọrụ nke ịwepu na nbudata dataset 3,5 GB zuru ezu.

Nyochaa 25TB site na iji AWK na R

Ka anyị ghọta nsogbu ahụ

Gịnị ka m mụtara: Ịhazi ihe siri ike, karịsịa ma ọ bụrụ na ekesa data.

Ọ dị m ka m ghọtara isi nsogbu ahụ ugbu a. Naanị m chọrọ ịhazi data site na kọlụm SNP, ọ bụghị ndị mmadụ. Mgbe ahụ, a ga-echekwa ọtụtụ SNP na nkwụsị data dị iche iche, mgbe ahụ, ọrụ "smart" nke Parquet "meghere naanị ma ọ bụrụ na uru dị na nso" ga-egosipụta onwe ya n'ebube ya niile. N'ụzọ dị mwute, ịhazi ọtụtụ ijeri ahịrị ndị gbasasịrị n'otu ụyọkọ ghọrọ ọrụ siri ike.

AWS achọghị inye nkwụghachi n'ihi ihe kpatara "Abụ m nwa akwụkwọ dọpụ uche". Mgbe m gbasịrị nhazi na Amazon Glue, ọ gbara ụbọchị 2 wee daa.

Kedu maka nkewa?

Gịnị ka m mụtara: Nkebi na Spark ga-abụ nke ziri ezi.

Mgbe ahụ, m bịara na echiche nke nkebi data na chromosomes. Enwere 23 n'ime ha (na ọtụtụ ndị ọzọ ma ọ bụrụ na ị na-eburu n'uche DNA mitochondrial na mpaghara na-enweghị atụ).
Nke a ga-enye gị ohere kewaa data n'ime obere iberibe. Ọ bụrụ na ị gbakwunye naanị otu ahịrị na ọrụ mbupụ Spark na edemede gluu partition_by = "chr", mgbe ahụ, a ga-ekewa data ahụ na bọket.

Nyochaa 25TB site na iji AWK na R
Mkpụrụ ndụ ihe nketa ahụ nwere ọtụtụ iberibe a na-akpọ chromosomes.

N'ụzọ dị mwute, ọ naghị arụ ọrụ. Chromosomes nwere nha dị iche iche, nke pụtara ozi dị iche iche. Nke a pụtara na ọrụ Spark zigaara ndị ọrụ adịghị mma ma jiri nwayọ rụchaa ya n'ihi na ụfọdụ ọnụ na-arụcha n'isi ma na-abaghị uru. Agbanyeghị, arụchara ọrụ ndị ahụ. Mana mgbe ị na-arịọ maka otu SNP, enweghị ahaghị nhata kpatara nsogbu ọzọ. Ọnụ ahịa nhazi SNP na chromosomes buru ibu (ya bụ, ebe anyị chọrọ inweta data) agbadala naanị ihe dị ka ihe ruru iri. Ọtụtụ, mana ezughị oke.

Gịnị ma ọ bụrụ na anyị kee ya n'ime obere akụkụ?

Gịnị ka m mụtara: Ọ dịghị mgbe na-agbalị ime 2,5 nde partitions ọlị.

Ekpebiri m ịpụ niile wee kewaa SNP ọ bụla. Nke a mere ka o doo anya na akụkụ ahụ hà nhata. Ọ BỤ echiche ọjọọ. Eji m gluu wee tinye ahịrị na-emeghị ihe ọjọọ partition_by = 'snp'. Ọrụ ahụ malitere wee malite ịrụ ya. Otu ụbọchị ka e mesịrị, m lere anya wee hụ na ọ ka nwere ihe e dere na S3, n'ihi ya, m gburu ọrụ ahụ. Ọ dị ka Glue na-ede faịlụ etiti na ebe zoro ezo na S3, ọtụtụ faịlụ, ikekwe nde abụọ. N’ihi ya, ndudue m riri ihe karịrị otu puku dollar ma ọ dịghị amasị onye ndụmọdụ m.

Nkebi + nhazi

Gịnị ka m mụtara: Ịhazi ka siri ike, dịka ịmegharị Spark.

Mgbalị ikpeazụ m mere na nkewa gụnyere m ikewa chromosomes wee hazie nkebi ọ bụla. Na tiori, nke a ga-eme ka ajụjụ ọ bụla dị ngwa n'ihi na data SNP achọrọ ga-adị n'ime chunks Parquet ole na ole n'ime oke enyere. N'ụzọ dị mwute, nhazi ọbụna data kewara ekewa wee bụrụ ọrụ siri ike. N'ihi ya, m gbanwere na EMR maka ụyọkọ omenala ma jiri oge asatọ dị ike (C5.4xl) na Sparklyr mepụta usoro ọrụ na-agbanwe agbanwe ...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

... Otú ọ dị, ọrụ ahụ ka emechabeghị. Haziri m ya n'ụzọ dị iche iche: mụbaa oke ebe nchekwa maka onye na-eme ajụjụ ọ bụla, jiri ọnụ ọnụ nwere nnukwu ebe nchekwa, jiri mgbanwe mgbasa ozi (mgbanwe mgbasa ozi), mana oge ọ bụla ndị a tụgharịrị bụrụ ọkara, ma jiri nwayọ malite ndị mmebe ahụ. daa ruo mgbe ihe niile kwụsịrị.

M na-aghọwanye ihe okike

Gịnị ka m mụtara: Mgbe ụfọdụ, data pụrụ iche chọrọ ngwọta pụrụ iche.

SNP ọ bụla nwere uru ọnọdụ. Nke a bụ ọnụọgụ kwekọrọ na ọnụọgụ ntọala dị n'akụkụ chromosome ya. Nke a bụ ụzọ dị mma na nke okike iji hazie data anyị. Na mbụ, achọrọ m ikewa site na mpaghara chromosome ọ bụla. Dịka ọmụmaatụ, ọkwa 1 - 2000, 2001 - 4000, wdg. Mana nsogbu bụ na SNP adịghị ekesa nke ọma n'ofe chromosomes, yabụ nha otu ga-adịgasị iche nke ukwuu.

Nyochaa 25TB site na iji AWK na R

N'ihi ya, a bịara m na ndakpọ ọkwá n'ime otu (ọkwa). N'iji data ebudatara, agbagara m arịrịọ ka m nweta ndepụta nke SNP pụrụ iche, ọnọdụ ha na chromosomes. M wee hazie data ahụ n'ime chromosome ọ bụla wee chịkọta SNP n'ime otu (bin) nke nha enyere. Ka anyị kwuo 1000 SNP nke ọ bụla. Nke a nyere m mmekọrịta SNP-na-otu-kwa-chromosome.

N'ikpeazụ, emere m otu (bin) nke 75 SNP, a ga-akọwa ihe kpatara ya n'okpuru.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Buru ụzọ jiri Spark nwaa

Gịnị ka m mụtara: Nchịkọta ọkụ na-adị ngwa ngwa, mana nkewa ka dị oke ọnụ.

Achọrọ m ịgụ obere data a (ahịrị nde 2,5) n'ime Spark, jikọta ya na data raw, wee kewaa ya site na kọlụm agbakwunyere ọhụrụ. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Eji m ya sdf_broadcast(), yabụ Spark maara na o kwesịrị izipu etiti data na ọnụ ọnụ niile. Nke a bara uru ma ọ bụrụ na data dị ntakịrị na nke achọrọ maka ọrụ niile. Ma ọ bụghị ya, Spark na-agbalị ka ọ mara ihe ma na-ekesa data dịka ọ dị mkpa, nke nwere ike ime ka ọ daa.

Ma ọzọ, echiche m arụghị ọrụ: ọrụ ndị ahụ na-arụ ọrụ ruo oge ụfọdụ, mechaa njikọ ahụ, mgbe ahụ, dị ka ndị na-arụ ọrụ na-arụ ọrụ site na nkewa, malitere ịda.

Na-agbakwụnye AWK

Gịnị ka m mụtara: Adịla ụra mgbe a na-akụziri gị ihe ndị bụ́ isi. N'ezie, ọ dị onye edozila nsogbu gị laa azụ na 1980s.

Ruo ugbu a, ihe kpatara ọdịda m niile na Spark bụ mkpọmkpọ data dị na ụyọkọ ahụ. Ikekwe enwere ike imeziwanye ọnọdụ ahụ site na ọgwụgwọ mbụ. Ekpebiri m ịnwale ikewa data ederede raw ka ọ bụrụ kọlụm chromosomes, yabụ enwere m olile anya ịnye Spark data “ekesarala mbụ”.

Achọkwara m na StackOverflow maka otu esi ekewa site na ụkpụrụ kọlụm wee chọta azịza dị ukwuu dị otú ahụ. Site na AWK ị nwere ike kewaa faịlụ ederede site na ụkpụrụ kọlụm site na ide ya na edemede kama izipu nsonaazụ ya na stdout.

Edere m edemede Bash ka m nwalee ya. Budata otu n'ime ngwugwu TSV, wee bupụta ya site na iji gzip ma zigara ya awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Ọ rụrụ ọrụ!

Na-ejuputa cores

Gịnị ka m mụtara: gnu parallel - ọ bụ ihe anwansi, onye ọ bụla kwesịrị iji ya.

Nkewa ahụ bụ nnọọ ngwa ngwa na mgbe m malitere htopiji lelee iji ihe atụ EC2 dị ike (ma dị oke ọnụ), ọ tụgharịrị na m na-eji naanị otu isi na ihe dịka 200 MB nke ebe nchekwa. Iji dozie nsogbu ahụ ma ghara ịla n'iyi nnukwu ego, anyị ga-achọpụta otú anyị ga-esi yie ọrụ ahụ. Ọ dabara nke ọma, n'ime akwụkwọ dị ịtụnanya kpamkpam Data Science na Command Line Achọtara m otu isiakwụkwọ Jeron Janssens gbasara myirịta. Site na ya ka m mụtara banyere ya gnu parallel, usoro mgbanwe dị ukwuu maka mmejuputa multithreading na Unix.

Nyochaa 25TB site na iji AWK na R
Mgbe m malitere nkewa site na iji usoro ọhụrụ ahụ, ihe niile dị mma, ma a ka nwere ihe mgbochi - nbudata S3 ihe na diski adịghị ngwa ngwa ma ọ bụghị nke zuru oke. Iji dozie nke a, emere m nke a:

  1. Achọpụtara m na ọ ga-ekwe omume iji mejuputa usoro nbudata S3 ozugbo na pipeline, na-ewepụ kpamkpam nchekwa etiti na diski. Nke a pụtara na m nwere ike izere ịde data raw na diski wee jiri ọbụna obere, ya mere dị ọnụ ala, nchekwa na AWS.
  2. Otu aws configure set default.s3.max_concurrent_requests 50 mụbara ọnụ ọgụgụ eri nke AWS CLI na-eji (na ndabara enwere 10).
  3. Agbanwere m na ihe atụ EC2 emebere maka ọsọ netwọkụ, yana mkpụrụedemede n n'aha. Achọpụtara m na mfu nke ike nhazi mgbe ị na-eji n-ihe atụ karịa ụgwọ ọrụ site na mmụba nke ngwa ngwa. Maka ọtụtụ ọrụ m na-eji c5n.4xl.
  4. Gbanwee gzip on pigz, Nke a bụ ngwá ọrụ gzip nke nwere ike ime ihe dị mma iji mee ka ọrụ mbụ na-enweghị isi nke imebi faịlụ (nke a nyeere obere aka).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

A na-ejikọta usoro ndị a na ibe ha iji mee ka ihe niile na-arụ ọrụ ngwa ngwa. Site n'ịbawanye ọsọ nbudata na iwepụ ihe odide diski, enwere m ike hazie ngwugwu terabyte 5 n'ime awa ole na ole.

Tweet a kwesịrị ịkpọrọ 'TSV'. Ewoo.

Iji data tụgharịrị ọhụrụ

Gịnị ka m mụtara: Spark na-amasị data enweghị mgbakọ ma ọ naghị amasị ijikọta nkebi.

Ugbu a data ahụ dị na S3 n'ime akpakọghị (gụọ: nkekọrịta) yana usoro edobere obere, enwere m ike ịlaghachi na Spark ọzọ. Ihe tụrụ m n'anya chere m: Emebighị m ọzọ imezu ihe m chọrọ! O siri ike ịgwa Spark kpọmkwem ka esi kewaa data ahụ. Na ọbụna mgbe m mere nke a, ọ tụgharịrị na e nwere ọtụtụ partitions (95 puku), na mgbe m na-eji coalesce belatara ọnụ ọgụgụ ha ruo oke ezi uche dị na ya, nke a mebiri nkewa m. Eji m n'aka na enwere ike idozi nke a, mana ka ụbọchị ole na ole nyochachara, enweghị m ike ịchọta azịza ya. Emechara m rụchaa ọrụ niile na Spark, n'agbanyeghị na ọ were obere oge na faịlụ Parquet kewara m adịchaghị obere (~ 200 KB). Agbanyeghị, data ahụ bụ ebe achọrọ ya.

Nyochaa 25TB site na iji AWK na R
Obere obere na enweghị isi, ọmarịcha!

Na-anwale ajụjụ Spark mpaghara

Gịnị ka m mụtara: Spark nwere oke ibu mgbe ọ na-edozi nsogbu ndị dị mfe.

Site na nbudata data na usoro amamihe, enwere m ike ịnwale ọsọ ahụ. Hazie script R ka ọ na-agba ọsọ nkesa Spark mpaghara, wee kwajuo fremu data Spark site na nchekwa otu Parquet akọwapụtara (bin). Agbalịrị m ịkwanye data niile mana enweghị m ike ịnweta Sparklyr ịmata nkewa ahụ.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Mgbu ahụ were 29,415 sekọnd. Ọ ka mma, mana ọ bụghị oke mma maka nyocha ọtụtụ ihe ọ bụla. Na mgbakwunye, enweghị m ike iji caching mee ihe ọsọ ọsọ n'ihi na mgbe m nwara ịchekwa etiti data na ebe nchekwa, Spark na-adakarị, ọbụlagodi mgbe m kenyere ihe karịrị 50 GB nke ebe nchekwa na dataset nke ruru ihe na-erughị 15.

Lọghachi na AWK

Gịnị ka m mụtara: Ngwakọta mmekọrịta na AWK na-arụ ọrụ nke ọma.

Aghọtara m na m nwere ike ime ọsọ ọsọ dị elu. Echetara m nke ahụ na ọmarịcha Nkuzi AWK sitere n'aka Bruce Barnett Agụrụ m maka ọmarịcha atụmatụ akpọrọ "ihe jikọrọ ọnụ" N'ụzọ bụ isi, ndị a bụ ụzọ abụọ bara uru, nke a na-akpọ maka ihe ụfọdụ dị iche na AWK, yabụ n'ụzọ ụfọdụ echeghị m ọtụtụ ihe gbasara ha. Roman Cheplyaka chetara na okwu a bụ "associative arrays" tọrọ nke ukwuu karịa okwu "igodo-uru ụzọ". Ọ bụrụgodị na ị lelee igodo-uru na Google Ngram, ị gaghị ahụ okwu a n'ebe ahụ, mana ị ga-ahụ ihe ndị na-akpakọrịta! Na mgbakwunye, a na-ejikọkarị “ụzọ-uru ụzọ” na ọdụ data, ya mere ọ na-enwe nghọta karịa iji ya tụnyere hashmap. Achọpụtara m na m nwere ike iji usoro mmekọ ndị a iji jikọta SNPs na tebụl biini na data raw na-ejighị Spark.

Iji mee nke a, na edemede AWK ejiri m ngọngọ BEGIN. Nke a bụ mpempe koodu nke a na-eme tupu e nyefee data nke mbụ na isi akụkụ nke edemede ahụ.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

otu while(getline...) kwajuru ahịrị niile sitere na otu CSV (bin), tọọ kọlụm nke mbụ (aha SNP) dị ka igodo maka nhazi mmekọrịta. bin na uru nke abụọ (otu) dị ka uru. Mgbe ahụ na ngọngọ { }, nke a na-eme na ahịrị niile nke faịlụ bụ isi, a na-eziga ahịrị ọ bụla na faịlụ mmepụta, nke na-enweta aha pụrụ iche dabere na otu ya (bin): ..._bin_"bin[$1]"_....

Ihe dịgasị iche iche batch_num и chunk_id dakọtara na data nyere site na pipeline, na-ezere ọnọdụ agbụrụ, na eri ogbugbu ọ bụla na-agba ọsọ parallel, dere na faịlụ ya pụrụ iche.

Ebe ọ bụ na m gbasasịa data raw niile n'ime nchekwa na chromosomes hapụrụ site na nnwale m gara aga na AWK, ugbu a enwere m ike dee edemede Bash ọzọ iji hazie otu chromosome n'otu oge wee ziga data kewara miri emi na S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Edemede ahụ nwere ngalaba abụọ parallel.

Na ngalaba nke mbụ, a na-agụ data site na faịlụ niile nwere ozi na chromosome chọrọ, mgbe ahụ, a na-ekesa data a n'ofe eri, nke na-ekesa faịlụ n'ime otu kwesịrị ekwesị (bin). Iji zere ọnọdụ agbụrụ mgbe ọtụtụ eri na-ede n'otu faịlụ ahụ, AWK na-enyefe aha faịlụ iji dee data n'ebe dị iche iche, dịka. chr_10_bin_52_batch_2_aa.csv. N'ihi ya, a na-emepụta ọtụtụ obere faịlụ na diski (maka nke a, m na-eji terabyte EBS mpịakọta).

Nbufe sitere na ngalaba nke abụọ parallel na-agafe otu (bin) wee jikọta faịlụ ha n'otu n'otu na CSV c catwee ziga ha maka mbupụ.

Mgbasa ozi na R?

Gịnị ka m mụtara: Ị nwere ike ịkpọtụrụ stdin и stdout site na edemede R, ya mere jiri ya na pipeline.

O nwere ike ịbụ na ị hụla ahịrị a na edemede Bash gị: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Ọ na-atụgharị faịlụ otu niile jikọtara ọnụ n'ime edemede R dị n'okpuru. {} bụ usoro pụrụ iche parallel, nke na-etinye data ọ bụla ọ na-eziga na iyi akọwapụtara ozugbo n'ime iwu ahụ n'onwe ya. Nhọrọ {#} na-enye ID eri pụrụ iche, na {%} na-anọchi anya nọmba oghere ọrụ (ugboro ugboro, mana ọ bụghị n'otu oge). Enwere ike ịchọta ndepụta nhọrọ niile na akwụkwọ.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Mgbe mgbanwe file("stdin") bufere ya readr::read_csv, A na-etinye data a sụgharịrị n'ime edemede R n'ime etiti, nke dị na ụdị .rds- iji faịlụ aws.s3 edere ya ozugbo na S3.

RDS bụ ihe dị ka ụdị obere Parquet, na-enweghị frills nke nchekwa okwu.

Mgbe m mechara script Bash enwetara m ngwugwu .rds-faịlụ dị na S3, nke nyere m ohere iji mkpakọ nke ọma na ụdị arụnyere n'ime ya.

N'agbanyeghị iji breeki R, ihe niile na-arụ ọrụ ngwa ngwa. Ọ bụghị ihe ijuanya na akụkụ R nke na-agụ na ide data na-akacha mma nke ukwuu. Mgbe anwalechara otu chromosome nke nwere ọkara, ọrụ a rụchara na ihe atụ C5n.4xl n'ihe dị ka awa abụọ.

Oke S3

Gịnị ka m mụtara: Ekele maka mmejuputa ụzọ smart, S3 nwere ike ijikwa ọtụtụ faịlụ.

Enwere m nchegbu ma S3 ọ ga-enwe ike ijikwa ọtụtụ faịlụ ndị e bufere na ya. Enwere m ike ime ka aha faịlụ ahụ nwee uche, mana kedu ka S3 ga-esi chọọ ha?

Nyochaa 25TB site na iji AWK na R
Mpempe akwụkwọ dị na S3 bụ naanị ihe ngosi, n'ezie sistemụ enweghị mmasị na akara ahụ /. Site na ibe S3 FAQ.

Ọ dị ka S3 na-anọchi anya ụzọ gaa na otu faịlụ dị ka igodo dị mfe n'ụdị tebụl hash ma ọ bụ nchekwa data dabere na akwụkwọ. Enwere ike iche na ịwụ dị ka tebụl, a pụkwara iwere faịlụ dị ka ndekọ na tebụl ahụ.

Ebe ọ bụ na ọsọ na arụmọrụ dị mkpa iji nweta uru na Amazon, ọ bụghị ihe ijuanya na usoro igodo-dị ka faịlụ-ụzọ a na-eme ka ọ dịkwuo mma. Agbalịrị m ịchọta nguzozi: ka m wee ghara ịrịọ ọtụtụ arịrịọ, mana na e gburu arịrịọ ndị ahụ ngwa ngwa. Ọ tụgharịrị na ọ kacha mma ịme ihe dị ka puku faịlụ 20. Echere m na ọ bụrụ na anyị na-aga n'ihu na-ebuli elu, anyị nwere ike nweta mmụba na ọsọ ọsọ (dịka ọmụmaatụ, ịme bọket pụrụ iche naanị maka data, si otú a na-ebelata nha nke tebụl nyocha). Ma ọ dịghị oge ma ọ bụ ego maka nyocha ọzọ.

Kedu maka ndakọrịta cross?

Ihe M Mụtara: Ihe mbụ na-akpata igbu oge bụ ịkwalite usoro nchekwa gị n'oge.

N'oge a, ọ dị ezigbo mkpa ịjụ onwe gị: "Gịnị kpatara iji usoro faịlụ nke nwe ya?" Ihe kpatara ya dabere na nbudata ọsọ (faịlụ CSV gzipped were oge 7 ibu ibu) yana ndakọrịta na usoro ọrụ anyị. Enwere m ike ịtụgharị uche ma ọ bụrụ R nwere ike ibunye faịlụ Parquet (ma ọ bụ Arrow) ngwa ngwa na-enweghị ibu Spark. Onye ọ bụla nọ na ụlọ nyocha anyị na-eji R, ma ọ bụrụ na achọrọ m ịgbanwe data ahụ na usoro ọzọ, m ka nwere data ederede mbụ, yabụ enwere m ike ịgbanye pipeline ọzọ.

Nkewa nke ọrụ

Gịnị ka m mụtara: Agbalịla iji aka gị kwalite ọrụ, ka kọmputa mee ya.

Emebiala m usoro ọrụ na otu chromosome, ugbu a achọrọ m ịhazi data ndị ọzọ niile.
Achọrọ m iwelite ọtụtụ oge EC2 maka ntụgharị, mana n'otu oge ahụ, m na-atụ egwu ịnweta ibu na-enweghị oke n'ofe ọrụ nhazi dị iche iche (dị ka Spark na-ata ahụhụ site na akụkụ na-enweghị isi). Na mgbakwunye, enweghị m mmasị n'ịwelite otu ihe atụ kwa chromosome, n'ihi na maka akaụntụ AWS enwere njedebe ndabara nke oge 10.

Mgbe ahụ, ekpebiri m ide edemede na R iji kwalite ọrụ nhazi.

Nke mbụ, ajụrụ m S3 ka ọ gbakọọ ebe nchekwa ole chromosome ọ bụla nwere.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

M wee dee ọrụ na-ewe mkpokọta nha, na-emegharị usoro nke chromosomes, kewaa ha n'ime otu. num_jobs ma gwa gị otú nha nke ọrụ nhazi niile si dị iche.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Mgbe ahụ, m na-agba ọsọ site na otu puku shuffles na-eji purrr wee họrọ nke kacha mma.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

N'ihi ya, emechara m ọtụtụ ọrụ ndị yiri nnọọ nha. Mgbe ahụ, ihe fọdụrụ bụ ka m kechie script Bash gara aga na nnukwu loop for. Nkwalite a were ihe dị ka nkeji iri iji dee. Na nke a dị obere karịa ka m ga-eji aka na-emepụta ọrụ ma ọ bụrụ na ha enweghị nha nha. Ya mere, echere m na m ziri ezi na nkwalite mbido a.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Na njedebe, m na-agbakwunye iwu nkwụsị:

sudo shutdown -h now

... na ihe niile na-arụ ọrụ! Iji AWS CLI, ewelitere m ikpe site na iji nhọrọ user_data nyere ha Bash script nke ọrụ ha maka nhazi. Ha gbara ọsọ ma mechie ozugbo, yabụ anaghị m akwụ ụgwọ maka ike nhazi ọzọ.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Ka anyị kwakọba!

Gịnị ka m mụtara: API kwesịrị ịdị mfe maka ịdị mfe na mgbanwe nke ojiji.

N'ikpeazụ enwetara m data na ebe kwesịrị ekwesị na ụdị. Naanị ihe fọdụrụ bụ ime ka usoro nke iji data dị mfe dị ka o kwere mee iji mee ka ọ dịrị ndị ọrụ ibe m mfe. Achọrọ m ịme API dị mfe maka ịmepụta arịrịọ. Ọ bụrụ na n'ọdịnihu m kpebiri ịgbanwe si .rds na faịlụ Parquet, mgbe ahụ nke a kwesịrị ịbụ nsogbu nye m, ọ bụghị maka ndị ọrụ ibe m. Maka nke a, ekpebiri m ime ngwugwu R n'ime.

Wulite ma detuo ngwugwu dị mfe nke nwere naanị ọrụ ịnweta data ole na ole ahaziri gburugburu otu ọrụ get_snp. Emekwara m weebụsaịtị maka ndị ọrụ ibe m pkgdown, ka ha wee nwee ike ịhụ ihe atụ na akwụkwọ ngwa ngwa.

Nyochaa 25TB site na iji AWK na R

Smart caching

Gịnị ka m mụtara: Ọ bụrụ na akwadoro data gị nke ọma, caching ga-adị mfe!

Ebe ọ bụ na otu n'ime isi ọrụ na-arụ ọrụ na-etinye otu ụdị nyocha ahụ na ngwugwu SNP, ekpebiri m iji binning maka uru m. Mgbe ị na-ebufe data site na SNP, a na-ejikọta ozi niile sitere na otu (bin) na ihe eweghachiri. Ya bụ, ajụjụ ochie nwere ike (na tiori) mee ngwa ngwa nhazi nke ajụjụ ọhụrụ.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Mgbe m na-ewu ngwugwu ahụ, agbaara m ọtụtụ akara iji tụnyere ọsọ mgbe ị na-eji ụzọ dị iche iche. Ana m akwado ka ị ghara ileghara nke a anya, n'ihi na mgbe ụfọdụ, nsonaazụ na-atụghị anya ya. Ọmụmaatụ, dplyr::filter dị ngwa ngwa karịa iwere ahịrị site na iji nzacha dabere na indexing, na iweghachite otu kọlụm site na etiti data enyora dị ngwa ngwa karịa iji indexing syntax.

Biko mara na ihe ahụ prev_snp_results nwere igodo snps_in_bin. Nke a bụ usoro SNP pụrụ iche n'ime otu (bin), na-enye gị ohere ịlele ngwa ngwa ma ị nwetala data sitere na ajụjụ gara aga. Ọ na-emekwa ka ọ dị mfe iji koodu a wee banye SNP niile n'otu (bin)

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Результаты

Ugbu a, anyị nwere ike (ma malitela nke ọma) na-agba ọsọ ụdị na ọnọdụ ndị anyị na-agaghị enweta na mbụ. Ihe kacha mma bụ na ndị ọrụ ibe m na ụlọ nyocha enweghị ike iche maka nsogbu ọ bụla. Ha nwere naanị ọrụ na-arụ ọrụ.

Ma ọ bụ ezie na ngwugwu ahụ na-echekwa ha nkọwa, agbalịrị m ime ka usoro data dị mfe nke na ha nwere ike ịchọpụta ma ọ bụrụ na m na-apụ na mberede echi ...

Ọsọ abawanyela nke ọma. Anyị na-enyochakarị iberibe genome dị mkpa na-arụ ọrụ. Na mbụ, anyị enweghị ike ime nke a (ọ tụgharịrị dị oke ọnụ), ma ugbu a, ekele maka otu (bin) nhazi na caching, arịrịọ maka otu SNP na-ewe ihe na-erughị 0,1 sekọnd, na ojiji data dị otú ahụ. dị ala na ọnụ ahịa S3 bụ ahụekere.

nkwubi

Akụkọ a abụghị ntụzịaka ma ọlị. Ngwọta ahụ tụgharịrị bụrụ onye ọ bụla, ọ fọrọ nke nta ka ọ bụrụ na ọ bụghị ezigbo. Kama, ọ bụ akwụkwọ akụkọ njem. Achọrọ m ka ndị ọzọ ghọta na mkpebi ndị dị otú ahụ adịghị apụta n'ụzọ zuru ezu n'isi, ha bụ nsonaazụ nke ikpe na njehie. Ọzọkwa, ọ bụrụ na ị na-achọ onye ọkà mmụta sayensị data, buru n'uche na iji ngwá ọrụ ndị a na-achọ ahụmahụ nke ọma, na ahụmahụ na-efu ego. Obi dị m ụtọ na m nwere ego iji kwụọ ụgwọ, mana ọtụtụ ndị ọzọ nwere ike ịrụ otu ọrụ karịa m agaghị enwe ohere n'ihi enweghị ego ọbụna ịnwale.

Ngwá ọrụ data buru ibu na-emekọrịta ihe. Ọ bụrụ na ị nwere oge, ị nwere ike dee ngwa ngwa ngwa ngwa site na iji smart data ihicha, nchekwa na usoro mmịpụta. N'ikpeazụ ọ na-agbadata na nyocha uru-erite uru.

Ihe m mụtara:

  • ọ dịghị ụzọ dị ọnụ ala iji kpachapụta TB 25 n'otu oge;
  • kpachara anya na nha faịlụ Parquet na nzukọ ha;
  • Nkebi dị na Spark ga-edozirịrị;
  • N'ozuzu, ọ dịghị mgbe ị na-agbalị ime ka 2,5 nde partitions;
  • Ịhazi ka siri ike, dị ka ịtọlite ​​Spark;
  • mgbe ụfọdụ data pụrụ iche chọrọ ngwọta pụrụ iche;
  • Mkpokọta ọkụ na-adị ngwa ngwa, mana nkewa ka dị oke ọnụ;
  • adala ụra mgbe ha na-akụziri gị ihe ndị bụ isi, ọ ga-abụ na mmadụ edozila nsogbu gị laa azụ na 1980s;
  • gnu parallel - nke a bụ ihe anwansi, onye ọ bụla kwesịrị iji ya;
  • Spark na-enwe mmasị na data enweghị mgbagwoju anya ma ọ dịghị amasị ijikọta nkebi;
  • Spark nwere oke ibu mgbe ọ na-edozi nsogbu ndị dị mfe;
  • Usoro mmekọ nke AWK na-arụ ọrụ nke ọma;
  • ị nwere ike ịkpọtụrụ stdin и stdout site na edemede R, ya mere jiri ya na pipeline;
  • Ekele maka mmejuputa ụzọ smart, S3 nwere ike hazie ọtụtụ faịlụ;
  • Isi ihe mere ị ga-eji egbu oge bụ ịkwalite usoro nchekwa gị n'oge;
  • anwala iji aka kwalite ọrụ, ka kọmputa mee ya;
  • API kwesịrị ịdị mfe n'ihi ịdị mfe na mgbanwe nke ojiji;
  • Ọ bụrụ na akwadoro data gị nke ọma, caching ga-adị mfe!

isi: www.habr.com

Tinye a comment