Menghuraikan 25TB menggunakan AWK dan R

Menghuraikan 25TB menggunakan AWK dan R
Bagaimana untuk membaca artikel ini: Saya memohon maaf kerana teks yang terlalu panjang dan bercelaru. Untuk menjimatkan masa anda, saya memulakan setiap bab dengan pengenalan "Apa yang Saya Pelajari", yang meringkaskan intipati bab itu dalam satu atau dua ayat.

"Tunjukkan saya penyelesaiannya!" Jika anda hanya ingin melihat dari mana saya berasal, kemudian langkau ke bab "Menjadi Lebih Inventif," tetapi saya fikir ia lebih menarik dan berguna untuk membaca tentang kegagalan.

Saya baru-baru ini ditugaskan untuk menyediakan proses untuk memproses sejumlah besar jujukan DNA mentah (secara teknikalnya cip SNP). Keperluannya adalah untuk mendapatkan data dengan cepat tentang lokasi genetik tertentu (dipanggil SNP) untuk pemodelan seterusnya dan tugas lain. Menggunakan R dan AWK, saya dapat membersihkan dan menyusun data secara semula jadi, dengan sangat mempercepatkan pemprosesan pertanyaan. Ini tidak mudah bagi saya dan memerlukan banyak lelaran. Artikel ini akan membantu anda mengelakkan beberapa kesilapan saya dan menunjukkan kepada anda apa yang telah saya lalui.

Pertama, beberapa penjelasan pengenalan.

Data

Pusat pemprosesan maklumat genetik universiti kami memberikan kami data dalam bentuk TSV 25 TB. Saya menerimanya dibahagikan kepada 5 pakej, dimampatkan oleh Gzip, setiap satunya mengandungi kira-kira 240 fail empat gigabait. Setiap baris mengandungi data untuk satu SNP daripada satu individu. Secara keseluruhan, data mengenai ~2,5 juta SNP dan ~60 ribu orang telah dihantar. Selain maklumat SNP, fail tersebut mengandungi banyak lajur dengan nombor yang mencerminkan pelbagai ciri, seperti keamatan bacaan, kekerapan alel berbeza, dsb. Secara keseluruhan terdapat kira-kira 30 lajur dengan nilai unik.

Matlamat

Seperti mana-mana projek pengurusan data, perkara yang paling penting ialah menentukan cara data akan digunakan. Dalam kes ini kami kebanyakannya akan memilih model dan aliran kerja untuk SNP berdasarkan SNP. Iaitu, kami hanya memerlukan data pada satu SNP pada satu masa. Saya terpaksa belajar cara mendapatkan semula semua rekod yang dikaitkan dengan salah satu daripada 2,5 juta SNP semudah, cepat dan semurah yang mungkin.

Bagaimana untuk tidak melakukan ini

Untuk memetik klise yang sesuai:

Saya tidak gagal seribu kali, saya hanya menemui seribu cara untuk mengelakkan menghuraikan sekumpulan data dalam format mesra pertanyaan.

Percubaan pertama

Apa yang saya pelajari: Tiada cara murah untuk menghuraikan 25 TB pada satu masa.

Setelah mengikuti kursus "Kaedah Lanjutan untuk Pemprosesan Data Besar" di Vanderbilt University, saya pasti bahawa helah itu ada di dalam beg. Ia mungkin akan mengambil masa satu atau dua jam untuk menyediakan pelayan Hive untuk menjalankan semua data dan melaporkan hasilnya. Memandangkan data kami disimpan dalam AWS S3, saya menggunakan perkhidmatan tersebut Athena, yang membolehkan anda menggunakan pertanyaan SQL Hive pada data S3. Anda tidak perlu menyediakan/menaikkan kluster Hive, dan anda juga membayar hanya untuk data yang anda cari.

Selepas saya menunjukkan kepada Athena data saya dan formatnya, saya menjalankan beberapa ujian dengan pertanyaan seperti ini:

select * from intensityData limit 10;

Dan dengan cepat menerima keputusan yang tersusun dengan baik. sedia.

Sehingga kami cuba menggunakan data dalam kerja kami...

Saya diminta untuk mengeluarkan semua maklumat SNP untuk menguji model. Saya menjalankan pertanyaan:


select * from intensityData 
where snp = 'rs123456';

... dan mula menunggu. Selepas lapan minit dan lebih daripada 4 TB data yang diminta, saya menerima hasilnya. Athena mengecaj mengikut volum data yang ditemui, $5 setiap terabait. Jadi permintaan tunggal ini berharga $20 dan lapan minit menunggu. Untuk menjalankan model pada semua data, kami terpaksa menunggu 38 tahun dan membayar $50 juta. Jelas sekali, ini tidak sesuai untuk kami.

Ia perlu menggunakan Parket...

Apa yang saya pelajari: Berhati-hati dengan saiz fail Parket anda dan organisasinya.

Saya mula-mula cuba membetulkan keadaan dengan menukar semua TSV kepada Fail parket. Ia mudah untuk bekerja dengan set data yang besar kerana maklumat di dalamnya disimpan dalam bentuk kolumnar: setiap lajur terletak dalam segmen memori/cakera sendiri, berbeza dengan fail teks, di mana baris mengandungi elemen setiap lajur. Dan jika anda perlu mencari sesuatu, maka baca sahaja lajur yang diperlukan. Selain itu, setiap fail menyimpan julat nilai dalam lajur, jadi jika nilai yang anda cari tidak berada dalam julat lajur, Spark tidak akan membuang masa mengimbas keseluruhan fail.

Saya menjalankan tugas yang mudah Gam AWS untuk menukar TSV kami kepada Parket dan menjatuhkan fail baharu ke Athena. Ia mengambil masa kira-kira 5 jam. Tetapi apabila saya menjalankan permintaan itu, ia mengambil masa yang sama dan sedikit wang untuk diselesaikan. Hakikatnya ialah Spark, cuba mengoptimumkan tugas, hanya membongkar satu ketul TSV dan meletakkannya di dalam ketulan Parketnya sendiri. Dan kerana setiap bahagian cukup besar untuk menyimpan keseluruhan rekod ramai orang, setiap fail mengandungi semua SNP, jadi Spark terpaksa membuka semua fail untuk mengekstrak maklumat yang diperlukan.

Menariknya, jenis mampatan lalai (dan disyorkan) Parket, tajam, tidak boleh dipisahkan. Oleh itu, setiap pelaksana tersekat pada tugas membongkar dan memuat turun set data 3,5 GB penuh.

Menghuraikan 25TB menggunakan AWK dan R

Mari kita fahami masalahnya

Apa yang saya pelajari: Menyusun adalah sukar, terutamanya jika data diedarkan.

Nampaknya saya kini memahami intipati masalah itu. Saya hanya perlu mengisih data mengikut lajur SNP, bukan mengikut orang. Kemudian beberapa SNP akan disimpan dalam ketulan data yang berasingan, dan kemudian fungsi "pintar" Parquet "terbuka hanya jika nilai berada dalam julat" akan menunjukkan dirinya dalam semua kegemilangannya. Malangnya, menyusun berbilion-bilion baris yang bertaburan merentasi gugusan terbukti menjadi tugas yang sukar.

AWS pastinya tidak mahu mengeluarkan bayaran balik atas sebab "Saya pelajar yang terganggu". Selepas saya menjalankan pengisihan pada Amazon Glue, ia berjalan selama 2 hari dan ranap.

Bagaimana pula dengan pembahagian?

Apa yang saya pelajari: Pembahagian dalam Spark mestilah seimbang.

Kemudian saya mendapat idea untuk membahagikan data dalam kromosom. Terdapat 23 daripadanya (dan beberapa lagi jika anda mengambil kira DNA mitokondria dan kawasan yang tidak dipetakan).
Ini akan membolehkan anda membahagikan data kepada bahagian yang lebih kecil. Jika anda menambah hanya satu baris pada fungsi eksport Spark dalam skrip Glue partition_by = "chr", maka data hendaklah dibahagikan kepada baldi.

Menghuraikan 25TB menggunakan AWK dan R
Genom terdiri daripada banyak serpihan yang dipanggil kromosom.

Malangnya, ia tidak berjaya. Kromosom mempunyai saiz yang berbeza, yang bermaksud jumlah maklumat yang berbeza. Ini bermakna bahawa tugasan yang dihantar oleh Spark kepada pekerja tidak seimbang dan diselesaikan dengan perlahan kerana beberapa nod selesai awal dan melahu. Walau bagaimanapun, tugasan telah diselesaikan. Tetapi apabila meminta satu SNP, ketidakseimbangan sekali lagi menyebabkan masalah. Kos pemprosesan SNP pada kromosom yang lebih besar (iaitu, di mana kita ingin mendapatkan data) hanya berkurangan kira-kira faktor 10. Banyak, tetapi tidak mencukupi.

Bagaimana jika kita membahagikannya kepada bahagian yang lebih kecil lagi?

Apa yang saya pelajari: Jangan sekali-kali cuba melakukan 2,5 juta partition sama sekali.

Saya memutuskan untuk keluar dan membahagikan setiap SNP. Ini memastikan bahawa sekatan mempunyai saiz yang sama. IA ADALAH IDEA YANG BURUK. Saya menggunakan Gam dan menambah baris yang tidak bersalah partition_by = 'snp'. Tugasan bermula dan mula dilaksanakan. Sehari kemudian saya menyemak dan melihat bahawa masih tiada apa-apa yang ditulis kepada S3, jadi saya membunuh tugas itu. Nampaknya Glue sedang menulis fail perantaraan ke lokasi tersembunyi dalam S3, banyak fail, mungkin beberapa juta. Akibatnya, kesilapan saya menelan belanja lebih daripada seribu ringgit dan tidak menggembirakan mentor saya.

Pembahagian + penyisihan

Apa yang saya pelajari: Menyusun masih sukar, seperti menala Spark.

Percubaan terakhir saya untuk membahagikan melibatkan saya membahagikan kromosom dan kemudian menyusun setiap bahagian. Secara teorinya, ini akan mempercepatkan setiap pertanyaan kerana data SNP yang diingini mestilah berada dalam beberapa bahagian Parket dalam julat tertentu. Malangnya, menyusun walaupun data yang dipisahkan ternyata menjadi tugas yang sukar. Akibatnya, saya bertukar kepada EMR untuk kluster tersuai dan menggunakan lapan kejadian berkuasa (C5.4xl) dan Sparklyr untuk mencipta aliran kerja yang lebih fleksibel...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

... walau bagaimanapun, tugasan itu masih belum selesai. Saya mengkonfigurasinya dengan cara yang berbeza: meningkatkan peruntukan memori untuk setiap pelaksana pertanyaan, menggunakan nod dengan jumlah memori yang besar, menggunakan pembolehubah penyiaran (pembolehubah penyiaran), tetapi setiap kali ini ternyata menjadi separuh ukuran, dan secara beransur-ansur pelaksana bermula gagal sehingga semuanya berhenti.

Saya semakin kreatif

Apa yang saya pelajari: Kadangkala data khas memerlukan penyelesaian khas.

Setiap SNP mempunyai nilai kedudukan. Ini adalah nombor yang sepadan dengan bilangan asas di sepanjang kromosomnya. Ini adalah cara yang bagus dan semula jadi untuk mengatur data kami. Pada mulanya saya ingin membahagikan mengikut kawasan setiap kromosom. Contohnya, jawatan 1 - 2000, 2001 - 4000, dsb. Tetapi masalahnya ialah SNP tidak diagihkan sama rata merentas kromosom, jadi saiz kumpulan akan berbeza-beza.

Menghuraikan 25TB menggunakan AWK dan R

Akibatnya, saya mendapat pecahan jawatan mengikut kategori (pangkat). Menggunakan data yang telah dimuat turun, saya menjalankan permintaan untuk mendapatkan senarai SNP unik, kedudukan dan kromosom mereka. Kemudian saya mengisih data dalam setiap kromosom dan mengumpul SNP ke dalam kumpulan (bin) dengan saiz tertentu. Katakan 1000 SNP setiap satu. Ini memberi saya hubungan SNP-ke-kumpulan-per-kromosom.

Pada akhirnya, saya membuat kumpulan (bin) 75 SNP, sebabnya akan dijelaskan di bawah.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Cubaan pertama dengan Spark

Apa yang saya pelajari: Pengagregatan percikan adalah pantas, tetapi pembahagian masih mahal.

Saya ingin membaca bingkai data kecil (2,5 juta baris) ini ke dalam Spark, menggabungkannya dengan data mentah, dan kemudian membahagikannya dengan lajur yang baru ditambah bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

sudah biasa sdf_broadcast(), jadi Spark tahu bahawa ia harus menghantar bingkai data ke semua nod. Ini berguna jika data bersaiz kecil dan diperlukan untuk semua tugas. Jika tidak, Spark cuba menjadi pintar dan mengedarkan data mengikut keperluan, yang boleh menyebabkan kelembapan.

Dan sekali lagi, idea saya tidak berfungsi: tugas bekerja untuk beberapa waktu, menyelesaikan kesatuan, dan kemudian, seperti pelaksana yang dilancarkan dengan membahagikan, mereka mula gagal.

Menambah AWK

Apa yang saya pelajari: Jangan tidur apabila anda diajar perkara asas. Pasti seseorang telah menyelesaikan masalah anda pada tahun 1980-an.

Sehingga ke tahap ini, sebab semua kegagalan saya dengan Spark adalah gabungan data dalam kelompok. Mungkin keadaan boleh diperbaiki dengan pra-rawatan. Saya memutuskan untuk mencuba membahagikan data teks mentah kepada lajur kromosom dengan harapan dapat menyediakan Spark dengan data "pra-partition".

Saya mencari di StackOverflow untuk cara membahagikan mengikut nilai lajur dan dijumpai jawapan yang begitu hebat. Dengan AWK anda boleh membahagikan fail teks mengikut nilai lajur dengan menulisnya dalam skrip dan bukannya menghantar hasilnya ke stdout.

Saya menulis skrip Bash untuk mencubanya. Memuat turun salah satu TSV yang dibungkus, kemudian membongkarnya menggunakan gzip dan dihantar ke awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Ianya berhasil!

Mengisi Teras

Apa yang saya pelajari: gnu parallel - ia adalah satu perkara yang ajaib, semua orang harus menggunakannya.

Perpisahan agak perlahan dan apabila saya mula htopuntuk menyemak penggunaan contoh EC2 yang berkuasa (dan mahal), ternyata saya hanya menggunakan satu teras dan kira-kira 200 MB memori. Untuk menyelesaikan masalah dan tidak kehilangan banyak wang, kami perlu memikirkan cara untuk menyelaraskan kerja. Nasib baik, dalam buku yang sangat menakjubkan Sains Data di Barisan Perintah Saya menjumpai bab oleh Jeron Janssens mengenai penyejajaran. Daripadanya saya belajar tentang gnu parallel, kaedah yang sangat fleksibel untuk melaksanakan multithreading dalam Unix.

Menghuraikan 25TB menggunakan AWK dan R
Apabila saya memulakan pembahagian menggunakan proses baharu, semuanya baik-baik saja, tetapi masih terdapat kesesakan - memuat turun objek S3 ke cakera tidak begitu pantas dan tidak selari sepenuhnya. Untuk membetulkannya, saya melakukan ini:

  1. Saya mendapati bahawa adalah mungkin untuk melaksanakan peringkat muat turun S3 secara langsung dalam perancangan, menghapuskan sepenuhnya storan perantaraan pada cakera. Ini bermakna saya boleh mengelak daripada menulis data mentah ke cakera dan menggunakan storan yang lebih kecil, dan oleh itu lebih murah, pada AWS.
  2. pasukan aws configure set default.s3.max_concurrent_requests 50 telah meningkatkan bilangan utas yang AWS CLI gunakan (secara lalai terdapat 10).
  3. Saya beralih kepada contoh EC2 yang dioptimumkan untuk kelajuan rangkaian, dengan huruf n dalam nama. Saya telah mendapati bahawa kehilangan kuasa pemprosesan apabila menggunakan n-contoh adalah lebih daripada dikompensasi oleh peningkatan kelajuan pemuatan. Untuk kebanyakan tugas saya menggunakan c5n.4xl.
  4. Berubah gzip pada pigz, ini ialah alat gzip yang boleh melakukan perkara yang menarik untuk menyelaraskan tugas yang pada mulanya tidak selari untuk menyahmampat fail (ini paling tidak membantu).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Langkah-langkah ini digabungkan antara satu sama lain untuk menjadikan semuanya berfungsi dengan cepat. Dengan meningkatkan kelajuan muat turun dan menghapuskan penulisan cakera, saya kini boleh memproses pakej 5 terabait dalam beberapa jam sahaja.

Tweet ini sepatutnya menyebut 'TSV'. Malangnya.

Menggunakan data yang baru dihuraikan

Apa yang saya pelajari: Spark suka data yang tidak dimampatkan dan tidak suka menggabungkan partition.

Kini data berada dalam S3 dalam format yang tidak dibungkus (baca: dikongsi) dan separa tersusun, dan saya boleh kembali ke Spark semula. Satu kejutan menanti saya: Saya sekali lagi gagal mencapai apa yang saya inginkan! Amat sukar untuk memberitahu Spark dengan tepat bagaimana data itu dibahagikan. Dan walaupun saya melakukan ini, ternyata terdapat terlalu banyak partition (95 ribu), dan apabila saya menggunakannya coalesce mengurangkan bilangan mereka kepada had yang munasabah, ini memusnahkan pembahagian saya. Saya pasti ini boleh diperbaiki, tetapi selepas beberapa hari mencari saya tidak dapat mencari penyelesaian. Saya akhirnya menyelesaikan semua tugas dalam Spark, walaupun ia mengambil sedikit masa dan fail Parket saya yang berpecah tidaklah terlalu kecil (~200 KB). Walau bagaimanapun, data adalah di mana ia diperlukan.

Menghuraikan 25TB menggunakan AWK dan R
Terlalu kecil dan tidak rata, hebat!

Menguji pertanyaan Spark tempatan

Apa yang saya pelajari: Spark mempunyai terlalu banyak overhed apabila menyelesaikan masalah mudah.

Dengan memuat turun data dalam format yang bijak, saya dapat menguji kelajuan. Sediakan skrip R untuk menjalankan pelayan Spark tempatan, dan kemudian memuatkan bingkai data Spark daripada storan kumpulan Parket yang ditentukan (bin). Saya cuba memuatkan semua data tetapi tidak dapat Sparklyr mengenali pembahagian.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Pelaksanaan mengambil masa 29,415 saat. Jauh lebih baik, tetapi tidak terlalu bagus untuk ujian besar-besaran apa-apa. Selain itu, saya tidak dapat mempercepatkan perkara dengan caching kerana apabila saya cuba mencache bingkai data dalam memori, Spark sentiasa ranap, walaupun apabila saya memperuntukkan lebih daripada 50 GB memori kepada set data yang beratnya kurang daripada 15.

Kembali ke AWK

Apa yang saya pelajari: Tatasusunan bersekutu dalam AWK sangat cekap.

Saya menyedari bahawa saya boleh mencapai kelajuan yang lebih tinggi. Saya ingat itu dalam indah Tutorial AWK oleh Bruce Barnett Saya membaca tentang ciri hebat yang dipanggil "tatasusunan bersekutu" Pada asasnya, ini adalah pasangan nilai kunci, yang atas sebab tertentu dipanggil secara berbeza dalam AWK, dan oleh itu saya entah bagaimana tidak terlalu memikirkannya. Roman Cheplyaka teringat bahawa istilah "tatasusunan bersekutu" jauh lebih tua daripada istilah "pasangan nilai kunci". Walaupun anda cari nilai kunci dalam Google Ngram, anda tidak akan melihat istilah ini di sana, tetapi anda akan menemui tatasusunan bersekutu! Di samping itu, "pasangan nilai kunci" paling kerap dikaitkan dengan pangkalan data, jadi lebih masuk akal untuk membandingkannya dengan peta cincang. Saya menyedari bahawa saya boleh menggunakan tatasusunan bersekutu ini untuk mengaitkan SNP saya dengan jadual tong dan data mentah tanpa menggunakan Spark.

Untuk melakukan ini, dalam skrip AWK saya menggunakan blok BEGIN. Ini ialah sekeping kod yang dilaksanakan sebelum baris pertama data dihantar ke badan utama skrip.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Pasukan while(getline...) memuatkan semua baris daripada kumpulan CSV (bin), tetapkan lajur pertama (nama SNP) sebagai kunci untuk tatasusunan bersekutu bin dan nilai kedua (kumpulan) sebagai nilai. Kemudian di blok { }, yang dilaksanakan pada semua baris fail utama, setiap baris dihantar ke fail output, yang menerima nama unik bergantung pada kumpulannya (bin): ..._bin_"bin[$1]"_....

Pembolehubah batch_num ΠΈ chunk_id memadankan data yang disediakan oleh saluran paip, mengelakkan keadaan perlumbaan dan setiap utas pelaksanaan berjalan parallel, menulis ke fail uniknya sendiri.

Memandangkan saya menyerakkan semua data mentah ke dalam folder pada kromosom yang tinggal daripada percubaan saya sebelum ini dengan AWK, kini saya boleh menulis skrip Bash lain untuk memproses satu kromosom pada satu masa dan menghantar data terbahagi yang lebih mendalam ke S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Skrip mempunyai dua bahagian parallel.

Dalam bahagian pertama, data dibaca daripada semua fail yang mengandungi maklumat mengenai kromosom yang dikehendaki, kemudian data ini diedarkan merentasi benang, yang mengedarkan fail ke dalam kumpulan yang sesuai (bin). Untuk mengelakkan keadaan perlumbaan apabila beberapa utas menulis ke fail yang sama, AWK menghantar nama fail untuk menulis data ke tempat yang berbeza, mis. chr_10_bin_52_batch_2_aa.csv. Akibatnya, banyak fail kecil dibuat pada cakera (untuk ini saya menggunakan volum EBS terabait).

Penghantar dari bahagian kedua parallel melalui kumpulan (bin) dan menggabungkan fail individu mereka ke dalam CSV biasa c catdan kemudian menghantarnya untuk dieksport.

Bersiaran di R?

Apa yang saya pelajari: Anda boleh menghubungi stdin ΠΈ stdout daripada skrip R, dan oleh itu gunakannya dalam perancangan.

Anda mungkin perasan baris ini dalam skrip Bash anda: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Ia menterjemah semua fail kumpulan bercantum (bin) ke dalam skrip R di bawah. {} adalah teknik khas parallel, yang memasukkan sebarang data yang dihantar ke aliran yang ditentukan terus ke dalam arahan itu sendiri. Pilihan {#} menyediakan ID urutan unik, dan {%} mewakili nombor slot kerja (berulang, tetapi tidak pernah serentak). Senarai semua pilihan boleh didapati dalam dokumentasi.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Apabila pembolehubah file("stdin") dihantar ke readr::read_csv, data yang diterjemahkan ke dalam skrip R dimuatkan ke dalam bingkai, yang kemudiannya dalam bentuk .rds-fail menggunakan aws.s3 ditulis terus kepada S3.

RDS adalah sesuatu seperti versi junior Parket, tanpa tambahan storan pembesar suara.

Selepas menamatkan skrip Bash saya mendapat satu berkas .rds-fail yang terletak di S3, yang membolehkan saya menggunakan jenis pemampatan dan terbina dalam yang cekap.

Walaupun menggunakan brek R, semuanya berfungsi dengan cepat. Tidak menghairankan, bahagian R yang membaca dan menulis data sangat dioptimumkan. Selepas menguji satu kromosom bersaiz sederhana, kerja itu selesai pada contoh C5n.4xl dalam masa kira-kira dua jam.

Batasan S3

Apa yang saya pelajari: Terima kasih kepada pelaksanaan laluan pintar, S3 boleh mengendalikan banyak fail.

Saya bimbang sama ada S3 akan dapat mengendalikan banyak fail yang dipindahkan kepadanya. Saya boleh membuat nama fail masuk akal, tetapi bagaimanakah S3 akan mencarinya?

Menghuraikan 25TB menggunakan AWK dan R
Folder dalam S3 hanya untuk pertunjukan, sebenarnya sistem tidak berminat dengan simbol tersebut /. Dari halaman Soalan Lazim S3.

Nampaknya S3 mewakili laluan ke fail tertentu sebagai kunci mudah dalam sejenis jadual cincang atau pangkalan data berasaskan dokumen. Baldi boleh dianggap sebagai jadual, dan fail boleh dianggap sebagai rekod dalam jadual itu.

Memandangkan kelajuan dan kecekapan adalah penting untuk mengaut keuntungan di Amazon, tidak hairanlah bahawa sistem laluan utama sebagai fail ini dioptimumkan. Saya cuba mencari keseimbangan: supaya saya tidak perlu membuat banyak permintaan, tetapi permintaan itu dilaksanakan dengan cepat. Ternyata yang terbaik adalah membuat kira-kira 20 ribu fail bin. Saya fikir jika kita terus mengoptimumkan, kita boleh mencapai peningkatan dalam kelajuan (contohnya, membuat baldi khas hanya untuk data, sekali gus mengurangkan saiz jadual carian). Tetapi tidak ada masa atau wang untuk eksperimen selanjutnya.

Bagaimana pula dengan keserasian silang?

Apa yang Saya Belajar: Punca nombor satu pembaziran masa ialah mengoptimumkan kaedah storan anda lebih awal.

Pada ketika ini, adalah sangat penting untuk bertanya kepada diri sendiri: "Mengapa menggunakan format fail proprietari?" Sebabnya terletak pada kelajuan pemuatan (fail CSV gzip mengambil masa 7 kali lebih lama untuk dimuatkan) dan keserasian dengan aliran kerja kami. Saya mungkin mempertimbangkan semula jika R boleh memuatkan fail Parket (atau Arrow) dengan mudah tanpa beban Spark. Semua orang dalam makmal kami menggunakan R, dan jika saya perlu menukar data kepada format lain, saya masih mempunyai data teks asal, jadi saya boleh menjalankan saluran paip sekali lagi.

Pembahagian kerja

Apa yang saya pelajari: Jangan cuba mengoptimumkan kerja secara manual, biarkan komputer melakukannya.

Saya telah menyahpepijat aliran kerja pada satu kromosom, sekarang saya perlu memproses semua data lain.
Saya ingin menaikkan beberapa contoh EC2 untuk penukaran, tetapi pada masa yang sama saya takut mendapat beban yang sangat tidak seimbang merentas kerja pemprosesan yang berbeza (sama seperti Spark mengalami sekatan tidak seimbang). Di samping itu, saya tidak berminat untuk menaikkan satu kejadian setiap kromosom, kerana untuk akaun AWS terdapat had lalai sebanyak 10 kejadian.

Kemudian saya memutuskan untuk menulis skrip dalam R untuk mengoptimumkan kerja pemprosesan.

Mula-mula, saya meminta S3 untuk mengira berapa banyak ruang simpanan yang diduduki setiap kromosom.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Kemudian saya menulis fungsi yang mengambil jumlah saiz, mengocok susunan kromosom, membahagikannya kepada kumpulan num_jobs dan memberitahu anda betapa berbezanya saiz semua kerja pemprosesan.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 Γ— 3]>

Kemudian saya berlari melalui seribu shuffle menggunakan purrr dan memilih yang terbaik.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Jadi saya berakhir dengan satu set tugasan yang sangat serupa dari segi saiz. Kemudian yang tinggal hanyalah membungkus skrip Bash saya yang terdahulu dalam gelung besar for. Pengoptimuman ini mengambil masa kira-kira 10 minit untuk menulis. Dan ini adalah lebih sedikit daripada yang saya belanjakan untuk membuat tugasan secara manual jika tugasan itu tidak seimbang. Oleh itu, saya fikir saya betul dengan pengoptimuman awal ini.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Pada akhirnya saya menambah arahan penutupan:

sudo shutdown -h now

... dan semuanya berjaya! Menggunakan AWS CLI, saya membangkitkan kejadian menggunakan pilihan user_data memberi mereka skrip Bash tentang tugas mereka untuk diproses. Mereka berjalan dan ditutup secara automatik, jadi saya tidak membayar untuk kuasa pemprosesan tambahan.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Jom berkemas!

Apa yang saya pelajari: API hendaklah ringkas demi kemudahan dan fleksibiliti penggunaan.

Akhirnya saya mendapat data di tempat dan borang yang betul. Yang tinggal hanyalah untuk memudahkan proses penggunaan data sebanyak mungkin untuk memudahkan rakan sekerja saya. Saya mahu membuat API mudah untuk membuat permintaan. Jika pada masa akan datang saya memutuskan untuk beralih daripada .rds kepada fail Parket, maka ini sepatutnya menjadi masalah untuk saya, bukan untuk rakan sekerja saya. Untuk ini saya memutuskan untuk membuat pakej R dalaman.

Bina dan dokumenkan pakej yang sangat mudah yang mengandungi hanya beberapa fungsi capaian data yang dianjurkan di sekeliling fungsi get_snp. Saya juga membuat laman web untuk rakan sekerja saya pkgdown, supaya mereka boleh melihat contoh dan dokumentasi dengan mudah.

Menghuraikan 25TB menggunakan AWK dan R

Caching pintar

Apa yang saya pelajari: Jika data anda disediakan dengan baik, caching akan menjadi mudah!

Memandangkan salah satu aliran kerja utama menggunakan model analisis yang sama pada pakej SNP, saya memutuskan untuk menggunakan binning untuk kelebihan saya. Apabila menghantar data melalui SNP, semua maklumat daripada kumpulan (bin) dilampirkan pada objek yang dikembalikan. Maksudnya, pertanyaan lama boleh (secara teori) mempercepatkan pemprosesan pertanyaan baharu.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Semasa membina pakej, saya menjalankan banyak penanda aras untuk membandingkan kelajuan apabila menggunakan kaedah yang berbeza. Saya mengesyorkan untuk tidak mengabaikan ini, kerana kadang-kadang hasilnya tidak dijangka. Sebagai contoh, dplyr::filter adalah lebih pantas daripada menangkap baris menggunakan penapisan berasaskan pengindeksan, dan mendapatkan semula satu lajur daripada bingkai data yang ditapis adalah lebih pantas daripada menggunakan sintaks pengindeksan.

Sila ambil perhatian bahawa objek prev_snp_results mengandungi kunci snps_in_bin. Ini ialah tatasusunan semua SNP unik dalam kumpulan (bin), membolehkan anda menyemak dengan cepat sama ada anda sudah mempunyai data daripada pertanyaan sebelumnya. Ia juga memudahkan untuk menggelungkan semua SNP dalam kumpulan (bin) dengan kod ini:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Penemuan

Kini kami boleh (dan telah mula serius) menjalankan model dan senario yang sebelum ini tidak boleh diakses oleh kami. Perkara yang terbaik ialah rakan-rakan makmal saya tidak perlu memikirkan sebarang komplikasi. Mereka hanya mempunyai fungsi yang berfungsi.

Dan walaupun pakej itu memberi mereka butiran, saya cuba membuat format data cukup mudah sehingga mereka dapat mengetahuinya jika saya tiba-tiba hilang esok...

Kelajuan telah meningkat dengan ketara. Kami biasanya mengimbas serpihan genom yang penting dari segi fungsi. Sebelum ini, kami tidak dapat melakukan ini (ternyata terlalu mahal), tetapi sekarang, terima kasih kepada struktur kumpulan (bin) dan caching, permintaan untuk satu SNP mengambil purata kurang daripada 0,1 saat, dan penggunaan data begitu rendah bahawa kos untuk S3 adalah kacang tanah.

Kesimpulan

Artikel ini bukan panduan sama sekali. Penyelesaiannya ternyata individu, dan hampir pasti tidak optimum. Sebaliknya, ia adalah travelog. Saya mahu orang lain memahami bahawa keputusan sedemikian tidak muncul sepenuhnya di kepala, ia adalah hasil percubaan dan kesilapan. Selain itu, jika anda mencari saintis data, perlu diingat bahawa menggunakan alat ini dengan berkesan memerlukan pengalaman dan pengalaman memerlukan wang. Saya gembira kerana saya mempunyai kemampuan untuk membayar, tetapi ramai lagi yang boleh melakukan kerja yang sama lebih baik daripada saya tidak akan mendapat peluang kerana kekurangan wang untuk mencuba.

Π˜Π½ΡΡ‚Ρ€ΡƒΠΌΠ΅Π½Ρ‚Ρ‹ для Π±ΠΎΠ»ΡŒΡˆΠΈΡ… Π΄Π°Π½Π½Ρ‹Ρ… ΡƒΠ½ΠΈΠ²Π΅Ρ€ΡΠ°Π»ΡŒΠ½Ρ‹. Если Ρƒ вас Π΅ΡΡ‚ΡŒ врСмя, Ρ‚ΠΎ ΠΏΠΎΡ‡Ρ‚ΠΈ навСрняка смоТСтС Π½Π°ΠΏΠΈΡΠ°Ρ‚ΡŒ Π±ΠΎΠ»Π΅Π΅ быстроС Ρ€Π΅ΡˆΠ΅Π½ΠΈΠ΅, ΠΏΡ€ΠΈΠΌΠ΅Π½ΠΈΠ² Β«ΡƒΠΌΠ½ΡƒΡŽΒ» очистку Π΄Π°Π½Π½Ρ‹Ρ…, Ρ…Ρ€Π°Π½ΠΈΠ»ΠΈΡ‰Π΅ ΠΈ ΠΌΠ΅Ρ‚ΠΎΠ΄ΠΈΠΊΠΈ извлСчСния. Π’ ΠΊΠΎΠ½Π΅Ρ‡Π½ΠΎΠΌ счСтС всС сводится ΠΊ Π°Π½Π°Π»ΠΈΠ·Ρƒ расходов ΠΈ Π²Ρ‹Π³ΠΎΠ΄.

Apa yang saya pelajari:

  • tidak ada cara murah untuk menghuraikan 25 TB pada satu masa;
  • berhati-hati dengan saiz fail Parket anda dan organisasinya;
  • Pembahagian dalam Spark mestilah seimbang;
  • Secara umum, jangan sekali-kali cuba membuat 2,5 juta partition;
  • Menyusun masih sukar, seperti menyediakan Spark;
  • kadangkala data khas memerlukan penyelesaian khas;
  • Pengagregatan percikan adalah pantas, tetapi pembahagian masih mahal;
  • jangan tidur apabila mereka mengajar anda perkara asas, seseorang mungkin telah menyelesaikan masalah anda pada tahun 1980-an;
  • gnu parallel - ini adalah perkara ajaib, semua orang harus menggunakannya;
  • Spark suka data tidak dimampatkan dan tidak suka menggabungkan partition;
  • Spark mempunyai terlalu banyak overhed apabila menyelesaikan masalah mudah;
  • Tatasusunan bersekutu AWK sangat cekap;
  • anda boleh hubungi stdin ΠΈ stdout daripada skrip R, dan oleh itu gunakannya dalam perancangan;
  • Terima kasih kepada pelaksanaan laluan pintar, S3 boleh memproses banyak fail;
  • Sebab utama untuk membuang masa adalah mengoptimumkan kaedah penyimpanan anda secara pramatang;
  • jangan cuba mengoptimumkan tugas secara manual, biarkan komputer melakukannya;
  • API hendaklah ringkas demi kemudahan dan fleksibiliti penggunaan;
  • Jika data anda disediakan dengan baik, caching akan menjadi mudah!

Sumber: www.habr.com

Tambah komen