Parsing 25TB nggunakake AWK lan R

Parsing 25TB nggunakake AWK lan R
Carane maca artikel iki: Nyuwun pangapunten awit seratan ingkang panjang lan semrawut. Kanggo ngirit wektu, aku miwiti saben bab kanthi introduksi "Apa sing Aku Sinau", sing ngringkes inti saka bab kasebut ing siji utawa rong ukara.

“Tuduhna solusine!” Yen sampeyan mung pengin ndeleng saka ngendi aku teka, banjur pindhah menyang bab "Dadi Luwih Inventif," nanging aku luwih menarik lan migunani kanggo maca babagan kegagalan.

Aku bubar ditugasake kanggo nyetel proses kanggo ngolah akeh urutan DNA mentah (secara teknis chip SNP). Keperluan kasebut kanthi cepet entuk data babagan lokasi genetik sing diwenehake (disebut SNP) kanggo pemodelan sabanjure lan tugas liyane. Nggunakake R lan AWK, aku bisa ngresiki lan ngatur data kanthi cara alami, kanthi cepet nyepetake pangolahan pitakon. Iki ora gampang kanggo aku lan mbutuhake akeh iterasi. Artikel iki bakal mbantu sampeyan ngindhari sawetara kesalahanku lan nuduhake apa sing daklakoni.

Kaping pisanan, sawetara panjelasan pambuka.

data

Pusat pangolahan informasi genetik universitas kita nyedhiyakake data kanthi bentuk TSV 25 TB. Aku nampa wong-wong mau dipérang dadi 5 paket, dikompres dening Gzip, sing saben ana udakara 240 file papat gigabyte. Saben baris ngemot data kanggo siji SNP saka siji individu. Secara total, data babagan ~ 2,5 yuta SNP lan ~ 60 ewu wong ditularake. Saliyane informasi SNP, file kasebut ngemot pirang-pirang kolom kanthi nomer sing nggambarake macem-macem karakteristik, kayata intensitas maca, frekuensi alel sing beda-beda, lsp. Total ana kira-kira 30 kolom kanthi nilai unik.

Tujuane

Kaya proyek manajemen data, sing paling penting yaiku nemtokake cara data bakal digunakake. Ing kasus iki kita biasane bakal milih model lan alur kerja kanggo SNP adhedhasar SNP. Tegese, kita mung butuh data ing siji SNP sekaligus. Aku kudu sinau carane njupuk kabeh cathetan sing digandhengake karo salah siji saka 2,5 yuta SNPs minangka gampang, cepet lan murah sabisa.

Carane ora nindakake iki

Kanggo ngutip klise sing cocog:

Aku ora gagal kaping sewu, aku mung nemokake sewu cara supaya ora ngurai pirang-pirang data kanthi format sing ramah pitakon.

Usaha pertama

Apa aku sinau: Ora ana cara sing murah kanggo ngurai 25 TB sekaligus.

Sawise njupuk kursus "Metode Lanjut kanggo Pengolahan Data Besar" ing Universitas Vanderbilt, aku yakin yen trik kasebut ana ing tas. Sampeyan mbokmenawa bakal njupuk siji utawa loro jam kanggo nyiyapake server Hive kanggo mbukak liwat kabeh data lan laporan asil. Amarga data kita disimpen ing AWS S3, aku nggunakake layanan kasebut Athena, sing ngidini sampeyan ngetrapake pitakon SQL Hive menyang data S3. Sampeyan ora perlu nyiyapake / mundhakaken kluster Hive, lan sampeyan uga mbayar mung kanggo data sing looking for.

Sawise aku nuduhake Athena data lan format, aku nglakokake sawetara tes kanthi pitakon kaya iki:

select * from intensityData limit 10;

Lan kanthi cepet nampa asil sing disusun kanthi apik. siyap.

Nganti kita nyoba nggunakake data ing karya kita ...

Aku dijaluk kanggo narik metu kabeh informasi SNP kanggo test model ing. Aku mbukak pitakon:


select * from intensityData 
where snp = 'rs123456';

...lan wiwit ngenteni. Sawise wolung menit lan luwih saka 4 TB data sing dijaluk, aku nampa asil kasebut. Athena ngisi kanthi volume data sing ditemokake, $5 saben terabyte. Dadi panjalukan siji iki biaya $20 lan ngenteni wolung menit. Kanggo mbukak model ing kabeh data, kita kudu ngenteni 38 taun lan mbayar $ 50 yuta. Temenan, iki ora cocok kanggo kita.

Sampeyan kudu nggunakake parket ...

Apa aku sinau: Ati-ati karo ukuran file Parquet lan organisasi.

Aku pisanan nyoba ndandani kahanan kanthi ngowahi kabeh TSV dadi File parket. Padha trep kanggo nggarap set data gedhe amarga informasi kasebut disimpen ing wangun kolom: saben kolom dumunung ing segmen memori / disk dhewe, beda karo file teks, ing ngendi larik ngemot unsur saben kolom. Lan yen sampeyan kudu golek soko, banjur mung maca kolom sing dibutuhake. Kajaba iku, saben file nyimpen sawetara nilai ing kolom, dadi yen nilai sing sampeyan goleki ora ana ing kisaran kolom kasebut, Spark ora bakal mbuwang wektu kanggo mindhai kabeh file.

Aku mbukak tugas prasaja AWS Lem kanggo ngowahi TSV kita dadi Parquet lan nyelehake file anyar menyang Athena. Butuh udakara 5 jam. Nanging nalika aku mbukak request, iku njupuk bab jumlah sing padha wektu lan sethitik kurang dhuwit kanggo ngrampungake. Kasunyatane yaiku Spark, nyoba ngoptimalake tugas kasebut, mung mbukak siji potongan TSV lan sijine ing potongan Parquet dhewe. Lan amarga saben potongan cukup gedhe kanggo nyimpen kabeh cathetan akeh wong, saben file ngemot kabeh SNP, mula Spark kudu mbukak kabeh file kanggo ngekstrak informasi sing dibutuhake.

Sing nggumunake, jinis kompresi standar (lan dianjurake) parket, snappy, ora bisa dipisah. Mulane, saben eksekutor macet ing tugas mbongkar lan ndownload set data 3,5 GB lengkap.

Parsing 25TB nggunakake AWK lan R

Ayo padha ngerti masalah

Apa aku sinau: Ngurutake angel, utamane yen data disebarake.

Iku ketoke kula sing saiki aku ngerti inti saka masalah. Aku mung perlu ngurutake data miturut kolom SNP, dudu wong. Banjur sawetara SNP bakal disimpen ing potongan data sing kapisah, banjur fungsi "pinter" Parquet "mung mbukak yen nilai ana ing kisaran" bakal nuduhake dhewe ing kabeh kamulyan. Sayange, ngurutake milyaran larik sing kasebar ing kluster dadi tugas sing angel.

AWS temtunipun ora pengin ngetokake mbalekaken amarga alasan "Aku murid sing bingung". Sawise aku mlayu ngurutake ing Amazon Glue, mlaku 2 dina lan nabrak.

Apa babagan partisi?

Apa aku sinau: Partisi ing Spark kudu imbang.

Banjur aku duwe ide kanggo misahake data ing kromosom. Ana 23 (lan sawetara liyane yen sampeyan nganggep DNA mitokondria lan wilayah sing ora dipetakan).
Iki bakal ngidini sampeyan pamisah data dadi potongan sing luwih cilik. Yen sampeyan nambahake mung siji baris kanggo fungsi ekspor Spark ing script Lem partition_by = "chr", banjur data kudu dipérang dadi ember.

Parsing 25TB nggunakake AWK lan R
Genom kasusun saka pirang-pirang pecahan sing disebut kromosom.

Sayange, ora bisa. Kromosom nduweni ukuran sing beda-beda, tegese jumlah informasi sing beda. Iki tegese tugas sing dikirim Spark menyang buruh ora seimbang lan rampung alon-alon amarga sawetara simpul rampung awal lan nganggur. Nanging, tugas wis rampung. Nanging nalika njaluk siji SNP, ora seimbang maneh nyebabake masalah. Biaya ngolah SNP ing kromosom sing luwih gedhe (yaiku, ing ngendi kita pengin entuk data) mung suda kira-kira 10 faktor. Akeh, nanging ora cukup.

Apa yen kita dibagi dadi bagean sing luwih cilik?

Apa aku sinau: Aja nyoba nggawe partisi 2,5 yuta.

Aku mutusaké kanggo metu kabeh lan partisi saben SNP. Iki mesthekake yen sekat padha ukuran padha. IKU IDEAS BAD. Aku digunakake Glue lan nambah baris resik partition_by = 'snp'. Tugas kasebut diwiwiti lan diwiwiti. Sawijining dina aku mriksa lan weruh yen ora ana sing ditulis ing S3, mula aku mateni tugas kasebut. Katon kaya Glue nulis file penengah menyang lokasi sing didhelikake ing S3, akeh file, bisa uga sawetara yuta. Akibaté, kesalahanku luwih saka sewu dolar lan ora nyenengake guruku.

Pemisahan + ngurutake

Apa aku sinau: Ngurutake isih angel, kaya tuning Spark.

Nyoba pungkasan ing pemisahan melu aku misahake kromosom banjur ngurutake saben partisi. Ing teori, iki bakal nyepetake saben pitakon amarga data SNP sing dikarepake kudu ana ing sawetara potongan Parquet ing sawetara tartamtu. Sayange, ngurutake data sing wis dipisah-pisah dadi tugas sing angel. Akibaté, aku ngalih menyang EMR kanggo kluster khusus lan nggunakake wolung kedadean kuat (C5.4xl) lan Sparklyr kanggo nggawe alur kerja luwih fleksibel ...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...nanging tugase isih durung rampung. Aku ngatur kanthi macem-macem cara: nambah alokasi memori kanggo saben pelaksana pitakon, nggunakake simpul kanthi jumlah memori sing akeh, nggunakake variabel siaran (variabel penyiaran), nanging saben-saben iki dadi setengah ukuran, lan mboko sithik para eksekutor wiwit. gagal nganti kabeh mandheg.

Aku dadi luwih kreatif

Apa aku sinau: Kadhangkala data khusus mbutuhake solusi khusus.

Saben SNP nduweni nilai posisi. Iki minangka nomer sing cocog karo jumlah basa ing kromosom. Iki minangka cara sing apik lan alami kanggo ngatur data kita. Kaping pisanan, aku pengin misahake wilayah saben kromosom. Contone, posisi 1 - 2000, 2001 - 4000, lsp. Nanging masalahe yaiku SNP ora disebarake merata ing kromosom, mula ukuran klompok bakal beda-beda.

Parsing 25TB nggunakake AWK lan R

Akibaté, aku teka ing risak saka posisi menyang kategori (pangkat). Nggunakake data sing wis diundhuh, aku njaluk njaluk dhaptar SNP unik, posisi lan kromosom. Banjur aku ngurutake data ing saben kromosom lan nglumpukake SNP dadi klompok (bin) kanthi ukuran tartamtu. Ayo ngomong 1000 SNP saben. Iki menehi kula hubungan SNP-kanggo-kelompok-saben-kromosom.

Pungkasane, aku nggawe grup (bin) 75 SNP, alasane bakal dijlentrehake ing ngisor iki.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Coba pisanan karo Spark

Apa aku sinau: Spark aggregation cepet, nanging partisi isih larang.

Aku pengin maca pigura data cilik (2,5 yuta larik) iki menyang Spark, gabungke karo data mentah, banjur dibagi karo kolom sing mentas ditambahake. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

tak nggo sdf_broadcast(), supaya Spark ngerti yen ngirim pigura data menyang kabeh kelenjar. Iki migunani yen data ukurane cilik lan dibutuhake kanggo kabeh tugas. Yen ora, Spark nyoba dadi pinter lan nyebarake data yen perlu, sing bisa nyebabake kalem.

Lan maneh, gagasanku ora bisa ditindakake: tugas-tugas wis sawetara wektu, ngrampungake serikat pekerja, lan banjur, kaya para pelaksana sing diluncurake kanthi partisi, dheweke wiwit gagal.

Tambah AWK

Apa aku sinau: Aja turu yen diwulangi dhasar. Mesthine ana sing wis ngrampungake masalah sampeyan ing taun 1980-an.

Nganti titik iki, alesan kanggo kabeh gagal karo Spark ana jumble data ing kluster. Mbok menawa kahanan bisa didandani kanthi pre-treatment. Aku mutusake kanggo nyoba pamisah data teks mentah menyang kolom kromosom kanthi pangarep-arep nyedhiyakake Spark karo data "pre-partitioned".

Aku nggolèki ing StackOverflow carane pamisah dening nilai kolom lan ketemu jawaban sing apik banget. Kanthi AWK, sampeyan bisa pamisah file teks kanthi nilai kolom kanthi nulis ing skrip tinimbang ngirim asil menyang stdout.

Aku nulis skrip Bash kanggo nyoba. Ngundhuh salah sawijining TSV sing wis dibungkus, banjur unpack nganggo gzip lan dikirim menyang awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Iku bisa!

Isi inti

Apa aku sinau: gnu parallel - iku bab gaib, saben wong kudu nggunakake.

Pemisahan cukup alon lan nalika aku miwiti htopkanggo mriksa nggunakake kuat (lan larang) Kayata EC2, iku nguripake metu sing aku nggunakake mung siji inti lan bab 200 MB memori. Kanggo ngatasi masalah lan ora kelangan dhuwit akeh, kita kudu ngerti carane paralel karya. Begjanipun, ing buku pancen apik tenan Ilmu Data ing Command Line Aku nemokake bab dening Jeron Janssens babagan paralelisasi. Saka iku aku sinau babagan gnu parallel, cara sing fleksibel banget kanggo ngleksanakake multithreading ing Unix.

Parsing 25TB nggunakake AWK lan R
Nalika aku miwiti pemisahan nggunakake proses anyar, kabeh iku nggoleki, nanging isih ana bottleneck - download obyek S3 kanggo disk ora cepet banget lan ora kebak parallelized. Kanggo ndandani iki, aku nindakake iki:

  1. Aku ketemu metu sing iku bisa kanggo ngleksanakake tataran download S3 langsung ing pipo, rampung mbusak panyimpenan penengah ing disk. Iki tegese aku bisa supaya ora nulis data mentah kanggo disk lan nggunakake malah luwih cilik, lan mulane luwih murah, panyimpenan ing AWS.
  2. tim aws configure set default.s3.max_concurrent_requests 50 nemen tambah nomer Utas sing AWS CLI nggunakake (kanthi standar ana 10).
  3. Aku ngalih menyang conto EC2 optimized kanggo kacepetan jaringan, karo huruf n ing jeneng. Aku wis ketemu sing mundhut saka daya Processing nalika nggunakake n-kedadean luwih saka menehi ganti rugi dening Tambah ing kacepetan loading. Kanggo paling tugas aku digunakake c5n.4xl.
  4. Diganti gzip ing pigz, iki minangka alat gzip sing bisa nindakake perkara sing nyenengake kanggo parallelize tugas sing ora dikompres saka file decompressing (iki mbantu paling ora).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Langkah-langkah kasebut digabungake supaya kabeh bisa cepet banget. Kanthi nambah kacepetan download lan mbusak disk nulis, aku saiki bisa ngolah paket 5 terabyte mung sawetara jam.

Tweet iki kudune nyebutake 'TSV'. Alah.

Nggunakake data sing mentas diurai

Apa aku sinau: Spark seneng data sing ora dikompres lan ora seneng nggabungake partisi.

Saiki data ing S3 ing unpacked (maca: sambungan) lan format semi-diatur, lan aku bisa bali menyang Spark maneh. A surprise nunggu kula: Aku maneh gagal kanggo entuk apa aku wanted! Pancen angel banget kanggo ngandhani Spark kanthi persis carane data dipisahake. Lan sanajan aku nindakake iki, ternyata ana akeh banget partisi (95 ewu), lan nalika digunakake coalesce suda nomer kanggo watesan cukup, iki numpes pemisahan sandi. Aku yakin iki bisa didandani, nanging sawise sawetara dina nggoleki aku ora bisa nemokake solusi. Aku pungkasanipun rampung kabeh tugas ing Spark, senajan njupuk nalika lan file Parquet pamisah sandi ora cilik banget (~ 200 KB). Nanging, data ana ing ngendi sing dibutuhake.

Parsing 25TB nggunakake AWK lan R
Cilik banget lan ora rata, apik banget!

Nguji pitakon Spark lokal

Apa aku sinau: Spark wis kakehan overhead nalika mecahaken masalah prasaja.

Kanthi ngundhuh data kanthi format sing pinter, aku bisa nguji kacepetan. Nggawe script R kanggo mbukak server Spark lokal, lan banjur dimuat pigura data Spark saka panyimpenan grup Parquet kasebut (bin). Aku nyoba kanggo mbukak kabeh data nanging ora bisa njaluk Sparklyr kanggo ngenali pemisahan.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Eksekusi njupuk 29,415 detik. Luwih apik, nanging ora apik banget kanggo tes massa apa wae. Kajaba iku, aku ora bisa nyepetake babagan cache amarga nalika nyoba nyimpen pigura data ing memori, Spark tansah nabrak, sanajan aku nyedhiyakake luwih saka 50 GB memori menyang dataset sing bobote kurang saka 15.

Bali menyang AWK

Apa aku sinau: Array asosiatif ing AWK banget efisien.

Aku nyadari yen aku bisa entuk kecepatan sing luwih dhuwur. Aku elinga ing apik banget Tutorial AWK dening Bruce Barnett Aku maca babagan fitur keren sing diarani "susunan asosiatif" Ateges, iki minangka pasangan kunci-nilai, sing sakperangan alesan diarani beda ing AWK, lan mulane aku ora mikir babagan iki. Roman Cheplyaka ngelingi yen istilah "arrays asosiatif" luwih tuwa tinimbang istilah "pasangan kunci-nilai". Malah yen sampeyan goleki nilai kunci ing Google Ngram, sampeyan ora bakal weruh istilah iki ana, nanging sampeyan bakal nemokake array asosiatif! Kajaba iku, "pasangan kunci-nilai" paling kerep digandhengake karo basis data, saengga luwih apik kanggo mbandhingake karo peta hash. Aku nyadari yen aku bisa nggunakake array asosiatif iki kanggo nggandhengake SNP karo meja bin lan data mentah tanpa nggunakake Spark.

Kanggo nindakake iki, ing skrip AWK aku nggunakake blok kasebut BEGIN. Iki minangka potongan kode sing dieksekusi sadurunge baris data pisanan dikirim menyang awak utama skrip.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

tim while(getline...) dimuat kabeh baris saka grup CSV (bin), nyetel kolom pisanan (jeneng SNP) minangka tombol kanggo array asosiatif bin lan nilai kapindho (kelompok) minangka nilai. Banjur ing blok { }, sing dieksekusi ing kabeh baris file utama, saben baris dikirim menyang file output, sing nampa jeneng unik gumantung saka klompok (bin): ..._bin_"bin[$1]"_....

Variabel batch_num и chunk_id cocog karo data sing diwenehake dening pipa, ngindhari kondisi balapan, lan saben thread eksekusi mlaku parallel, nulis menyang file unik dhewe.

Wiwit aku nyebar kabeh data mentah menyang folder ing kromosom sing ditinggalake saka eksperimen sadurunge karo AWK, saiki aku bisa nulis skrip Bash liyane kanggo ngolah siji kromosom sekaligus lan ngirim data partisi sing luwih jero menyang S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Skrip duwe rong bagean parallel.

Ing bagean pisanan, data diwaca saka kabeh file sing ngemot informasi babagan kromosom sing dikarepake, banjur data iki disebarake ing benang, sing nyebarake file menyang klompok sing cocog (bin). Kanggo ngindhari kahanan balapan nalika sawetara thread nulis menyang file sing padha, AWK ngliwati jeneng file kanggo nulis data menyang panggonan sing beda, f.eks. chr_10_bin_52_batch_2_aa.csv. Akibaté, akeh file cilik digawe ing disk (kanggo iki aku nggunakake volume terabyte EBS).

Conveyor saka bagean kapindho parallel liwat kelompok (bin) lan nggabungke file individu menyang CSV umum c catbanjur dikirim kanggo ekspor.

Siaran ing R?

Apa aku sinau: Bisa kontak stdin и stdout saka script R, lan mulane nggunakake ing pipeline.

Sampeyan bisa uga wis ngelingi baris iki ing skrip Bash sampeyan: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Iku nerjemahake kabeh file grup concatenated (bin) menyang script R ngisor. {} yaiku teknik khusus parallel, sing nglebokake data apa wae sing dikirim menyang stream sing ditemtokake langsung menyang printah kasebut. Pilihan {#} menehi ID thread unik, lan {%} nggantosi nomer slot proyek (baleni, nanging ora tau bebarengan). Dhaptar kabeh opsi bisa ditemokake ing dokumentasi.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Nalika variabel file("stdin") ditularaké kanggo readr::read_csv, data sing diterjemahake menyang aksara R dimuat menyang pigura, kang banjur ing wangun .rds-file nggunakake aws.s3 ditulis langsung menyang S3.

RDS kaya versi junior saka Parquet, tanpa frills panyimpenan speaker.

Sawise rampung skrip Bash aku entuk bundel .rds-file dumunung ing S3, kang ngidini kula kanggo nggunakake komprèsi efisien lan dibangun ing jinis.

Senadyan nggunakake brake R, kabeh bisa cepet banget. Ora kaget, bagean R sing maca lan nulis data dioptimalake banget. Sawise nyoba ing siji kromosom medium-ukuran, proyek rampung ing conto C5n.4xl ing bab rong jam.

S3 Watesan

Apa aku sinau: Thanks kanggo implementasine path pinter, S3 bisa nangani akeh file.

Aku kuwatir apa S3 bakal bisa nangani akeh file sing ditransfer menyang. Aku bisa nggawe jeneng file nggawe akal, nanging kepiye carane S3 nggoleki?

Parsing 25TB nggunakake AWK lan R
Folder ing S3 mung kanggo nuduhake, nyatane sistem ora kasengsem ing simbol /. Saka kaca FAQ S3.

Katon yen S3 makili path menyang file tartamtu minangka tombol prasaja ing tabel hash utawa basis data basis dokumen. Ember bisa dianggep minangka meja, lan file bisa dianggep minangka cathetan ing tabel kasebut.

Amarga kacepetan lan efisiensi penting kanggo entuk bathi ing Amazon, mula ora kaget yen sistem path key-as-a-file-path iki dioptimalake. Aku nyoba golek imbangan: supaya aku ora kudu njaluk akeh njaluk panjalukan, nanging panjalukan wis kaleksanan cepet. Ternyata paling apik nggawe file bin udakara 20 ewu. Aku yen kita terus kanggo ngoptimalake, kita bisa entuk Tambah ing kacepetan (contone, nggawe ember khusus mung kanggo data, saéngga nyuda ukuran meja goleki). Nanging ora ana wektu utawa dhuwit kanggo eksperimen luwih lanjut.

Apa babagan kompatibilitas silang?

Apa Aku Sinau: Panyebab nomer siji saka wektu boroske yaiku ngoptimalake cara panyimpenan kanthi prematur.

Ing wektu iki, penting banget kanggo takon dhewe: "Napa nggunakake format file proprietary?" Alesane dumunung ing kacepetan loading (file CSV gzipped njupuk 7 kaping luwih suwe kanggo mbukak) lan kompatibilitas karo alur kerja kita. Aku bisa nimbang maneh yen R bisa gampang mbukak Parquet (utawa Arrow) file tanpa mbukak Spark. Saben uwong ing lab kita nggunakake R, lan yen aku kudu ngowahi data menyang format liyane, Aku isih duwe data teks asli, supaya aku mung bisa mbukak pipeline maneh.

Divisi karya

Apa aku sinau: Aja nyoba kanggo ngoptimalake proyek kanthi manual, supaya komputer nindakaken.

Aku wis debug alur kerja ing siji kromosom, saiki aku kudu ngolah kabeh data liyane.
Aku pengin ngunggahake sawetara conto EC2 kanggo konversi, nanging ing wektu sing padha aku wedi entuk beban sing ora seimbang ing macem-macem proyek pangolahan (kaya Spark nandhang partisi sing ora seimbang). Kajaba iku, aku ora kasengsem ngunggahake siji conto saben kromosom, amarga kanggo akun AWS ana watesan standar 10 kasus.

Banjur aku mutusake kanggo nulis skrip ing R kanggo ngoptimalake proyek pangolahan.

Kaping pisanan, aku takon S3 kanggo ngetung pira papan panyimpenan sing dikuwasani saben kromosom.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Banjur aku nulis fungsi sing njupuk ukuran total, shuffles urutan kromosom, dibagi dadi klompok. num_jobs lan ngandhani carane beda ukuran kabeh proyek Processing.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Banjur aku mlayu liwat sewu shuffles nggunakake purrr lan milih sing paling apik.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Dadi aku rampung karo pesawat tugas sing padha banget ukurane. Banjur kabeh sing isih ana yaiku mbungkus skrip Bash sadurunge ing loop gedhe for. Optimasi iki butuh udakara 10 menit kanggo nulis. Lan iki luwih murah tinimbang sing bakal ditindakake kanthi manual nggawe tugas yen ora seimbang. Mulane, aku mikir yen aku bener karo optimasi awal iki.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Ing pungkasan aku nambah printah mateni:

sudo shutdown -h now

... lan kabeh bisa metu! Nggunakake AWS CLI, aku ngangkat conto nggunakake pilihan kasebut user_data menehi skrip Bash tugas kanggo diproses. Padha mlayu lan mateni kanthi otomatis, mula aku ora mbayar daya pangolahan ekstra.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Ayo pak!

Apa aku sinau: API kudu prasaja kanggo ease lan keluwesan panggunaan.

Akhire aku entuk data ing panggonan lan formulir sing bener. Sing isih ana yaiku nyederhanakake proses nggunakake data sabisa-bisa supaya luwih gampang kanggo kanca-kanca. Aku wanted kanggo nggawe API prasaja kanggo nggawe panjalukan. Yen ing mangsa aku arep ngalih saka .rds menyang file Parquet, banjur iki kudu dadi masalah kanggo kula, ora kanggo kolega. Iki aku mutusaké kanggo nggawe paket R internal.

Mbangun lan document paket banget prasaja ngemot mung sawetara fungsi akses data diatur watara fungsi get_snp. Aku uga nggawe situs web kanggo kanca-kancaku pkg mudhun, supaya bisa gampang ndeleng conto lan dokumentasi.

Parsing 25TB nggunakake AWK lan R

Caching pinter

Apa aku sinau: Yen data wis disiapake kanthi apik, cache bakal gampang!

Wiwit siji saka workflows utama Applied model analisis padha kanggo paket SNP, Aku mutusaké kanggo nggunakake binning kanggo kauntungan. Nalika ngirim data liwat SNP, kabeh informasi saka grup (bin) ditempelake menyang obyek bali. Yaiku, pitakon lawas bisa (ing teori) nyepetake pangolahan pitakon anyar.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Nalika mbangun paket, aku mbukak akeh pathokan kanggo mbandhingake kacepetan nalika nggunakake macem-macem cara. Aku nyaranake supaya ora nglirwakake iki, amarga kadhangkala asil ora dikarepke. Tuladhane, dplyr::filter luwih cepet tinimbang njupuk baris nggunakake panyaring basis indeksasi, lan njupuk kolom siji saka pigura data sing disaring luwih cepet tinimbang nggunakake sintaks indeksasi.

Wigati dimangerteni manawa obyek kasebut prev_snp_results ngandhut kunci snps_in_bin. Iki minangka susunan kabeh SNP unik ing grup (bin), ngidini sampeyan mriksa kanthi cepet yen sampeyan wis duwe data saka pitakon sadurunge. Iku uga nggampangake ngubengi kabeh SNP ing grup (bin) kanthi kode iki:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Результаты

Saiki kita bisa (lan wis wiwit serius) mbukak model lan skenario sing sadurunge ora bisa diakses. Sing paling apik yaiku kanca-kanca laboratorium ora kudu mikir babagan komplikasi. Dheweke mung duwe fungsi sing bisa digunakake.

Lan sanajan paket kasebut ora menehi katrangan, aku nyoba nggawe format data kanthi gampang supaya bisa ngerti yen aku tiba-tiba ilang sesuk ...

Kacepetan wis tambah ketara. Biasane kita mindhai fragmen genom sing penting kanthi fungsional. Sadurunge, kita ora bisa nindakake iki (pranyata larang banget), nanging saiki, thanks kanggo struktur grup (bin) lan caching, panjalukan kanggo siji SNP njupuk rata-rata kurang saka 0,1 detik, lan panggunaan data dadi. murah yen biaya kanggo S3 yaiku kacang.

kesimpulan

Artikel iki dudu tuntunan babar pisan. Solusi kasebut dadi individu, lan meh mesthi ora optimal. Luwih, iku travelogue. Aku pengin wong liya ngerti manawa keputusan kasebut ora katon kanthi lengkap ing sirah, minangka asil saka nyoba lan kesalahan. Uga, yen sampeyan nggoleki ilmuwan data, elinga yen nggunakake alat kasebut kanthi efektif mbutuhake pengalaman, lan pengalaman mbutuhake dhuwit. Aku seneng yen aku duwe sarana kanggo mbayar, nanging akeh liyane sing bisa nindakake proyek sing padha luwih apik tinimbang aku ora bakal duwe kesempatan amarga ora duwe dhuwit kanggo nyoba.

Piranti data gedhe serbaguna. Yen sampeyan duwe wektu, sampeyan meh bisa nulis solusi sing luwih cepet nggunakake teknik reresik, panyimpenan, lan ekstraksi data sing cerdas. Pungkasane, ana analisis biaya-manfaat.

Apa aku sinau:

  • ora ana cara sing murah kanggo ngurai 25 TB sekaligus;
  • Ati-ati karo ukuran file Parquet lan organisasi;
  • Partisi ing Spark kudu seimbang;
  • Umumé, aja nyoba nggawe partisi 2,5 yuta;
  • Ngurutake isih angel, kaya nyetel Spark;
  • kadhangkala data khusus mbutuhake solusi khusus;
  • Spark aggregation cepet, nanging partisi isih larang;
  • aja turu nalika mulang sampeyan dhasar, ana sing bisa uga wis ngrampungake masalah sampeyan ing taun 1980-an;
  • gnu parallel - iki bab gaib, saben wong kudu nggunakake;
  • Spark seneng data sing ora dikompres lan ora seneng nggabungake partisi;
  • Spark duwe overhead banget nalika ngrampungake masalah sing gampang;
  • Susunan asosiatif AWK banget efisien;
  • sampeyan bisa hubungi stdin и stdout saka script R, lan mulane nggunakake ing pipeline;
  • Thanks kanggo implementasi path pinter, S3 bisa ngolah akeh file;
  • Alesan utama kanggo mbuang wektu prematur ngoptimalake cara panyimpenan;
  • aja nyoba ngoptimalake tugas kanthi manual, supaya komputer nindakaken;
  • API kudu prasaja kanggo ease lan keluwesan panggunaan;
  • Yen data wis disiapake kanthi apik, cache bakal gampang!

Source: www.habr.com

Add a comment