Pag-parse ng 25TB gamit ang AWK at R

Pag-parse ng 25TB gamit ang AWK at R
Paano basahin ang artikulong ito: Humihingi ako ng paumanhin sa napakahaba at magulo ng text. Upang makatipid ka ng oras, sisimulan ko ang bawat kabanata sa isang panimula na "Ang Natutuhan Ko", na nagbubuod sa kakanyahan ng kabanata sa isa o dalawang pangungusap.

"Ipakita mo lang sa akin ang solusyon!" Kung gusto mo lang makita kung saan ako nanggaling, pagkatapos ay lumaktaw sa kabanata na "Pagiging Mas Mapag-imbento," ngunit sa palagay ko mas kawili-wili at kapaki-pakinabang na basahin ang tungkol sa kabiguan.

Kamakailan ay inatasan ako sa pag-set up ng isang proseso para sa pagproseso ng isang malaking dami ng mga hilaw na sequence ng DNA (teknikal na isang SNP chip). Ang pangangailangan ay upang mabilis na makakuha ng data tungkol sa isang ibinigay na genetic na lokasyon (tinatawag na SNP) para sa kasunod na pagmomodelo at iba pang mga gawain. Gamit ang R at AWK, nagawa kong linisin at ayusin ang data sa natural na paraan, na lubos na nagpapabilis sa pagproseso ng query. Ito ay hindi madali para sa akin at nangangailangan ng maraming pag-ulit. Tutulungan ka ng artikulong ito na maiwasan ang ilan sa aking mga pagkakamali at ipakita sa iyo kung ano ang aking natapos.

Una, ilang panimulang paliwanag.

Data

Ang aming sentro ng pagproseso ng genetic na impormasyon sa unibersidad ay nagbigay sa amin ng data sa anyo ng isang 25 TB TSV. Natanggap ko ang mga ito na nahati sa 5 Gzip-compressed na pakete, bawat isa ay naglalaman ng humigit-kumulang 240 apat na gigabyte na file. Ang bawat row ay naglalaman ng data para sa isang SNP mula sa isang indibidwal. Sa kabuuan, naipadala ang data sa ~2,5 milyong SNP at ~60 libong tao. Bilang karagdagan sa impormasyon ng SNP, ang mga file ay naglalaman ng maraming column na may mga numerong nagpapakita ng iba't ibang katangian, gaya ng read intensity, frequency ng iba't ibang alleles, atbp. Sa kabuuan, may humigit-kumulang 30 column na may mga natatanging halaga.

Layunin

Tulad ng anumang proyekto sa pamamahala ng data, ang pinakamahalagang bagay ay upang matukoy kung paano gagamitin ang data. Sa kasong ito kadalasan ay pipili kami ng mga modelo at daloy ng trabaho para sa SNP batay sa SNP. Ibig sabihin, kakailanganin lang namin ng data sa isang SNP sa isang pagkakataon. Kinailangan kong matutunan kung paano kunin ang lahat ng mga rekord na nauugnay sa isa sa 2,5 milyong SNP nang madali, mabilis at mura hangga't maaari.

Paano hindi ito gagawin

Upang mag-quote ng angkop na clichΓ©:

Hindi ako nabigo ng isang libong beses, natuklasan ko lang ang isang libong paraan upang maiwasan ang pag-parse ng isang bungkos ng data sa isang query-friendly na format.

Unang pagsubok

Ano ang natutunan ko: Walang murang paraan upang mai-parse ang 25 TB sa isang pagkakataon.

Sa pagkuha ng kursong "Mga Advanced na Pamamaraan para sa Pagproseso ng Malaking Data" sa Vanderbilt University, sigurado akong nasa bag ang lansihin. Malamang na aabutin ng isang oras o dalawa upang i-set up ang server ng Hive upang patakbuhin ang lahat ng data at iulat ang resulta. Dahil ang aming data ay nakaimbak sa AWS S3, ginamit ko ang serbisyo Atenas, na nagbibigay-daan sa iyong ilapat ang mga query sa Hive SQL sa S3 data. Hindi mo kailangang mag-set up/magtaas ng Hive cluster, at magbabayad ka lang para sa data na hinahanap mo.

Pagkatapos kong ipakita kay Athena ang aking data at ang format nito, nagpatakbo ako ng ilang mga pagsubok na may mga query na tulad nito:

select * from intensityData limit 10;

At mabilis na nakatanggap ng maayos na mga resulta. handa na.

Hanggang sa sinubukan naming gamitin ang data sa aming trabaho...

Hiniling sa akin na ilabas ang lahat ng impormasyon ng SNP upang subukan ang modelo. Pinatakbo ko ang query:


select * from intensityData 
where snp = 'rs123456';

...at nagsimulang maghintay. Pagkatapos ng walong minuto at higit sa 4 na TB ng hiniling na data, natanggap ko ang resulta. Naniningil si Athena ayon sa dami ng data na natagpuan, $5 bawat terabyte. Kaya ang nag-iisang kahilingang ito ay nagkakahalaga ng $20 at walong minutong paghihintay. Upang patakbuhin ang modelo sa lahat ng data, kinailangan naming maghintay ng 38 taon at magbayad ng $50 milyon. Malinaw, hindi ito angkop para sa amin.

Kailangang gumamit ng Parquet...

Ano ang natutunan ko: Mag-ingat sa laki ng iyong mga Parquet file at kanilang organisasyon.

Sinubukan ko munang ayusin ang sitwasyon sa pamamagitan ng pag-convert sa lahat ng TSV Parquet file. Ang mga ito ay maginhawa para sa pagtatrabaho sa malalaking set ng data dahil ang impormasyon sa mga ito ay nakaimbak sa columnar form: ang bawat column ay nasa sarili nitong memory/disk segment, sa kaibahan sa mga text file, kung saan ang mga row ay naglalaman ng mga elemento ng bawat column. At kung kailangan mong makahanap ng isang bagay, pagkatapos ay basahin lamang ang kinakailangang hanay. Bilang karagdagan, ang bawat file ay nag-iimbak ng isang hanay ng mga halaga sa isang hanay, kaya kung ang halaga na iyong hinahanap ay wala sa hanay ng hanay, ang Spark ay hindi mag-aaksaya ng oras sa pag-scan sa buong file.

Nagpatakbo ako ng isang simpleng gawain AWS Pandikit upang i-convert ang aming mga TSV sa Parquet at i-drop ang mga bagong file sa Athena. Tumagal ito ng halos 5 oras. Ngunit nang patakbuhin ko ang kahilingan, tumagal ng halos kaparehong tagal ng oras at kaunting pera upang makumpleto. Ang katotohanan ay ang Spark, na sinusubukang i-optimize ang gawain, i-unpack lang ang isang TSV chunk at ilagay ito sa sarili nitong Parquet chunk. At dahil ang bawat chunk ay sapat na malaki upang maglaman ng buong talaan ng maraming tao, ang bawat file ay naglalaman ng lahat ng mga SNP, kaya kinailangan ng Spark na buksan ang lahat ng mga file upang kunin ang impormasyong kailangan nito.

Nang kawili-wili, ang default (at inirerekomenda) na uri ng compression ng Parquet, mabilis, ay hindi nahahati. Samakatuwid, ang bawat tagapagpatupad ay natigil sa gawain ng pag-unpack at pag-download ng buong 3,5 GB na dataset.

Pag-parse ng 25TB gamit ang AWK at R

Intindihin natin ang problema

Ano ang natutunan ko: Mahirap ang pag-uuri, lalo na kung ang data ay ipinamamahagi.

Tila sa akin ngayon naunawaan ko ang kakanyahan ng problema. Kailangan ko lang ayusin ang data ayon sa column ng SNP, hindi ng mga tao. Pagkatapos ay iimbak ang ilang SNP sa isang hiwalay na data chunk, at pagkatapos ay ang "smart" na function ng Parquet na "bubukas lamang kung ang halaga ay nasa hanay" ay lalabas sa lahat ng kaluwalhatian nito. Sa kasamaang palad, ang pag-uuri sa bilyun-bilyong mga row na nakakalat sa isang cluster ay napatunayang isang mahirap na gawain.

Talagang ayaw ng AWS na mag-isyu ng refund dahil sa kadahilanang "I'm a distracted student." Pagkatapos kong tumakbo sa pag-uuri sa Amazon Glue, tumakbo ito ng 2 araw at nag-crash.

Paano ang paghati?

Ano ang natutunan ko: Dapat balanse ang mga partisyon sa Spark.

Pagkatapos ay nakaisip ako ng ideya ng paghahati ng data sa mga chromosome. Mayroong 23 sa kanila (at ilan pa kung isasaalang-alang mo ang mitochondrial DNA at mga hindi naka-map na rehiyon).
Papayagan ka nitong hatiin ang data sa mas maliliit na piraso. Kung magdaragdag ka ng isang linya lang sa Spark export function sa Glue script partition_by = "chr", pagkatapos ay dapat na hatiin ang data sa mga bucket.

Pag-parse ng 25TB gamit ang AWK at R
Ang genome ay binubuo ng maraming fragment na tinatawag na chromosome.

Sa kasamaang palad, hindi ito gumana. Ang mga chromosome ay may iba't ibang laki, na nangangahulugang iba't ibang dami ng impormasyon. Nangangahulugan ito na ang mga gawain na ipinadala ni Spark sa mga manggagawa ay hindi balanse at mabagal na natapos dahil ang ilang mga node ay natapos nang maaga at walang ginagawa. Gayunpaman, natapos ang mga gawain. Ngunit nang humiling ng isang SNP, ang kawalan ng timbang ay muling nagdulot ng mga problema. Ang halaga ng pagproseso ng mga SNP sa mas malalaking chromosome (iyon ay, kung saan gusto naming kumuha ng data) ay nabawasan lang ng halos 10 factor. Marami, ngunit hindi sapat.

Paano kung hatiin natin ito sa mas maliliit na bahagi?

Ano ang natutunan ko: Huwag subukang gumawa ng 2,5 milyong partisyon.

Nagpasya akong lumabas lahat at hatiin ang bawat SNP. Tiniyak nito na ang mga partisyon ay may pantay na laki. ISANG MASAMANG IDEYA. Gumamit ako ng Glue at nagdagdag ng inosenteng linya partition_by = 'snp'. Nagsimula ang gawain at nagsimulang isagawa. Pagkaraan ng isang araw, tiningnan ko at nakita kong wala pa ring nakasulat sa S3, kaya pinatay ko ang gawain. Mukhang nagsusulat si Glue ng mga intermediate na file sa isang nakatagong lokasyon sa S3, maraming file, marahil ay ilang milyon. Bilang isang resulta, ang aking pagkakamali ay nagkakahalaga ng higit sa isang libong dolyar at hindi nasiyahan ang aking tagapagturo.

Paghahati + pagbubukod-bukod

Ano ang natutunan ko: Mahirap pa rin ang pag-uuri, gaya ng pag-tune ng Spark.

Ang aking huling pagtatangka sa paghati ay kinasasangkutan ko ng paghahati ng mga kromosom at pagkatapos ay pag-uuri-uriin ang bawat pagkahati. Sa teorya, mapapabilis nito ang bawat query dahil ang nais na data ng SNP ay kailangang nasa loob ng ilang parquet chunks sa loob ng isang partikular na hanay. Sa kasamaang palad, ang pag-uuri ng kahit na nahati na data ay naging isang mahirap na gawain. Bilang resulta, lumipat ako sa EMR para sa isang custom na cluster at gumamit ako ng walong mahuhusay na instance (C5.4xl) at Sparklyr para gumawa ng mas flexible na workflow...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...gayunpaman, hindi pa rin natapos ang gawain. Na-configure ko ito sa iba't ibang paraan: nadagdagan ang paglalaan ng memorya para sa bawat tagapagpatupad ng query, gumamit ng mga node na may malaking halaga ng memorya, gumamit ng mga variable ng broadcast (mga variable ng pagsasahimpapawid), ngunit sa bawat oras na ito ay naging mga kalahating sukat, at unti-unting nagsimula ang mga tagapagpatupad mabigo hanggang sa tumigil ang lahat.

Mas nagiging creative ako

Ano ang natutunan ko: Minsan ang espesyal na data ay nangangailangan ng mga espesyal na solusyon.

Ang bawat SNP ay may halaga ng posisyon. Ito ay isang numero na tumutugma sa bilang ng mga base sa kahabaan ng chromosome nito. Ito ay isang maganda at natural na paraan upang ayusin ang aming data. Noong una, gusto kong hatiin ayon sa mga rehiyon ng bawat chromosome. Halimbawa, ang mga posisyon 1 - 2000, 2001 - 4000, atbp. Ngunit ang problema ay ang mga SNP ay hindi pantay na ipinamamahagi sa mga chromosome, kaya ang mga laki ng grupo ay mag-iiba nang malaki.

Pag-parse ng 25TB gamit ang AWK at R

Bilang resulta, napunta ako sa isang breakdown ng mga posisyon sa mga kategorya (ranggo). Gamit ang na-download na data, nagpatakbo ako ng kahilingan para makakuha ng listahan ng mga natatanging SNP, ang kanilang mga posisyon at chromosome. Pagkatapos ay inayos ko ang data sa loob ng bawat chromosome at nakolekta ang mga SNP sa mga grupo (bin) ng isang naibigay na laki. Sabihin nating 1000 SNP bawat isa. Binigyan ako nito ng SNP-to-group-per-chromosome na relasyon.

Sa huli, gumawa ako ng mga grupo (bin) ng 75 SNP, ang dahilan ay ipapaliwanag sa ibaba.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Unang subukan sa Spark

Ano ang natutunan ko: Mabilis ang pagsasama-sama ng spark, ngunit mahal pa rin ang paghahati.

Nais kong basahin ang maliit na (2,5 milyong row) na data frame na ito sa Spark, pagsamahin ito sa raw data, at pagkatapos ay hatiin ito ng bagong idinagdag na column bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

ginamit ko sdf_broadcast(), kaya alam ng Spark na dapat nitong ipadala ang data frame sa lahat ng node. Ito ay kapaki-pakinabang kung ang data ay maliit sa laki at kinakailangan para sa lahat ng mga gawain. Kung hindi, sinusubukan ni Spark na maging matalino at namamahagi ng data kung kinakailangan, na maaaring magdulot ng mga pagbagal.

At muli, ang aking ideya ay hindi gumana: ang mga gawain ay nagtrabaho nang ilang oras, nakumpleto ang unyon, at pagkatapos, tulad ng mga tagapagpatupad na inilunsad sa pamamagitan ng paghahati, nagsimula silang mabigo.

Pagdaragdag ng AWK

Ano ang natutunan ko: Huwag matulog kapag tinuturuan ka ng mga pangunahing kaalaman. Tiyak na may nakalutas na sa iyong problema noong 1980s.

Hanggang sa puntong ito, ang dahilan ng lahat ng aking pagkabigo sa Spark ay ang paghalu-halo ng data sa cluster. Marahil ang sitwasyon ay maaaring mapabuti sa paunang paggamot. Nagpasya akong subukang hatiin ang raw text data sa mga column ng mga chromosome, kaya umaasa akong mabigyan ang Spark ng "pre-partitioned" na data.

Naghanap ako sa StackOverflow para sa kung paano hatiin ayon sa mga halaga ng column at natagpuan napakagandang sagot. Sa AWK maaari mong hatiin ang isang text file sa pamamagitan ng mga halaga ng column sa pamamagitan ng pagsulat nito sa isang script sa halip na ipadala ang mga resulta sa stdout.

Sumulat ako ng Bash script para subukan ito. Na-download ang isa sa mga naka-package na TSV, pagkatapos ay i-unpack ito gamit gzip at ipinadala sa awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Gumana ito!

Pagpuno ng mga Core

Ano ang natutunan ko: gnu parallel - ito ay isang mahiwagang bagay, dapat gamitin ito ng lahat.

Medyo mabagal ang paghihiwalay at nang magsimula ako htopupang suriin ang paggamit ng isang malakas (at mahal) na halimbawa ng EC2, lumabas na gumagamit lamang ako ng isang core at humigit-kumulang 200 MB ng memorya. Upang malutas ang problema at hindi mawalan ng maraming pera, kailangan naming malaman kung paano i-parallelize ang trabaho. Sa kabutihang palad, sa isang ganap na kamangha-manghang libro Data Science sa Command Line Nakakita ako ng isang kabanata ni Jeron Janssens sa parallelization. Mula dito natutunan ko ang tungkol sa gnu parallel, isang napaka-flexible na paraan para sa pagpapatupad ng multithreading sa Unix.

Pag-parse ng 25TB gamit ang AWK at R
Noong sinimulan ko ang partitioning gamit ang bagong proseso, maayos ang lahat, ngunit mayroon pa ring bottleneck - ang pag-download ng mga bagay sa S3 sa disk ay hindi masyadong mabilis at hindi ganap na parallelized. Upang ayusin ito, ginawa ko ito:

  1. Nalaman ko na posible na ipatupad ang yugto ng pag-download ng S3 nang direkta sa pipeline, ganap na inaalis ang intermediate na imbakan sa disk. Nangangahulugan ito na maiiwasan ko ang pagsusulat ng hilaw na data sa disk at gumamit ng mas maliit, at samakatuwid ay mas mura, imbakan sa AWS.
  2. Koponan aws configure set default.s3.max_concurrent_requests 50 lubhang nadagdagan ang bilang ng mga thread na ginagamit ng AWS CLI (bilang default ay mayroong 10).
  3. Lumipat ako sa isang EC2 instance na na-optimize para sa bilis ng network, na may letrang n sa pangalan. Nalaman ko na ang pagkawala ng kapangyarihan sa pagpoproseso kapag gumagamit ng n-instances ay higit pa sa nabayaran ng pagtaas ng bilis ng paglo-load. Para sa karamihan ng mga gawain ginamit ko ang c5n.4xl.
  4. Nagbago gzip sa pigz, ito ay isang tool ng gzip na maaaring gumawa ng mga cool na bagay upang iparallelize ang una na hindi parallelized na gawain ng pag-decompress ng mga file (ito ay nakatulong sa pinakamaliit).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Ang mga hakbang na ito ay pinagsama sa isa't isa upang gawin ang lahat nang napakabilis. Sa pamamagitan ng pagtaas ng bilis ng pag-download at pag-aalis ng mga pagsusulat sa disk, maaari na akong magproseso ng 5 terabyte na pakete sa loob lamang ng ilang oras.

Dapat ay binanggit ng tweet na ito ang 'TSV'. Naku.

Gamit ang bagong na-parse na data

Ano ang natutunan ko: Gusto ni Spark ang hindi naka-compress na data at hindi gusto ang pagsasama-sama ng mga partisyon.

Ngayon ang data ay nasa S3 sa isang unpacked (read: shared) at semi-ordered na format, at maaari akong bumalik sa Spark muli. Isang sorpresa ang naghihintay sa akin: Muli akong nabigo upang makamit ang gusto ko! Napakahirap sabihin sa Spark nang eksakto kung paano nahati ang data. At kahit na ginawa ko ito, lumabas na napakaraming partisyon (95 thousand), at kapag ginamit ko coalesce binawasan ang kanilang bilang sa mga makatwirang limitasyon, sinira nito ang aking pagkahati. Sigurado akong maaayos ito, ngunit pagkatapos ng ilang araw ng paghahanap ay wala akong mahanap na solusyon. Sa kalaunan ay natapos ko ang lahat ng mga gawain sa Spark, bagama't tumagal ito ng ilang sandali at ang aking mga split Parquet file ay hindi masyadong maliit (~200 KB). Gayunpaman, ang data ay kung saan ito kinakailangan.

Pag-parse ng 25TB gamit ang AWK at R
Masyadong maliit at hindi pantay, kahanga-hanga!

Pagsubok ng mga lokal na query sa Spark

Ano ang natutunan ko: Masyadong maraming overhead ang Spark kapag nilulutas ang mga simpleng problema.

Sa pamamagitan ng pag-download ng data sa isang matalinong format, nasubukan ko ang bilis. Mag-set up ng R script para magpatakbo ng lokal na Spark server, at pagkatapos ay nag-load ng Spark data frame mula sa tinukoy na Parquet group storage (bin). Sinubukan kong i-load ang lahat ng data, ngunit hindi makuha ng Sparklyr na makilala ang pagkahati.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Ang pagpapatupad ay tumagal ng 29,415 segundo. Mas mahusay, ngunit hindi masyadong mahusay para sa mass testing ng anumang bagay. Bukod pa rito, hindi ko mapabilis ang mga bagay gamit ang pag-cache dahil noong sinubukan kong i-cache ang isang data frame sa memorya, palaging nag-crash ang Spark, kahit na naglaan ako ng higit sa 50 GB ng memorya sa isang dataset na mas mababa sa 15 ang timbang.

Bumalik sa AWK

Ano ang natutunan ko: Napakahusay ng mga associative array sa AWK.

Napagtanto ko na makakamit ko ang mas mataas na bilis. Naalala ko iyon sa isang kahanga-hanga AWK tutorial ni Bruce Barnett Nabasa ko ang tungkol sa isang cool na feature na tinatawag na β€œassociative arrays" Sa pangkalahatan, ang mga ito ay mga key-value pairs, na sa ilang kadahilanan ay tinawag nang iba sa AWK, at samakatuwid ay hindi ko masyadong inisip ang mga ito. Roman Cheplyaka naalala na ang terminong "associative arrays" ay mas matanda kaysa sa terminong "key-value pair". Kahit ikaw hanapin ang key-value sa Google Ngram, hindi mo makikita ang terminong ito doon, ngunit makakahanap ka ng mga magkakaugnay na array! Bilang karagdagan, ang "key-value pair" ay kadalasang nauugnay sa mga database, kaya mas makatuwirang ihambing ito sa isang hashmap. Napagtanto ko na magagamit ko ang mga associative array na ito upang iugnay ang aking mga SNP sa isang bin table at raw data nang hindi gumagamit ng Spark.

Upang gawin ito, sa AWK script ginamit ko ang block BEGIN. Ito ay isang piraso ng code na isinasagawa bago ang unang linya ng data ay ipasa sa pangunahing katawan ng script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Koponan while(getline...) ni-load ang lahat ng row mula sa CSV group (bin), itakda ang unang column (pangalan ng SNP) bilang susi para sa associative array bin at ang pangalawang halaga (pangkat) bilang halaga. Tapos sa block { }, na isinasagawa sa lahat ng mga linya ng pangunahing file, ang bawat linya ay ipinadala sa output file, na tumatanggap ng isang natatanging pangalan depende sa grupo nito (bin): ..._bin_"bin[$1]"_....

Mga variable batch_num ΠΈ chunk_id tumugma sa data na ibinigay ng pipeline, pag-iwas sa isang kondisyon ng lahi, at bawat execution thread na tumatakbo parallel, sumulat sa sarili nitong natatanging file.

Dahil ikinalat ko ang lahat ng raw data sa mga folder sa mga chromosome na natitira mula sa aking nakaraang eksperimento sa AWK, ngayon ay maaari na akong magsulat ng isa pang Bash script upang iproseso ang isang chromosome sa isang pagkakataon at magpadala ng mas malalim na naka-partition na data sa S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Ang script ay may dalawang seksyon parallel.

Sa unang seksyon, ang data ay binabasa mula sa lahat ng mga file na naglalaman ng impormasyon sa nais na chromosome, pagkatapos ay ang data na ito ay ipinamamahagi sa mga thread, na namamahagi ng mga file sa naaangkop na mga grupo (bin). Upang maiwasan ang mga kundisyon ng lahi kapag maraming thread ang sumulat sa parehong file, ipinapasa ng AWK ang mga pangalan ng file upang magsulat ng data sa iba't ibang lugar, hal. chr_10_bin_52_batch_2_aa.csv. Bilang resulta, maraming maliliit na file ang nilikha sa disk (para dito ginamit ko ang mga volume ng terabyte EBS).

Conveyor mula sa pangalawang seksyon parallel dumadaan sa mga grupo (bin) at pinagsasama ang kanilang mga indibidwal na file sa karaniwang CSV c catat pagkatapos ay ipapadala ang mga ito para i-export.

Nagbo-broadcast sa R?

Ano ang natutunan ko: Maaari kang makipag-ugnayan stdin ΠΈ stdout mula sa isang R script, at samakatuwid ay gamitin ito sa pipeline.

Maaaring napansin mo ang linyang ito sa iyong Bash script: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Isinasalin nito ang lahat ng pinagsama-samang mga file ng pangkat (bin) sa R ​​script sa ibaba. {} ay isang espesyal na pamamaraan parallel, na naglalagay ng anumang data na ipinapadala nito sa tinukoy na stream nang direkta sa mismong command. Pagpipilian {#} nagbibigay ng natatanging thread ID, at {%} kumakatawan sa numero ng slot ng trabaho (paulit-ulit, ngunit hindi kailanman sabay-sabay). Ang isang listahan ng lahat ng mga opsyon ay matatagpuan sa dokumentasyon.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kapag ang isang variable file("stdin") ipinadala sa readr::read_csv, ang data na isinalin sa R ​​script ay nilo-load sa isang frame, na pagkatapos ay nasa form .rds-file gamit ang aws.s3 direktang nakasulat sa S3.

Ang RDS ay parang isang junior na bersyon ng Parquet, nang walang mga frills ng storage ng speaker.

Matapos tapusin ang script ng Bash ay nakakuha ako ng isang bundle .rds-mga file na matatagpuan sa S3, na nagpapahintulot sa akin na gumamit ng mahusay na compression at mga built-in na uri.

Sa kabila ng paggamit ng preno R, ang lahat ay gumana nang napakabilis. Hindi nakakagulat, ang mga bahagi ng R na nagbabasa at nagsusulat ng data ay lubos na na-optimize. Pagkatapos ng pagsubok sa isang medium-sized na chromosome, natapos ang trabaho sa isang C5n.4xl instance sa loob ng humigit-kumulang dalawang oras.

S3 Limitasyon

Ano ang natutunan ko: Salamat sa pagpapatupad ng matalinong landas, kayang hawakan ng S3 ang maraming file.

Nag-aalala ako kung kakayanin ng S3 ang maraming file na inilipat dito. Magagawa kong magkaroon ng kahulugan ang mga pangalan ng file, ngunit paano hahanapin ng S3 ang mga ito?

Pag-parse ng 25TB gamit ang AWK at R
Ang mga folder sa S3 ay para lamang ipakita, sa katunayan ang system ay hindi interesado sa simbolo /. Mula sa S3 FAQ page.

Lumilitaw na ang S3 ay kumakatawan sa landas sa isang partikular na file bilang isang simpleng key sa isang uri ng hash table o database na nakabatay sa dokumento. Ang isang bucket ay maaaring ituring na isang talahanayan, at ang mga file ay maaaring ituring na mga talaan sa talahanayang iyon.

Dahil ang bilis at kahusayan ay mahalaga para kumita sa Amazon, hindi nakakagulat na ang key-as-a-file-path system na ito ay na-optimize. Sinubukan kong maghanap ng balanse: para hindi na ako kailangang gumawa ng maraming kahilingan, ngunit mabilis na naisakatuparan ang mga kahilingan. Ito ay lumabas na pinakamahusay na gumawa ng humigit-kumulang 20 libong mga file ng bin. Sa tingin ko, kung patuloy tayong mag-o-optimize, makakamit natin ang isang pagtaas sa bilis (halimbawa, paggawa ng isang espesyal na bucket para lamang sa data, kaya nababawasan ang laki ng lookup table). Ngunit walang oras o pera para sa karagdagang mga eksperimento.

Paano naman ang cross compatibility?

Ang Natutuhan Ko: Ang pangunahing dahilan ng nasayang na oras ay ang pag-optimize ng iyong paraan ng pag-iimbak nang maaga.

Sa puntong ito, napakahalagang tanungin ang iyong sarili: "Bakit gagamit ng proprietary file format?" Ang dahilan ay nakasalalay sa bilis ng paglo-load (nagtagal ng 7 beses na mas matagal ang pag-load ng mga naka-gzip na CSV file) at pagiging tugma sa aming mga daloy ng trabaho. Maaari kong muling isaalang-alang kung madaling mai-load ng R ang Parquet (o Arrow) na mga file nang walang Spark load. Ang lahat sa aming lab ay gumagamit ng R, at kung kailangan kong i-convert ang data sa ibang format, mayroon pa rin akong orihinal na data ng teksto, kaya maaari ko na lang patakbuhin ang pipeline muli.

Dibisyon ng trabaho

Ano ang natutunan ko: Huwag subukang manu-manong i-optimize ang mga trabaho, hayaan ang computer na gawin ito.

Na-debug ko ang daloy ng trabaho sa isang chromosome, ngayon kailangan kong iproseso ang lahat ng iba pang data.
Nais kong itaas ang ilang mga pagkakataon sa EC2 para sa conversion, ngunit sa parehong oras ay natatakot akong makakuha ng isang hindi balanseng pagkarga sa iba't ibang mga trabaho sa pagpoproseso (tulad ng Spark ay nagdusa mula sa hindi balanseng mga partisyon). Bilang karagdagan, hindi ako interesado sa pagtaas ng isang instance bawat chromosome, dahil para sa mga AWS account ay may default na limitasyon na 10 instance.

Pagkatapos ay nagpasya akong magsulat ng isang script sa R ​​upang ma-optimize ang mga trabaho sa pagpoproseso.

Una, tinanong ko ang S3 na kalkulahin kung gaano karaming espasyo sa imbakan ang sinasakop ng bawat kromosoma.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Pagkatapos ay nagsulat ako ng isang function na kumukuha ng kabuuang sukat, bina-shuffle ang pagkakasunud-sunod ng mga chromosome, hinahati ang mga ito sa mga grupo num_jobs at sinasabi sa iyo kung gaano kaiba ang laki ng lahat ng mga trabaho sa pagpoproseso.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 Γ— 3]>

Pagkatapos ay tumakbo ako sa isang libong shuffle gamit ang purrr at pinili ang pinakamahusay.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Kaya napunta ako sa isang hanay ng mga gawain na halos magkapareho sa laki. Pagkatapos ang lahat na natitira ay upang balutin ang aking nakaraang Bash script sa isang malaking loop for. Ang pag-optimize na ito ay tumagal nang humigit-kumulang 10 minuto upang magsulat. At ito ay mas mababa kaysa sa gagastusin ko sa manu-manong paggawa ng mga gawain kung hindi balanse ang mga ito. Samakatuwid, sa tingin ko ay tama ako sa paunang pag-optimize na ito.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Sa dulo idagdag ko ang shutdown command:

sudo shutdown -h now

... at naging maayos ang lahat! Gamit ang AWS CLI, nagtaas ako ng mga pagkakataon gamit ang opsyon user_data binigyan sila ng Bash script ng kanilang mga gawain para sa pagproseso. Tumakbo sila at awtomatikong nagsara, kaya hindi ako nagbabayad para sa dagdag na kapangyarihan sa pagproseso.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Mag-impake na tayo!

Ano ang natutunan ko: Ang API ay dapat na simple para sa kadalian at flexibility ng paggamit.

Sa wakas nakuha ko ang data sa tamang lugar at form. Ang natitira na lang ay pasimplehin ang proseso ng paggamit ng data hangga't maaari upang gawing mas madali para sa aking mga kasamahan. Nais kong gumawa ng isang simpleng API para sa paglikha ng mga kahilingan. Kung sa hinaharap ay magpasya akong lumipat mula sa .rds sa mga Parquet file, kung gayon ito ay dapat na isang problema para sa akin, hindi para sa aking mga kasamahan. Para dito nagpasya akong gumawa ng panloob na R package.

Bumuo at idokumento ang isang napakasimpleng pakete na naglalaman lamang ng ilang mga function ng pag-access ng data na nakaayos sa paligid ng isang function get_snp. Gumawa din ako ng website para sa mga kasamahan ko pkgdown, para madali nilang makita ang mga halimbawa at dokumentasyon.

Pag-parse ng 25TB gamit ang AWK at R

Smart caching

Ano ang natutunan ko: Kung ang iyong data ay handang mabuti, ang pag-cache ay magiging madali!

Dahil ang isa sa mga pangunahing daloy ng trabaho ay naglapat ng parehong modelo ng pagsusuri sa pakete ng SNP, nagpasya akong gumamit ng binning sa aking kalamangan. Kapag nagpapadala ng data sa pamamagitan ng SNP, ang lahat ng impormasyon mula sa grupo (bin) ay nakakabit sa ibinalik na bagay. Iyon ay, ang mga lumang query ay maaaring (sa teorya) na mapabilis ang pagproseso ng mga bagong query.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Sa pagbuo ng package, nagpatakbo ako ng maraming mga benchmark upang ihambing ang bilis kapag gumagamit ng iba't ibang mga pamamaraan. Inirerekumenda ko na huwag pabayaan ito, dahil kung minsan ang mga resulta ay hindi inaasahan. Halimbawa, dplyr::filter ay mas mabilis kaysa sa pagkuha ng mga row gamit ang indexing-based na pag-filter, at ang pagkuha ng isang column mula sa isang na-filter na data frame ay mas mabilis kaysa sa paggamit ng indexing syntax.

Mangyaring tandaan na ang bagay prev_snp_results naglalaman ng susi snps_in_bin. Isa itong hanay ng lahat ng natatanging SNP sa isang grupo (bin), na nagbibigay-daan sa iyong mabilis na suriin kung mayroon ka nang data mula sa isang nakaraang query. Pinapadali din nitong i-loop ang lahat ng SNP sa isang grupo (bin) gamit ang code na ito:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Natuklasan

Ngayon ay maaari na tayong (at nagsimula nang seryoso) magpatakbo ng mga modelo at senaryo na dati ay hindi naa-access sa amin. Ang pinakamagandang bagay ay ang aking mga kasamahan sa lab ay hindi kailangang mag-isip tungkol sa anumang mga komplikasyon. Mayroon lang silang function na gumagana.

At kahit na ang pakete ay hindi nagbibigay sa kanila ng mga detalye, sinubukan kong gawing simple ang format ng data para malaman nila kung bigla akong nawala bukas...

Kapansin-pansing tumaas ang bilis. Karaniwan naming ini-scan ang mga functionally makabuluhang genome fragment. Noong nakaraan, hindi namin magawa ito (napalabas na masyadong mahal), ngunit ngayon, salamat sa istraktura ng grupo (bin) at pag-cache, ang isang kahilingan para sa isang SNP ay tumatagal sa average na mas mababa sa 0,1 segundo, at ang paggamit ng data ay kaya. mababa na ang mga gastos para sa S3 ay mani.

Konklusyon

Ang artikulong ito ay hindi isang gabay. Ang solusyon ay naging indibidwal, at halos tiyak na hindi pinakamainam. Sa halip, ito ay isang travelogue. Nais kong maunawaan ng iba na ang mga naturang desisyon ay hindi lilitaw na ganap na nabuo sa ulo, ang mga ito ay resulta ng pagsubok at pagkakamali. Gayundin, kung naghahanap ka ng data scientist, tandaan na ang paggamit ng mga tool na ito ay epektibong nangangailangan ng karanasan, at ang karanasan ay nagkakahalaga ng pera. Masaya ako na nagkaroon ako ng paraan upang magbayad, ngunit marami pang iba na kayang gawin ang parehong trabaho nang mas mahusay kaysa sa akin ay hindi magkakaroon ng pagkakataon dahil sa kakulangan ng pera upang subukan.

Ang mga tool ng malaking data ay maraming nalalaman. Kung may oras ka, halos tiyak na makakasulat ka ng mas mabilis na solusyon gamit ang mga diskarte sa paglilinis, pag-iimbak, at pagkuha ng matalinong data. Sa huli ito ay bumaba sa isang cost-benefit analysis.

Ang natutunan ko:

  • walang murang paraan upang mai-parse ang 25 TB sa isang pagkakataon;
  • mag-ingat sa laki ng iyong mga Parquet file at kanilang organisasyon;
  • Dapat balanse ang mga partisyon sa Spark;
  • Sa pangkalahatan, huwag subukang gumawa ng 2,5 milyong partisyon;
  • Mahirap pa rin ang pag-uuri, gaya ng pagse-set up ng Spark;
  • kung minsan ang espesyal na data ay nangangailangan ng mga espesyal na solusyon;
  • Mabilis ang pagsasama-sama ng spark, ngunit mahal pa rin ang paghahati;
  • huwag matulog kapag itinuro nila sa iyo ang mga pangunahing kaalaman, malamang na may nakalutas na sa iyong problema noong 1980s;
  • gnu parallel - ito ay isang mahiwagang bagay, dapat gamitin ito ng lahat;
  • Gusto ni Spark ang hindi naka-compress na data at hindi gusto ang pagsasama-sama ng mga partisyon;
  • Masyadong maraming overhead ang Spark kapag nilulutas ang mga simpleng problema;
  • Napakahusay ng mga associative array ng AWK;
  • maaari mong kontakin stdin ΠΈ stdout mula sa isang R script, at samakatuwid ay gamitin ito sa pipeline;
  • Salamat sa pagpapatupad ng matalinong landas, ang S3 ay maaaring magproseso ng maraming mga file;
  • Ang pangunahing dahilan ng pag-aaksaya ng oras ay ang maagang pag-optimize ng iyong paraan ng pag-iimbak;
  • huwag subukang i-optimize ang mga gawain nang manu-mano, hayaan ang computer na gawin ito;
  • Ang API ay dapat na simple para sa kapakanan ng kadalian at flexibility ng paggamit;
  • Kung ang iyong data ay handa nang mabuti, ang pag-cache ay magiging madali!

Pinagmulan: www.habr.com

Magdagdag ng komento