我最近的任務是建立一個處理大量原始 DNA 序列(技術上是 SNP 晶片)的流程。需要快速獲取有關給定遺傳位置(稱為 SNP)的數據,以用於後續建模和其他任務。使用 R 和 AWK,我能夠以自然的方式清理和組織數據,從而大大加快查詢處理速度。這對我來說並不容易,需要多次迭代。這篇文章將幫助您避免我的一些錯誤,並向您展示我的最終結果。
首先,一些介紹性的解釋。
數據
我們大學的遺傳資訊處理中心為我們提供了25TB TSV形式的數據。我收到的檔案被分成 5 個包,透過 Gzip 壓縮,每個包包含大約 240 個 2,5GB 的檔案。每一行包含來自一個人的一個 SNP 的資料。總共傳輸了約 60 萬個 SNP 和約 30 萬人的資料。除了 SNP 資訊外,文件還包含許多列,其中的數字反映了各種特徵,例如讀取強度、不同等位基因的頻率等。總共大約有 XNUMX 個具有唯一值的欄位。
目標
與任何資料管理專案一樣,最重要的是確定如何使用資料。在這種情況下 我們主要會根據SNP選擇SNP的模式和工作流程。也就是說,我們一次只需要一個 SNP 的資料。我必須學習如何盡可能輕鬆、快速且廉價地檢索與 2,5 萬個 SNP 之一相關的所有記錄。
然後我就想到了在染色體上劃分資料的想法。其中有 23 個(如果考慮粒線體 DNA 和未映射的區域,還有更多)。
這將允許您將資料分割成更小的區塊。如果只在 Glue 腳本中的 Spark 匯出函數中新增一行 partition_by = "chr", то данные должны быть разложены по бакетам (buckets).
基因組由許多稱為染色體的片段組成。
不幸的是,它沒有起作用。染色體有不同的大小,這意味著不同的資訊量。這意味著 Spark 發送給工作人員的任務不平衡並且完成緩慢,因為某些節點提前完成並處於空閒狀態。不過,任務還是完成了。但當要求取得一個 SNP 時,這種不平衡再次引發了問題。在較大染色體(即我們想要獲取數據的位置)上處理 SNP 的成本僅降低了約 10 倍。很多,但還不夠。
# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
group_by(chr) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr')
)
# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
group_by(chr_bin) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr_bin')
)
# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50
for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do
aws s3 cp s3://$batch_loc$chunk_file - |
pigz -dc |
parallel --block 100M --pipe
"awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"
# Combine all the parallel process chunks to single files
ls chunked/ |
cut -d '_' -f 2 |
sort -u |
parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
# Clean up intermediate data
rm chunked/*
done
sc <- Spark_connect(master = "local")
desired_snp <- 'rs34771739'
# Start a timer
start_time <- Sys.time()
# Load the desired bin into Spark
intensity_data <- sc %>%
Spark_read_Parquet(
name = 'intensity_data',
path = get_snp_location(desired_snp),
memory = FALSE )
# Subset bin to snp and then collect to local
test_subset <- intensity_data %>%
filter(SNP_Name == desired_snp) %>%
collect()
print(Sys.time() - start_time)
我意識到我可以達到更高的速度。我記得在一次美妙的 Bruce Barnett 的 AWK 教程 我讀到了一個很酷的功能,叫做“關聯數組」本質上,這些是鍵值對,由於某種原因,它們在 AWK 中的呼叫方式不同,因此我對它們沒有多想。 羅曼·切普利亞卡 回想起術語“關聯數組”比術語“鍵值對”要古老得多。即使你 在 Google Ngram 中尋找鍵值,你不會在那裡看到這個術語,但你會發現關聯數組!此外,「鍵值對」通常與資料庫相關聯,因此將其與雜湊圖進行比較更有意義。我意識到我可以使用這些關聯數組將我的 SNP 與 bin 表和原始資料關聯起來,而無需使用 Spark。
DESIRED_CHR='13'
# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"
# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*
由於速度和效率對於亞馬遜的盈利非常重要,因此這種「密鑰即檔案路徑」系統得到了極大的優化也就不足為奇了。我試著找一個平衡點:這樣我就不必發出大量的 get 請求,但又可以快速執行請求。事實證明,最好製作20萬個左右的bin文件。我認為如果我們繼續優化,我們可以實現速度的提高(例如,為資料製作一個特殊的桶,從而減少查找表的大小)。但沒有時間或金錢進行進一步的實驗。
由於主要工作流程之一將相同的分析模型應用於 SNP 包,因此我決定使用分箱來發揮我的優勢。透過 SNP 傳輸資料時,群組 (bin) 中的所有資訊都會附加到傳回的物件中。也就是說,舊查詢(理論上)可以加快新查詢的處理速度。
# Part of get_snp()
...
# Test if our current snp data has the desired snp.
already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin
if(!already_have_snp){
# Grab info on the bin of the desired snp
snp_results <- get_snp_bin(desired_snp)
# Download the snp's bin data
snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
} else {
# The previous snp data contained the right bin so just use it
snp_results <- prev_snp_results
}
...
請注意該對象 prev_snp_results 包含密鑰 snps_in_bin。這是一個群組 (bin) 中所有唯一 SNP 的數組,可讓您快速檢查是否已擁有先前查詢的資料。它還可以輕鬆地使用以下程式碼循環遍歷組(bin)中的所有 SNP:
# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin
for(current_snp in snps_in_bin){
my_snp_results <- get_snp(current_snp, my_snp_results)
# Do something with results
}