Pag-parse sa 25TB gamit ang AWK ug R

Pag-parse sa 25TB gamit ang AWK ug R
Giunsa pagbasa kini nga artikulo: Nangayo kog pasaylo kay taas kaayo ug gubot ang text. Aron makadaginot ka og panahon, sugdan nako ang matag kapitulo sa usa ka "Unsa ang Akong Nakat-unan" nga pasiuna, nga nag-summarize sa esensya sa kapitulo sa usa o duha ka sentence.

“Ipakita lang kanako ang solusyon!” Kung gusto lang nimo nga makita kung diin ako gikan, unya laktawan ang kapitulo nga "Nahimong Dugang nga Imbentibo," apan sa akong hunahuna mas makapaikag ug mapuslanon ang pagbasa bahin sa kapakyasan.

Bag-o lang ako gitahasan sa pag-set up sa usa ka proseso alang sa pagproseso sa usa ka dako nga gidaghanon sa hilaw nga mga sequence sa DNA (sa teknikal nga usa ka SNP chip). Ang panginahanglan mao ang dali nga pagkuha sa datos bahin sa usa ka gihatag nga genetic nga lokasyon (gitawag nga SNP) alang sa sunod nga pagmodelo ug uban pang mga buluhaton. Gamit ang R ug AWK, nakahimo ako sa paglimpyo ug pag-organisar sa mga datos sa natural nga paagi, nga makapadali sa pagproseso sa pangutana. Dili kini sayon ​​alang kanako ug nagkinahanglan og daghang mga pag-uli. Kini nga artikulo makatabang kanimo sa paglikay sa pipila sa akong mga kasaypanan ug ipakita kanimo kung unsa ang akong nahimo.

Una, pipila ka pasiuna nga mga katin-awan.

data

Ang among sentro sa pagproseso sa genetic nga impormasyon sa unibersidad naghatag kanamo og datos sa porma sa usa ka 25 TB TSV. Gidawat nako kini nga gibahin sa 5 nga mga pakete, gi-compress sa Gzip, ang matag usa adunay mga 240 nga upat ka gigabyte nga mga file. Ang matag laray adunay datos alang sa usa ka SNP gikan sa usa ka indibidwal. Sa kinatibuk-an, ang datos sa ~2,5 milyon nga mga SNP ug ~60 ka libo nga mga tawo ang gipasa. Dugang sa impormasyon sa SNP, ang mga file adunay daghang mga kolum nga adunay mga numero nga nagpakita sa lainlaing mga kinaiya, sama sa intensity sa pagbasa, frequency sa lainlaing mga alel, ug uban pa. Sa kinatibuk-an adunay mga 30 ka kolum nga adunay talagsaon nga mga kantidad.

Tumong

Sama sa bisan unsang proyekto sa pagdumala sa datos, ang labing hinungdanon nga butang mao ang pagtino kung giunsa gamiton ang datos. Niini nga kaso kami kasagarang mopili ug mga modelo ug mga workflow para sa SNP base sa SNP. Sa ato pa, magkinahanglan lang kami og data sa usa ka SNP matag higayon. Kinahanglan kong makat-on unsaon pagkuha sa tanang mga rekord nga nalangkit sa usa sa 2,5 ka milyon nga mga SNP sa sayon, dali ug barato kutob sa mahimo.

Unsaon nga dili kini buhaton

Sa pagkutlo sa usa ka angay nga cliché:

Wala ko mapakyas sa usa ka libo ka beses, nadiskobrehan lang nako ang usa ka libo nga mga paagi aron malikayan ang pag-parse sa usa ka hugpong sa datos sa usa ka format nga mahigalaon sa pangutana.

Unang pagsulay

Unsay akong nakat-onan: Walay barato nga paagi sa pag-parse sa 25 TB sa usa ka higayon.

Pagkuha sa kurso nga "Advanced nga mga Pamaagi alang sa Daghang Pagproseso sa Data" sa Vanderbilt University, sigurado ako nga ang lansis naa sa bag. Lagmit mokabat ug usa o duha ka oras aron ma-set up ang Hive server nga modagan sa tanang datos ug i-report ang resulta. Tungod kay ang among datos gitipigan sa AWS S3, gigamit nako ang serbisyo Athena, nga nagtugot kanimo sa paggamit sa Hive SQL nga mga pangutana sa S3 nga datos. Dili nimo kinahanglan nga mag-set up / magpataas sa usa ka cluster sa Hive, ug magbayad ka usab alang sa datos nga imong gipangita.

Pagkahuman nako gipakita kang Athena ang akong datos ug ang pormat niini, nagdagan ako pipila ka mga pagsulay nga adunay mga pangutana nga sama niini:

select * from intensityData limit 10;

Ug dali nga nakadawat maayo nga pagkahan-ay nga mga sangputanan. Andam.

Hangtud nga gisulayan namon nga gamiton ang datos sa among trabaho ...

Gihangyo ko nga kuhaon ang tanang impormasyon sa SNP aron masulayan ang modelo. Gidagan nako ang pangutana:


select * from intensityData 
where snp = 'rs123456';

...ug nagsugod sa paghulat. Pagkahuman sa walo ka minuto ug labaw pa sa 4 TB nga gihangyo nga datos, nadawat nako ang resulta. Gisingil ni Athena pinaagi sa gidaghanon sa datos nga nakit-an, $5 matag terabyte. Busa kining usa ka hangyo nagkantidad ug $20 ug walo ka minuto nga paghulat. Aron mapadagan ang modelo sa tanan nga datos, kinahanglan namon nga maghulat 38 ka tuig ug magbayad $ 50 milyon. Dayag, dili kini angay alang kanamo.

Kinahanglan nga gamiton ang Parquet ...

Unsay akong nakat-onan: Pag-amping sa gidak-on sa imong Parquet files ug sa ilang organisasyon.

Una nakong gisulayan ang pag-ayo sa sitwasyon pinaagi sa pag-convert sa tanang TSV ngadto sa Parquet nga mga file. Kombenyente sila alang sa pagtrabaho sa dagkong mga set sa datos tungod kay ang impormasyon niini gitipigan sa kolumnar nga porma: ang matag kolum anaa sa kaugalingong memorya/disk nga bahin, sukwahi sa mga text file, diin ang mga laray adunay mga elemento sa matag kolum. Ug kung kinahanglan nimo pangitaon ang usa ka butang, basaha lang ang gikinahanglan nga kolum. Dugang pa, ang matag file nagtipig usa ka lainlaing kantidad sa usa ka kolum, busa kung ang kantidad nga imong gipangita wala sa hanay sa kolum, ang Spark dili mag-usik sa oras sa pag-scan sa tibuuk nga file.

Nagdagan ko og usa ka yano nga buluhaton AWS Pandikit aron mabag-o ang among mga TSV sa Parquet ug ihulog ang mga bag-ong file sa Athena. Nagkinahanglan kini og mga 5 ka oras. Apan sa dihang akong gipadagan ang hangyo, nagkinahanglag parehas nga gidugayon sa oras ug gamay ra nga kuwarta aron makompleto. Ang tinuod mao nga ang Spark, nga naningkamot nga ma-optimize ang buluhaton, nag-unpack lang sa usa ka TSV chunk ug gibutang kini sa kaugalingon nga Parquet chunk. Ug tungod kay ang matag tipik igo ra nga gisudlan sa tibuuk nga mga rekord sa daghang mga tawo, ang matag file naglangkob sa tanan nga mga SNP, mao nga kinahanglan ablihan ni Spark ang tanan nga mga file aron makuha ang kasayuran nga kinahanglan niini.

Makapainteres, ang default (ug girekomenda) nga tipo sa compression sa Parquet, snappy, dili mabahin. Busa, ang matag tigpatuman natanggong sa tahas sa pag-unpack ug pag-download sa tibuok 3,5 GB nga dataset.

Pag-parse sa 25TB gamit ang AWK ug R

Atong sabton ang problema

Unsay akong nakat-onan: Lisud ang paghan-ay, labi na kung ang datos giapod-apod.

Para nako karon nasabtan na nako ang esensya sa problema. Kinahanglan lang nako nga ihan-ay ang datos pinaagi sa kolum sa SNP, dili sa mga tawo. Dayon daghang mga SNP ang tipigan sa usa ka bulag nga tipik sa datos, ug dayon ang "smart" nga function sa Parquet nga "abli lamang kung ang bili anaa sa range" magpakita sa iyang kaugalingon sa tanan nga himaya niini. Ikasubo, ang paghan-ay sa binilyon nga mga laray nga nagkatag sa usa ka cluster napamatud-an nga usa ka lisud nga buluhaton.

Dili gyud gusto sa AWS nga mag-isyu og refund tungod sa "I'm a distracted student" rason. Pagkahuman nako pagdagan sa pagsunud sa Amazon Glue, midagan kini sulod sa 2 ka adlaw ug nahagsa.

Unsa ang bahin sa partitioning?

Unsay akong nakat-onan: Ang mga partisyon sa Spark kinahanglang balanse.

Unya nakahunahuna ko sa pagbahin sa datos sa mga chromosome. Adunay 23 niini (ug daghan pa kung imong tagdon ang mitochondrial DNA ug wala ma-map nga mga rehiyon).
Kini magtugot kanimo sa pagbahin sa datos ngadto sa gagmay nga mga tipik. Kung magdugang ka ug usa lang ka linya sa Spark export function sa Glue script partition_by = "chr", unya ang datos kinahanglang bahinon ngadto sa mga balde.

Pag-parse sa 25TB gamit ang AWK ug R
Ang genome naglangkob sa daghang mga tipik nga gitawag og chromosome.

Ikasubo, wala kini molihok. Ang mga chromosome adunay lain-laing mga gidak-on, nga nagpasabot sa lain-laing mga kantidad sa impormasyon. Nagpasabot kini nga ang mga buluhaton nga gipadala ni Spark sa mga trabahante dili balanse ug hinay nga nahuman tungod kay ang pipila ka mga node nahuman nga sayo ug wala’y trabaho. Apan, ang mga buluhaton nahuman. Apan sa dihang nangayo ug usa ka SNP, ang imbalance nagpahinabo na usab ug mga problema. Ang gasto sa pagproseso sa mga SNP sa mas dagkong mga chromosome (nga mao, kung diin gusto namon makakuha mga datos) mikunhod lang sa hapit usa ka hinungdan nga 10. Daghan, apan dili igo.

Unsa kaha kon atong bahinon kini ngadto sa mas gagmayng mga bahin?

Unsay akong nakat-onan: Ayaw gayud pagsulay sa pagbuhat sa 2,5 ka milyon nga partisyon sa tanan.

Nakahukom ko nga buhaton ang tanan ug gibahin ang matag SNP. Gisiguro niini nga ang mga partisyon parehas ang gidak-on. MAAYO KINI NGA IDEYA. Gigamit nako ang Glue ug gidugang ang usa ka inosente nga linya partition_by = 'snp'. Ang buluhaton nagsugod ug nagsugod sa pagpatuman. Paglabay sa usa ka adlaw akong gisusi ug nakita nga wala gihapoy nasulat sa S3, mao nga akong gipamatay ang buluhaton. Morag gisulat ni Glue ang mga intermediate nga mga file sa usa ka tinago nga lokasyon sa S3, daghang mga file, tingali usa ka milyon. Isip resulta, ang akong sayop nagkantidad ug kapin sa usa ka libo ka dolyares ug wala makapahimuot sa akong mentor.

Pagbahin + paghan-ay

Unsay akong nakat-onan: Lisod gihapon ang paghan-ay, sama sa pag-tune sa Spark.

Ang akong kataposang pagsulay sa pagbahin naglakip nako sa pagbahinbahin sa mga chromosome ug dayon paghan-ay sa matag partisyon. Sa teorya, kini makapadali sa matag pangutana tungod kay ang gitinguha nga datos sa SNP kinahanglan nga naa sa sulod sa pipila ka mga parquet chunks sulod sa gihatag nga range. Ikasubo, ang paghan-ay bisan ang mga partitioned data nahimo nga usa ka lisud nga buluhaton. Isip resulta, mibalhin ko sa EMR para sa usa ka custom cluster ug migamit ug walo ka gamhanang instance (C5.4xl) ug Sparklyr para makamugna ug mas flexible workflow...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...apan, wala gihapon nahuman ang buluhaton. Gi-configure nako kini sa lainlaing mga paagi: gipadako ang alokasyon sa panumduman alang sa matag tigpatuman sa pangutana, gigamit ang mga node nga adunay daghang memorya, gigamit ang mga variable sa broadcast (mga variable sa broadcasting), apan sa matag higayon nga kini nahimo nga tunga nga mga lakang, ug hinayhinay nga nagsugod ang mga tigpatuman. nga mapakyas hangtud nga ang tanan mihunong.

Mas mamugnaon ko

Unsay akong nakat-onan: Usahay ang espesyal nga datos nanginahanglan espesyal nga solusyon.

Ang matag SNP adunay bili sa posisyon. Kini usa ka numero nga katumbas sa gidaghanon sa mga base sa chromosome niini. Kini usa ka maayo ug natural nga paagi sa pag-organisar sa among datos. Sa sinugdan gusto kong magbahin sa mga rehiyon sa matag chromosome. Pananglitan, ang mga posisyon 1 - 2000, 2001 - 4000, ug uban pa. Apan ang problema mao nga ang mga SNP dili parehas nga giapod-apod sa mga chromosome, busa ang mga gidak-on sa grupo magkalainlain kaayo.

Pag-parse sa 25TB gamit ang AWK ug R

Ingon usa ka sangputanan, nakaabut ako sa pagkahugno sa mga posisyon sa mga kategorya (ranggo). Gamit ang na-download na nga datos, nagpadagan ko og hangyo aron makakuha og lista sa talagsaong mga SNP, ang ilang mga posisyon ug mga chromosome. Dayon akong gihan-ay ang datos sulod sa matag chromosome ug gikolekta ang mga SNP ngadto sa mga grupo (bin) sa gihatag nga gidak-on. Ingnon ta nga 1000 ka SNP matag usa. Kini naghatag kanako sa SNP-sa-grupo-per-chromosome nga relasyon.

Sa katapusan, naghimo ako og mga grupo (bin) sa 75 ka mga SNP, ang hinungdan ipasabut sa ubos.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Unang pagsulay sa Spark

Unsay akong nakat-onan: Ang spark aggregation paspas, pero ang partitioning mahal gihapon.

Gusto nakong basahon kining gamay nga (2,5 ka milyon nga mga laray) nga data frame ngadto sa Spark, isagol kini sa hilaw nga datos, ug dayon bahinon kini sa bag-ong gidugang nga kolum. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

akong gigamit sdf_broadcast(), mao nga nahibal-an ni Spark nga kinahanglan ipadala ang data frame sa tanan nga mga node. Kini mapuslanon kung ang datos gamay ra ang gidak-on ug gikinahanglan alang sa tanan nga mga buluhaton. Kung dili, ang Spark naningkamot nga mahimong maalamon ug nag-apod-apod sa datos kung gikinahanglan, nga mahimong hinungdan sa paghinay.

Ug pag-usab, ang akong ideya wala molihok: ang mga buluhaton nagtrabaho sulod sa pipila ka panahon, nahuman ang unyon, ug dayon, sama sa mga tigpatuman nga gilunsad pinaagi sa partitioning, nagsugod sa pagkapakyas.

Pagdugang sa AWK

Unsay akong nakat-onan: Ayaw pagkatulog kung gitudloan ka sa mga sukaranan. Sigurado nga adunay nakasulbad sa imong problema kaniadtong 1980s.

Hangtud niini nga punto, ang hinungdan sa tanan nakong mga kapakyasan sa Spark mao ang paghugpong sa datos sa cluster. Tingali ang sitwasyon mahimong mapauswag sa pre-treatment. Nakahukom ko nga sulayan ang pagbahin sa hilaw nga datos sa teksto ngadto sa mga kolum sa mga chromosome, mao nga naglaum ako nga mahatagan ang Spark sa "pre-partitioned" nga datos.

Gipangita nako ang StackOverflow kung giunsa ang pagbahin sa mga kantidad sa kolum ug nakit-an nindot kaayo nga tubag. Uban sa AWK mahimo nimong bahinon ang usa ka text file pinaagi sa mga kantidad sa kolum pinaagi sa pagsulat niini sa usa ka script kaysa ipadala ang mga resulta sa stdout.

Nagsulat ako usa ka script sa Bash aron sulayan kini. Gi-download ang usa sa mga naka-pack nga TSV, dayon gi-unpack kini gamit gzip ug gipadala sa awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Nagtrabaho kini!

Pagpuno sa mga cores

Unsay akong nakat-onan: gnu parallel - kini usa ka mahika nga butang, ang tanan kinahanglan nga mogamit niini.

Medyo hinay ang panagbulag ug sa dihang nagsugod ko htoparon masusi ang paggamit sa usa ka gamhanan (ug mahal) nga pananglitan sa EC2, nahimo nga usa lang ka core ug mga 200 MB nga memorya ang akong gigamit. Aron masulbad ang problema ug dili mawad-an og daghang salapi, kinahanglan namon nga mahibal-an kung giunsa ang pagpahiangay sa trabaho. Maayo na lang, sa usa ka talagsaon nga libro Data Science sa Command Line Nakakita ko og chapter ni Jeron Janssens bahin sa parallelization. Gikan niini akong nakat-unan gnu parallel, usa ka flexible kaayo nga pamaagi sa pagpatuman sa multithreading sa Unix.

Pag-parse sa 25TB gamit ang AWK ug R
Sa dihang gisugdan nako ang partitioning gamit ang bag-ong proseso, maayo ra ang tanan, apan aduna gihapoy bottleneck - ang pag-download sa mga butang sa S3 sa disk dili kaayo paspas ug dili hingpit nga parallelized. Aron ayuhon kini, gibuhat nako kini:

  1. Nahibal-an nako nga posible nga ipatuman ang yugto sa pag-download sa S3 direkta sa pipeline, nga hingpit nga giwagtang ang intermediate nga pagtipig sa disk. Nagpasabut kini nga malikayan nako ang pagsulat sa hilaw nga datos sa disk ug magamit ang labi ka gamay, ug busa mas barato, pagtipig sa AWS.
  2. team aws configure set default.s3.max_concurrent_requests 50 nadugangan pag-ayo ang gidaghanon sa mga hilo nga gigamit sa AWS CLI (sa default adunay 10).
  3. Mibalhin ko sa usa ka pananglitan sa EC2 nga na-optimize alang sa katulin sa network, nga adunay letra nga n sa ngalan. Akong nahibal-an nga ang pagkawala sa gahum sa pagproseso kung gigamit ang n-instances labaw pa sa bayad sa pagtaas sa katulin sa pagkarga. Alang sa kadaghanan sa mga buluhaton gigamit nako ang c5n.4xl.
  4. Nausab gzip sa pigz, kini usa ka himan nga gzip nga makahimo sa mga cool nga butang aron maparehas ang una nga dili parehas nga buluhaton sa pag-decompress sa mga file (kini nakatabang sa labing gamay).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Kini nga mga lakang gihiusa sa usag usa aron mahimo nga dali nga molihok ang tanan. Pinaagi sa pagdugang sa mga katulin sa pag-download ug pagwagtang sa mga pagsulat sa disk, mahimo na nako nga maproseso ang usa ka 5 terabyte nga pakete sa pipila lang ka oras.

Kini nga tweet kinahanglan nga naghisgot sa 'TSV'. Alaut.

Gamit ang bag-ong na-parse nga datos

Unsay akong nakat-onan: Ang Spark ganahan sa wala ma-compress nga datos ug dili ganahan sa paghiusa sa mga partisyon.

Karon ang datos naa sa S3 sa usa ka unpacked (basaha: gipaambit) ug semi-ordered nga format, ug makabalik ko sa Spark pag-usab. Usa ka sorpresa ang naghulat kanako: Napakyas na usab ako sa pagkab-ot sa akong gusto! Lisud kaayo ang pagsulti sa Spark kung giunsa ang pagkabahinbahin sa datos. Ug bisan kung gibuhat nako kini, nahimo nga daghang mga partisyon (95 ka libo), ug kung gigamit nako coalesce gipakunhod ang ilang gidaghanon ngadto sa makatarunganon nga mga limitasyon, kini nakaguba sa akong pagbahin. Sigurado ako nga mahimo kini nga ayohon, apan pagkahuman sa pila ka adlaw nga pagpangita wala ako makit-an nga solusyon. Sa katapusan nahuman nako ang tanan nga mga buluhaton sa Spark, bisan kung nagdugay ug ang akong split Parquet files dili kaayo gamay (~ 200 KB). Bisan pa, ang datos kung diin kini gikinahanglan.

Pag-parse sa 25TB gamit ang AWK ug R
Gamay ra kaayo ug dili patas, talagsaon!

Pagsulay sa lokal nga mga pangutana sa Spark

Unsay akong nakat-onan: Ang Spark adunay sobra nga overhead kung nagsulbad sa yano nga mga problema.

Pinaagi sa pag-download sa datos sa usa ka maalamon nga pormat, akong nasulayan ang katulin. Pagbutang ug R script para magpadagan ug lokal nga Spark server, ug dayon i-load ang Spark data frame gikan sa gipiho nga Parquet group storage (bin). Gisulayan nako nga i-load ang tanan nga datos apan dili makuha sa Sparklyr nga mailhan ang pagbahin.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Ang pagpatay mikabat og 29,415 segundos. Labing maayo, apan dili kaayo maayo alang sa mass testing sa bisan unsa. Dugang pa, dili nako mapadali ang mga butang sa pag-cache tungod kay sa dihang gisulayan nako ang pag-cache sa usa ka frame sa datos sa memorya, ang Spark kanunay nga nag-crash, bisan kung gigahin nako ang labaw sa 50 GB nga memorya sa usa ka dataset nga wala’y gibug-aton nga 15.

Balik sa AWK

Unsay akong nakat-onan: Ang mga associative arrays sa AWK episyente kaayo.

Nakaamgo ko nga makab-ot nako ang mas taas nga tulin. Nahinumdom ko niana sa usa ka talagsaon AWK tutorial ni Bruce Barnett Nakabasa ko bahin sa usa ka cool nga bahin nga gitawag og "associative arrays" Sa tinuud, kini ang mga pares nga hinungdanon nga kantidad, nga sa usa ka hinungdan gitawag nga lahi sa AWK, ug busa wala ako maghunahuna bahin kanila. Roman Cheplyaka nahinumdom nga ang termino nga "associative arrays" mas tigulang kay sa termino nga "key-value pair". Bisan kung ikaw pangitaa ang key-value sa Google Ngram, dili nimo makita kini nga termino didto, apan makit-an nimo ang mga kauban nga arrays! Dugang pa, ang "key-value pair" kasagarang nalangkit sa mga database, mao nga mas makataronganon ang pagtandi niini sa hashmap. Nakaamgo ko nga magamit nako kini nga mga associative arrays aron i-associate ang akong mga SNP sa usa ka lamesa sa bin ug hilaw nga datos nga wala gigamit ang Spark.

Aron mahimo kini, sa AWK script akong gigamit ang block BEGIN. Kini usa ka piraso sa code nga gipatuman sa wala pa ang una nga linya sa datos ipasa sa punoan nga lawas sa script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

team while(getline...) gikarga ang tanang mga laray gikan sa grupo sa CSV (bin), ibutang ang unang kolum (ngalan sa SNP) isip yawe alang sa associative array bin ug ang ikaduha nga kantidad (grupo) ingon nga kantidad. Unya sa block { }, nga gipatuman sa tanang linya sa main file, ang matag linya ipadala ngadto sa output file, nga makadawat ug talagsaong ngalan depende sa grupo niini (bin): ..._bin_"bin[$1]"_....

Mga variable batch_num и chunk_id gipares sa datos nga gihatag sa pipeline, paglikay sa kondisyon sa lumba, ug ang matag execution thread nga nagdagan parallel, misulat sa iyang kaugalingong talagsaong file.

Tungod kay gisabwag nako ang tanan nga hilaw nga datos sa mga folder sa mga chromosome nga nahabilin sa akong miaging eksperimento sa AWK, karon makasulat na ako ug laing Bash script aron maproseso ang usa ka chromosome sa usa ka higayon ug ipadala ang mas lawom nga partitioned data sa S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Ang script adunay duha ka seksyon parallel.

Sa una nga seksyon, ang datos gibasa gikan sa tanan nga mga file nga adunay sulud nga kasayuran sa gitinguha nga chromosome, unya kini nga datos giapod-apod sa mga hilo, nga nag-apod-apod sa mga file sa angay nga mga grupo (bin). Aron malikayan ang mga kondisyon sa lumba kung daghang mga hilo ang nagsulat sa parehas nga file, ang AWK nagpasa sa mga ngalan sa file aron isulat ang datos sa lainlaing mga lugar, pananglitan. chr_10_bin_52_batch_2_aa.csv. Ingon usa ka sangputanan, daghang gagmay nga mga file ang gihimo sa disk (alang niini gigamit nako ang mga volume nga terabyte EBS).

Conveyor gikan sa ikaduhang seksyon parallel moagi sa mga grupo (bin) ug maghiusa sa ilang indibidwal nga mga file ngadto sa komon nga CSV c catug dayon ipadala sila alang sa eksport.

Pag-broadcast sa R?

Unsay akong nakat-onan: Mahimo nimong kontakon stdin и stdout gikan sa usa ka R script, ug busa gamiton kini sa pipeline.

Tingali namatikdan nimo kini nga linya sa imong Bash script: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Gihubad niini ang tanan nga gidugtong nga mga file sa grupo (bin) sa R ​​script sa ubos. {} usa ka espesyal nga teknik parallel, nga nagsal-ot sa bisan unsang datos nga gipadala niini ngadto sa piho nga sapa direkta ngadto sa sugo mismo. Opsyon {#} naghatag ug talagsaon nga thread ID, ug {%} nagrepresentar sa numero sa slot sa trabaho (gibalikbalik, apan dili dungan). Ang usa ka lista sa tanan nga mga kapilian makita sa dokumentasyon.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Sa diha nga ang usa ka variable file("stdin") gipasa sa readr::read_csv, ang datos nga gihubad ngadto sa R ​​script gikarga ngadto sa usa ka frame, nga unya anaa sa porma .rds-file gamit aws.s3 direkta nga gisulat sa S3.

Ang RDS usa ka butang nga sama sa usa ka junior nga bersyon sa Parquet, nga wala’y mga frills sa pagtipig sa speaker.

Pagkahuman sa script sa Bash nakakuha ako usa ka bundle .rds-mga file nga nahimutang sa S3, nga nagtugot kanako sa paggamit sa episyente nga compression ug built-in nga mga tipo.

Bisan pa sa paggamit sa brake R, ang tanan nagtrabaho kaayo nga dali. Dili ikatingala, ang mga bahin sa R ​​nga nagbasa ug nagsulat sa datos labi nga na-optimize. Human sa pagsulay sa usa ka medium-sized nga chromosome, ang trabaho nahuman sa usa ka C5n.4xl nga pananglitan sa mga duha ka oras.

S3 Limitasyon

Unsay akong nakat-onan: Salamat sa pagpatuman sa maalamon nga dalan, ang S3 makahimo sa pagdumala sa daghang mga file.

Nabalaka ko kung kaya ba sa S3 ang pagdumala sa daghang mga file nga gibalhin niini. Mahimo nako nga masabtan ang mga ngalan sa file, apan unsaon man kini pagpangita sa S3?

Pag-parse sa 25TB gamit ang AWK ug R
Ang mga folder sa S3 para lang sa pagpakita, sa pagkatinuod ang sistema dili interesado sa simbolo /. Gikan sa S3 FAQ nga panid.

Mopatim-aw nga ang S3 nagrepresentar sa agianan sa usa ka partikular nga file isip usa ka yano nga yawe sa usa ka matang sa hash table o database nga nakabase sa dokumento. Ang usa ka balde mahimong isipon nga usa ka lamesa, ug ang mga file mahimong isipon nga mga rekord sa kana nga lamesa.

Tungod kay ang katulin ug kaepektibo hinungdanon aron makaganansya sa Amazon, dili katingad-an nga kini nga sistema nga yawe-as-a-file-path nga freaking na-optimize. Gisulayan nako nga mangita usa ka balanse: aron dili ako kinahanglan nga maghimo daghang mga hangyo sa pagkuha, apan nga ang mga hangyo dali nga gipatuman. Nahibal-an nga labing maayo nga maghimo mga 20 ka libo nga mga file sa bin. Sa akong hunahuna kung magpadayon kita sa pag-optimize, makab-ot naton ang usa ka pagtaas sa katulin (pananglitan, paghimo usa ka espesyal nga balde alang lamang sa datos, sa ingon pagkunhod sa gidak-on sa lamesa sa pagpangita). Apan walay panahon o salapi alang sa dugang nga mga eksperimento.

Unsa ang mahitungod sa cross compatibility?

Unsa ang Akong Nakat-unan: Ang numero unong hinungdan sa nausik nga oras mao ang pag-optimize sa imong pamaagi sa pagtipig sa wala pa sa panahon.

Niini nga punto, hinungdanon kaayo nga pangutan-on ang imong kaugalingon: "Ngano nga mogamit usa ka proprietary file format?" Ang hinungdan anaa sa katulin sa pagkarga (ang mga gzipped nga CSV file mikuha ug 7 ka pilo nga mas dugay sa pagkarga) ug pagkaangay sa among mga workflow. Mahimo nakong hunahunaon pag-usab kung ang R dali nga makakarga sa Parquet (o Arrow) nga mga file nga wala ang Spark load. Ang tanan sa among lab naggamit sa R, ug kung kinahanglan nako nga i-convert ang datos sa lain nga format, naa pa nako ang orihinal nga data sa teksto, aron mahimo ra nako nga mapadagan ang pipeline pag-usab.

Dibisyon sa trabaho

Unsay akong nakat-onan: Ayaw pagsulay sa pag-optimize sa mga trabaho nga mano-mano, tugoti ang kompyuter nga buhaton kini.

Gi-debug nako ang workflow sa usa ka chromosome, karon kinahanglan nako nga iproseso ang tanan nga ubang mga datos.
Gusto nako nga ipataas ang daghang mga higayon sa EC2 alang sa pagkakabig, apan sa parehas nga oras nahadlok ako nga makakuha usa ka dili balanse nga pagkarga sa lainlaing mga trabaho sa pagproseso (sama nga nag-antos si Spark sa dili balanse nga mga partisyon). Dugang pa, dili ako interesado sa pagpataas sa usa ka higayon matag chromosome, tungod kay alang sa mga account sa AWS adunay default nga limitasyon sa 10 nga mga higayon.

Dayon nakahukom ko nga magsulat og script sa R ​​aron ma-optimize ang mga trabaho sa pagproseso.

Una, gihangyo nako ang S3 nga kuwentahon kung pila ang espasyo sa pagtipig sa matag chromosome.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Unya nagsulat ako usa ka function nga nagkuha sa kinatibuk-ang gidak-on, gi-shuffle ang han-ay sa mga chromosome, gibahin kini sa mga grupo. num_jobs ug isulti kanimo kung unsa ka lainlain ang mga gidak-on sa tanan nga mga trabaho sa pagproseso.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Unya midagan ko sa usa ka libo nga shuffle gamit ang purrr ug gipili ang labing maayo.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Mao nga nahuman nako ang usa ka hugpong sa mga buluhaton nga parehas kaayo sa gidak-on. Unya ang tanan nga nahabilin mao ang pagputos sa akong miaging Bash script sa usa ka dako nga loop for. Kini nga pag-optimize gikuha mga 10 minuto sa pagsulat. Ug kini labi ka gamay kaysa akong gasto sa mano-mano nga paghimo og mga buluhaton kung dili kini balanse. Busa, sa akong hunahuna husto ako sa kini nga pasiuna nga pag-optimize.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Sa katapusan akong idugang ang shutdown command:

sudo shutdown -h now

... ug ang tanan nagtrabaho! Gamit ang AWS CLI, gipataas nako ang mga higayon gamit ang kapilian user_data gihatagan sila og mga Bash script sa ilang mga buluhaton alang sa pagproseso. Sila midagan ug mi-shut down awtomatik, mao nga wala ko nagbayad alang sa dugang nga gahum sa pagproseso.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Mag-empake ta!

Unsay akong nakat-onan: Ang API kinahanglan nga yano alang sa kasayon ​​ug pagka-flexible sa paggamit.

Sa katapusan nakuha nako ang datos sa husto nga lugar ug porma. Ang nahabilin mao ang pagpayano sa proseso sa paggamit sa datos kutob sa mahimo aron mas dali alang sa akong mga kauban. Gusto nako nga maghimo usa ka yano nga API alang sa paghimo og mga hangyo. Kung sa umaabot nakahukom ko nga mobalhin gikan .rds sa Parquet files, nan kini kinahanglan nga usa ka problema alang kanako, dili sa akong mga kauban. Tungod niini nakahukom ko nga maghimo usa ka internal nga pakete sa R.

Paghimo ug pagdokumento sa usa ka yano kaayo nga pakete nga adunay pipila lamang nga mga function sa pag-access sa datos nga giorganisar sa palibot sa usa ka function get_snp. Naghimo usab ako usa ka website alang sa akong mga kauban pkgdown, aron dali nilang makita ang mga pananglitan ug dokumentasyon.

Pag-parse sa 25TB gamit ang AWK ug R

Smart caching

Unsay akong nakat-onan: Kung ang imong data andam kaayo, ang pag-cache mahimong sayon!

Tungod kay ang usa sa mga nag-unang workflow mi-apply sa samang modelo sa pagtuki sa SNP package, nakahukom ko nga gamiton ang binning sa akong bentaha. Sa pagpadala sa datos pinaagi sa SNP, ang tanang impormasyon gikan sa grupo (bin) gilakip sa gibalik nga butang. Kana mao, ang daan nga mga pangutana mahimo (sa teorya) makapadali sa pagproseso sa bag-ong mga pangutana.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Sa paghimo sa package, nagdagan ako daghang mga benchmark aron itandi ang katulin kung mogamit lainlaing mga pamaagi. Girekomenda ko nga dili nimo kini pasagdan, tungod kay usahay ang mga sangputanan wala damha. Pananglitan, dplyr::filter mas paspas kay sa pagkuha sa mga laray gamit ang indexing-based nga pagsala, ug ang pagkuha sa usa ka column gikan sa nasala nga data frame mas paspas kay sa paggamit sa indexing syntax.

Palihug timan-i nga ang butang prev_snp_results naglangkob sa yawe snps_in_bin. Kini usa ka han-ay sa tanan nga talagsaon nga mga SNP sa usa ka grupo (bin), nga nagtugot kanimo sa dali nga pagsusi kung naa ka na ba mga datos gikan sa miaging pangutana. Gipasayon ​​usab niini ang pag-loop sa tanang mga SNP sa usa ka grupo (bin) uban niini nga code:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Результаты

Karon mahimo na namon (ug nagsugod na sa seryoso) nga pagpadagan sa mga modelo ug mga senaryo nga kaniadto dili maabut kanamo. Ang labing kaayo nga butang mao nga ang akong mga kauban sa lab dili kinahanglan nga maghunahuna bahin sa bisan unsang mga komplikasyon. Naa ra silay function nga mugana.

Ug bisan kung ang package nagluwas kanila sa mga detalye, gisulayan nako nga himuon nga yano ang format sa datos aron mahibal-an nila kung kalit nga nawala ako ugma ...

Ang katulin miuswag pag-ayo. Kasagaran nga gi-scan namon ang hinungdanon nga mga tipik sa genome. Kaniadto, dili namo mahimo kini (kini nahimong mahal kaayo), apan karon, salamat sa grupo (bin) nga istruktura ug caching, ang usa ka hangyo alang sa usa ka SNP nagkinahanglan sa aberids nga ubos sa 0,1 segundos, ug ang paggamit sa datos ubos kaayo. nga mani ang bili sa S3.

konklusyon

Kini nga artikulo dili usa ka giya. Ang solusyon nahimo nga indibidwal, ug hapit sigurado nga dili maayo. Hinunoa, kini usa ka travelogue. Gusto nako nga masabtan sa uban nga ang ingon nga mga desisyon dili makita nga hingpit nga naporma sa ulo, kini resulta sa pagsulay ug sayup. Usab, kung nangita ka usa ka data scientist, hinumdomi nga ang paggamit niini nga mga himan epektibo nga nanginahanglan kasinatian, ug ang kasinatian naggasto sa salapi. Nalipay ko nga ako adunay mga paagi sa pagbayad, apan daghang uban pa nga makahimo sa parehas nga trabaho nga labi ka maayo kaysa kanako dili gyud adunay higayon tungod sa kakulang sa salapi aron masulayan.

Daghang gamit sa datos ang daghang gamit. Kung naa kay oras, hapit ka makasulat usa ka mas paspas nga solusyon gamit ang mga pamaagi sa paglimpyo, pagtipig, ug pagkuha sa intelihente nga datos. Sa katapusan kini moabut ngadto sa usa ka cost-benefit analysis.

Ang akong nakat-unan:

  • walay barato nga paagi sa pag-parse sa 25 TB sa usa ka higayon;
  • Pag-amping sa gidak-on sa imong Parquet files ug sa ilang organisasyon;
  • Ang mga partisyon sa Spark kinahanglang balanse;
  • Sa kinatibuk-an, ayaw pagsulay sa paghimo sa 2,5 milyon nga partisyon;
  • Lisod gihapon ang paghan-ay, sama sa pag-set up sa Spark;
  • usahay ang espesyal nga datos nanginahanglan espesyal nga solusyon;
  • Ang spark aggregation paspas, apan ang partitioning mahal gihapon;
  • ayaw pagkatulog kung gitudloan ka nila sa mga sukaranan, tingali adunay nakasulbad sa imong problema kaniadtong 1980s;
  • gnu parallel - kini usa ka mahika nga butang, kinahanglan nga gamiton kini sa tanan;
  • Ang Spark ganahan sa wala ma-compress nga datos ug dili ganahan sa paghiusa sa mga partisyon;
  • Ang Spark adunay sobra nga overhead sa pagsulbad sa yano nga mga problema;
  • Ang mga associative arrays sa AWK episyente kaayo;
  • mahimo nimong kontakon stdin и stdout gikan sa usa ka R script, ug busa gamiton kini sa pipeline;
  • Salamat sa pagpatuman sa maalamon nga dalan, ang S3 makaproseso sa daghang mga file;
  • Ang nag-unang rason sa pag-usik sa panahon mao ang ahat nga pag-optimize sa imong pamaagi sa pagtipig;
  • ayaw pagsulay sa pag-optimize sa mga buluhaton nga mano-mano, tugoti ang kompyuter nga buhaton kini;
  • Ang API kinahanglan nga yano alang sa kasayon ​​ug pagka-flexible sa paggamit;
  • Kung ang imong data andam kaayo, ang pag-cache mahimong sayon!

Source: www.habr.com

Idugang sa usa ka comment