使用 AWK 和 R 解析 25TB

使用 AWK 和 R 解析 25TB
如何閱讀這篇文章:對於文字如此長和混亂,我深表歉意。為了節省您的時間,我在每一章的開頭都會介紹“我學到了什麼”,用一兩句話總結了本章的精髓。

“請告訴我解決方案!” 如果你只是想知道我從哪裡來,那麼請跳到「變得更有創意」一章,但我認為閱讀有關失敗的內容更有趣、更有用。

我最近的任務是建立一個處理大量原始 DNA 序列(技術上是 SNP 晶片)的流程。需要快速獲取有關給定遺傳位置(稱為 SNP)的數據,以用於後續建模和其他任務。使用 R 和 AWK,我能夠以自然的方式清理和組織數據,從而大大加快查詢處理速度。這對我來說並不容易,需要多次迭代。這篇文章將幫助您避免我的一些錯誤,並向您展示我的最終結果。

首先,一些介紹性的解釋。

數據

我們大學的遺傳資訊處理中心為我們提供了25TB TSV形式的數據。我收到的檔案被分成 5 個包,透過 Gzip 壓縮,每個包包含大約 240 個 2,5GB 的檔案。每一行包含來自一個人的一個 SNP 的資料。總共傳輸了約 60 萬個 SNP 和約 30 萬人的資料。除了 SNP 資訊外,文件還包含許多列,其中的數字反映了各種特徵,例如讀取強度、不同等位基因的頻率等。總共大約有 XNUMX 個具有唯一值的欄位。

目標

與任何資料管理專案一樣,最重要的是確定如何使用資料。在這種情況下 我們主要會根據SNP選擇SNP的模式和工作流程。也就是說,我們一次只需要一個 SNP 的資料。我必須學習如何盡可能輕鬆、快速且廉價地檢索與 2,5 萬個 SNP 之一相關的所有記錄。

如何不這樣做

引用一句合適的陳腔濫調:

我並沒有失敗一千次,我只是發現了一千種方法來避免以查詢友好的格式解析一堆資料。

第一次嘗試

我學到了什麼:沒有便宜的方法可以一次解析 25 TB。

在范德比爾特大學學習了「大數據處理高級方法」課程後,我確信這個竅門已經掌握了。設定 Hive 伺服器來運行所有數據並報告結果可能需要一兩個小時。由於我們的資料儲存在AWS S3中,因此我使用了該服務 雅典娜,它允許您將 Hive SQL 查詢應用於 S3 資料。您不需要設定/啟動 Hive 集群,而且您也只需為您要尋找的數據付費。

После того как я показал Athena свои данные и их формат, я прогнал несколько тестов с подобными запросами:

select * from intensityData limit 10;

並很快收到了結構良好的結果。準備好。

直到我們嘗試在工作中使用這些數據...

我被要求提取所有 SNP 資訊來測試模型。我運行了查詢:


select * from intensityData 
where snp = 'rs123456';

……然後開始等待。經過 4 分鐘和超過 5 TB 的請求數據,我收到了結果。 Athena 按發現的資料量收費,每 TB 20 美元。因此,這個單一請求花費了 38 美元和 50 分鐘的等待時間。為了在所有數據上運行模型,我們等待了XNUMX年並支付了XNUMX萬美元,顯然這不適合我們。

有必要使用Parquet...

我學到了什麼:請注意 Parquet 檔案的大小及其組織。

我首先嘗試透過將所有 TSV 轉換為 鑲木地板文件。它們很方便處理大型資料集,因為其中的資訊以列形式儲存:每一列都位於自己的記憶體/磁碟區中,這與文字檔案不同,文字檔案中的行包含每列的元素。如果您需要尋找某些內容,只需閱讀所需的列即可。另外,每個文件在列中儲存一定範圍的值,因此如果您要尋找的值不在該列的範圍內,Spark 不會浪費時間掃描整個文件。

我運行了一個簡單的任務 AWS膠水 將我們的 TSV 轉換為 Parquet 並將新檔案放入 Athena。大約花了5個小時。但當我運行該請求時,完成該請求所需的時間大約相同,但花費的資金卻少了一些。事實上,Spark 試圖優化該任務,只是簡單地解壓縮了一個 TSV 區塊並將其放入自己的 Parquet 區塊中。而且由於每個區塊都足夠大,可以容納許多人的全部記錄,因此每個文件都包含所有 SNP,因此 Spark 必須打開所有文件才能提取所需的資訊。

有趣的是,Parquet 的預設(也是建議的)壓縮類型 snappy 是不可分割的。因此,每個執行器都專注於解壓縮和下載完整的 3,5 GB 資料集的任務。

使用 AWK 和 R 解析 25TB

讓我們了解一下問題

我學到了什麼:排序很困難,尤其是資料是分散式的。

我覺得現在我明白問題的本質了。我只需要按 SNP 列對資料進行排序,而不是按人排序。然後幾個SNP將儲存在一個單獨的資料區塊中,然後Parquet的「僅當值在範圍內時才開啟」的「智慧」功能將顯示出它的全部榮耀。不幸的是,對分散在叢集中的數十億行進行排序被證明是一項艱鉅的任務。

AWS 絕對不想因為「我是一個注意力不集中的學生」的原因而退款。我在 Amazon Glue 上運行排序後,它運行了 2 天然後崩潰了。

那分區呢?

我學到了什麼:Spark中的分區必須是平衡的。

然後我就想到了在染色體上劃分資料的想法。其中有 23 個(如果考慮粒線體 DNA 和未映射的區域,還有更多)。
這將允許您將資料分割成更小的區塊。如果只在 Glue 腳本中的 Spark 匯出函數中新增一行 partition_by = "chr", то данные должны быть разложены по бакетам (buckets).

使用 AWK 和 R 解析 25TB
基因組由許多稱為染色體的片段組成。

不幸的是,它沒有起作用。染色體有不同的大小,這意味著不同的資訊量。這意味著 Spark 發送給工作人員的任務不平衡並且完成緩慢,因為某些節點提前完成並處於空閒狀態。不過,任務還是完成了。但當要求取得一個 SNP 時,這種不平衡再次引發了問題。在較大染色體(即我們想要獲取數據的位置)上處理 SNP 的成本僅降低了約 10 倍。很多,但還不夠。

如果我們把它分成更小的部分呢?

我學到了什麼:根本不要嘗試進行 2,5 萬個分區。

我決定全力以赴,對每個SNP進行分區。這確保了分區的大小相同。 這是一個壞主意。我用了膠水並添加了一條無辜的線 partition_by = 'snp'。任務開始並開始執行。一天後我檢查發現S3仍然沒有任何寫入,所以我終止了該任務。看起來 Glue 正在將中間文件寫入 S3 中的隱藏位置,很多文件,可能有幾百萬個。結果,我的錯誤造成了一千多美元的損失,而且我的導師也不高興。

分區+排序

我學到了什麼:排序仍然很困難,調優 Spark 也很困難。

我上次的分區嘗試涉及對染色體進行分區,然後對每個分區進行排序。理論上,這會加快每個查詢的速度,因為所需的 SNP 資料必須位於給定範圍內的幾個 Parquet 區塊內。不幸的是,即使對分區資料進行排序也是一項艱鉅的任務。因此,我轉而使用 EMR 來建立自訂集群,並使用八個功能強大的實例 (C5.4xl) 和 Sparklyr 來創建更靈活的工作流程...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

……然而,任務還沒完成。我用不同的方式配置了它:增加每個查詢執行器的記憶體分配,使用記憶體量大的節點,使用廣播變數(broadcastingvariables),但每次這些結果都是半途而廢,逐漸執行器開始失敗,直到一切都停止。

我變得更有創意

我學到了什麼:有時特殊的數據需要特殊的解決方案。

每個 SNP 都有一個位置值。這是一個對應於其染色體上的鹼基數量的數字。這是組織數據的一種很好且自然的方式。起初我想按每個染色體的區域進行分區。例如,位置 1 - 2000、2001 - 4000 等。但問題是 SNP 並不是均勻分佈在染色體上,因此群體大小會大不相同。

使用 AWK 和 R 解析 25TB

結果,我將職位細分為類別(排名)。使用已經下載的數據,我發出了獲取唯一 SNP、它們的位置和染色體清單的請求。然後,我對每條染色體內的數據進行排序,並將 SNP 收集到給定大小的組(箱)中。假設每個有 1000 個 SNP。這給了我 SNP 與每個染色體組的關係。

最後,我製作了75個SNP的組(bin),原因將在下面解釋。

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

第一次嘗試使用 Spark

我學到了什麼:Spark聚合很快,但分區仍然很昂貴。

我想將這個小(2,5 萬行)資料幀讀入 Spark,將其與原始資料合併,然後按新新增的列對其進行分區 bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

我用了 sdf_broadcast(),因此 Spark 知道它應該將資料幀發送到所有節點。如果資料量較小且所有任務都需要,這非常有用。否則,Spark 會嘗試變得智慧並根據需要分發數據,這可能會導致速度變慢。

再說一次,我的想法不起作用:任務工作了一段時間,完成了聯合,然後,就像分區啟動的執行器一樣,它們開始失敗。

新增AWK

我學到了什麼:在教授基礎知識時不要睡覺。當然,早在 1980 世紀 XNUMX 年代就有人已經解決了您的問題。

到目前為止,我所有 Spark 失敗的原因都是叢集中的資料混亂。也許透過預處理可以改善這種情況。我決定嘗試將原始文字資料拆分為染色體列,希望為 Spark 提供「預分區」資料。

我在StackOverflow上搜尋如何按列值拆分,發現 這麼好的答案。 使用 AWK,您可以透過將文字文件寫入腳本中來按列值拆分文字文件,而不是將結果傳送到 stdout.

我寫了一個 Bash 腳本來嘗試。下載了其中一個打包的 TSV,然後使用 gzip 並發送至 awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

有效!

填充核心

我學到了什麼: gnu parallel - 這是一個神奇的東西,每個人都應該使用它。

分離的過程非常緩慢,當我開始時 htop為了檢查功能強大(且昂貴)的 EC2 執行個體的使用情況,結果發現我只使用了一個核心和大約 200 MB 的記憶體。為了解決問題並且不損失很多錢,我們必須弄清楚如何並行化工作。幸運的是,在一本絕對精彩的書中 命令行中的數據科學 我找到了 Jeron Janssens 寫的關於平行化的一章。從中我了解到 gnu parallel,一種在 Unix 中實作多執行緒的非常靈活的方法。

使用 AWK 和 R 解析 25TB
當我開始使用新進程進行分割時,一切都很好,但仍然存在瓶頸 - 將 S3 物件下載到磁碟的速度不是很快,並且沒有完全並行化。為了解決這個問題,我這樣做了:

  1. 我發現可以直接在管道中實現 S3 下載階段,完全消除磁碟上的中間儲存。這意味著我可以避免將原始資料寫入磁碟,並在 AWS 上使用更小、因此更便宜的儲存。
  2. 團隊 aws configure set default.s3.max_concurrent_requests 50 大幅增加了 AWS CLI 使用的執行緒數量(預設有 10 個)。
  3. 我切換到針對網路速度進行了最佳化的 EC2 實例,名稱中帶有字母 n。我發現使用 n 實例時處理能力的損失可以透過載入速度的提高來補償。對於大多數任務,我使用 c5n.4xl。
  4. 改變了 gzippigz,這是一個 gzip 工具,可以做很酷的事情來並行化最初非並行的解壓縮檔案任務(這幫助最少)。

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

這些步驟相互結合,可以讓一切工作非常快速。透過提高下載速度並消除磁碟寫入,我現在可以在短短幾個小時內處理 5 TB 的套件。

這則推文應該要提到「TSV」。唉。

使用新解析的數據

我學到了什麼:Spark喜歡未壓縮的數據,不喜歡合併分割區。

現在資料以未打包(讀取:共享)和半組織格式儲存在 S3 中,我可以再次返回 Spark。等待我的是一個驚喜:我再次未能達到我想要的目標!很難準確地告訴 Spark 資料是如何分區的。即使當我這樣做時,結果發現分區太多(95),當我使用 coalesce 將它們的數量減少到合理的限度,這破壞了我的分區。我確信這個問題可以解決,但經過幾天的搜索,我找不到解決方案。我最終完成了 Spark 中的所有任務,儘管這花了一些時間,而且我分割的 Parquet 檔案也不是很小(~200 KB)。然而,數據正是需要的地方。

使用 AWK 和 R 解析 25TB
太小了而且不均勻,太棒了!

測試本地 Spark 查詢

我學到了什麼:Spark 在解決簡單問題時開銷太大。

透過以巧妙的格式下載數據,我能夠測試速度。設定一個 R 腳本來執行本地 Spark 伺服器,然後從指定的 Parquet 群組儲存(bin)載入 Spark 資料幀。我嘗試加載所有數據,但無法讓 Sparklyr 識別分區。

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

執行耗時 29,415 秒。好多了,但對於任何東西的大規模測試來說都不太好。此外,我無法透過快取來加快速度,因為當我嘗試在記憶體中快取資料幀時,Spark 總是崩潰,即使我為重量小於 50 的資料集分配了超過 15 GB 的記憶體。

返回 AWK

我學到了什麼:AWK 中的關聯數組非常有效率。

我意識到我可以達到更高的速度。我記得在一次美妙的 Bruce Barnett 的 AWK 教程 我讀到了一個很酷的功能,叫做“關聯數組」本質上,這些是鍵值對,由於某種原因,它們在 AWK 中的呼叫方式不同,因此我對它們沒有多想。 羅曼·切普利亞卡 回想起術語“關聯數組”比術語“鍵值對”要古老得多。即使你 在 Google Ngram 中尋找鍵值,你不會在那裡看到這個術語,但你會發現關聯數組!此外,「鍵值對」通常與資料庫相關聯,因此將其與雜湊圖​​進行比較更有意義。我意識到我可以使用這些關聯數組將我的 SNP 與 bin 表和原始資料關聯起來,而無需使用 Spark。

為此,我在 AWK 腳本中使用了區塊 BEGIN。這是在第一行資料傳遞到腳本主體之前執行的一段程式碼。

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

團隊 while(getline...) 載入 CSV 群組 (bin) 中的所有行,將第一列(SNP 名稱)設定為關聯數組的鍵 bin 並將第二個值(組)作為值。然後在區塊中 { },在主文件的所有行上執行,每一行都被發送到輸出文件,該文件根據其群組(bin)接收唯一的名稱: ..._bin_"bin[$1]"_....

變量 batch_num и chunk_id 匹配管道提供的數據,避免競爭條件,並且每個執行緒都在運行 parallel,寫入自己獨特的文件。

由於我將所有原始資料分散到先前使用 AWK 進行實驗時留下的染色體上的資料夾中,現在我可以編寫另一個 Bash 腳本來一次處理一個染色體,並將更深層的分區資料傳送到 S3。

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

該腳本有兩個部分 parallel.

在第一部分中,從包含所需染色體資訊的所有檔案中讀取數據,然後將該數據分佈在線程中,線程將檔案分佈到適當的群組(bin)中。為了避免多個執行緒寫入相同檔案時出現競爭情況,AWK 傳遞檔案名稱以將資料寫入不同的位置,例如 chr_10_bin_52_batch_2_aa.csv。結果,在磁碟上建立了許多小檔案(為此我使用了 TB 級 EBS 磁碟區)。

第二段輸送機 parallel 遍歷組 (bin) 並將其各個文件合併到通用 CSV c 中 cat然後將它們發送出去。

用 R 廣播?

我學到了什麼: 可以聯繫 stdin и stdout 來自 R 腳本,因此在管道中使用它。

您可能已經注意到 Bash 腳本中的這一行: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R...。它將所有串聯的群組檔案 (bin) 轉換為下面的 R 腳本。 {} 是一種特殊的技術 parallel,它將發送到指定流的任何資料直接插入命令本身。選項 {#} 提供唯一的線程ID,並且 {%} 代表作業槽號(重複,但絕不同時)。所有選項的清單可以在以下位置找到 文檔。

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

當一個變數 file("stdin") 傳送至 readr::read_csv,翻譯成 R 腳本的資料被載入到一個框架中,然後採用以下形式 .rds- 檔案使用 aws.s3 直接寫入S3。

RDS 類似於 Parquet 的初級版本,沒有揚聲器存儲的裝飾。

完成 Bash 腳本後我得到了一個包 .rds- 位於 S3 中的文件,這使我能夠使用高效的壓縮和內建類型。

儘管使用了R煞車,一切都進展得很快。毫不奇怪,R 中讀取和寫入資料的部分經過了高度最佳化。在對一條中等大小的染色體進行測試後,該工作在 C5n.4xl 實例上在大約兩個小時內完成。

S3 的限制

我學到了什麼:得益於智慧路徑實施,S3 可以處理許多文件。

我擔心 S3 是否能夠處理傳輸到它的大量檔案。我可以使檔案名稱有意義,但 S3 將如何找到它們?

使用 AWK 和 R 解析 25TB
S3中的資料夾只是為了顯示,實際上系統對符號不感興趣 /. 來自 S3 常見問題頁面。

看起來,S3 將特定文件的路徑表示為某種哈希表或基於文件的資料庫中的簡單鍵。儲存桶可以被視為一張表,檔案可以被認為是該表中的記錄。

由於速度和效率對於亞馬遜的盈利非常重要,因此這種「密鑰即檔案路徑」系統得到了極大的優化也就不足為奇了。我試著找一個平衡點:這樣我就不必發出大量的 get 請求,但又可以快速執行請求。事實證明,最好製作20萬個左右的bin文件。我認為如果我們繼續優化,我們可以實現速度的提高(例如,為資料製作一個特殊的桶,從而減少查找表的大小)。但沒有時間或金錢進行進一步的實驗。

交叉相容性怎麼樣?

我學到的知識:浪費時間的首要原因是過早優化儲存方法。

此時,問自己非常重要:“為什麼使用專有文件格式?”原因在於載入速度(gzip 壓縮的 CSV 檔案載入時間延長了 7 倍)以及與我們工作流程的兼容性。我可能會重新考慮 R 是否可以在沒有 Spark 載入的情況下輕鬆載入 Parquet(或 Arrow)檔案。我們實驗室中的每個人都使用 R,如果我需要將數據轉換為另一種格式,我仍然有原始文字數據,因此我可以再次運行管道。

分工

我學到了什麼:不要嘗試手動最佳化作業,讓計算機來完成。

我已經在一條染色體上調試了工作流程,現在我需要處理所有其他數據。
我想提高幾個 EC2 執行個體進行轉換,但同時我擔心不同處理作業之間的負載非常不平衡(就像 Spark 遭受不平衡分區的困擾一樣)。此外,我對每個染色體增加一個實例不感興趣,因為對於 AWS 帳戶來說,預設限制為 10 個實例。

然後我決定用 R 寫一個腳本來優化處理作業。

首先,我請S3計算每條染色體佔用了多少儲存空間。

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

然後我編寫了一個函數,它會取得總大小,打亂染色體的順序,將它們分組 num_jobs 並告訴您所有加工作業的尺寸有何不同。

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

然後我使用 purrr 進行了一千次洗牌,並選擇了最好的。

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

所以我最後完成了一組大小非常相似的任務。然後剩下的就是將我之前的 Bash 腳本包裝在一個大循環中 for。這個優化寫了大概10分鐘。如果任務不平衡,這比我手動建立任務的花費要少得多。因此,我認為我的初步優化是正確的。

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

最後我加入關閉命令:

sudo shutdown -h now

....一切順利!使用 AWS CLI,我使用以下選項引發了實例 user_data 給他們一個任務的 Bash 腳本進行處理。它們會自動運行和關閉,因此我無需為額外的處理能力付費。

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

我們打包吧!

我學到了什麼:為了使用的便利性和靈活性,API應該簡單。

最後我得到了正確位置和形式的數據。剩下的就是盡可能簡化使用資料的流程,讓我的同事更容易使用。我想製作一個簡單的 API 來建立請求。如果將來我決定從 .rds 到 Parquet 文件,那麼這應該是我的問題,而不是我的同事的問題。為此,我決定製作一個內部 R 包。

建立並記錄一個非常簡單的包,其中僅包含一些圍繞函數組織的資料存取函數 get_snp。我還為同事做了一個網站 pkgdown,以便他們可以輕鬆查看範例和文件。

使用 AWK 和 R 解析 25TB

智慧型快取

我學到了什麼:如果你的資料準備充分,快取就會很容易!

由於主要工作流程之一將相同的分析模型應用於 SNP 包,因此我決定使用分箱來發揮我的優勢。透過 SNP 傳輸資料時,群組 (bin) 中的所有資訊都會附加到傳回的物件中。也就是說,舊查詢(理論上)可以加快新查詢的處理速度。

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

在建置套件時,我運行了許多基準測試來比較使用不同方法時的速度。我建議不要忽視這一點,因為有時結果是意想不到的。例如, dplyr::filter 比使用基於索引的過濾捕獲行要快得多,並且從過濾後的資料幀中檢索單個列比使用索引語法要快得多。

請注意該對象 prev_snp_results 包含密鑰 snps_in_bin。這是一個群組 (bin) 中所有唯一 SNP 的數組,可讓您快速檢查是否已擁有先前查詢的資料。它還可以輕鬆地使用以下程式碼循環遍歷組(bin)中的所有 SNP:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Результаты

現在我們可以(並且已經開始認真地)運行我們以前無法訪問的模型和場景。最好的事情是我的實驗室同事不必考慮任何併發症。他們只是有一個有效的功能。

雖然這個包沒有告訴他們細節,但我試圖讓資料格式足夠簡單,以便如果我明天突然消失,他們可以弄清楚...

速度明顯提高了。我們通常掃描具有重要功能的基因組片段。我們以前做不到這一點(結果發現成本太高),但現在,得益於組(bin)結構和緩存,請求一個SNP平均需要不到0,1秒,並且數據使用量如此之多。S3 的成本很低。

結論

本文根本不是指南。事實證明,該解決方案是因人而異的,而且幾乎肯定不是最佳的。相反,它是一篇遊記。我希望其他人明白,這樣的決定似乎不是在腦中完全形成的,它們是反覆試驗的結果。另外,如果您正在尋找資料科學家,請記住,有效使用這些工具需要經驗,而經驗需要花錢。我很高興我有能力支付費用,但許多其他能比我做得更好的人將永遠沒有機會,因為缺乏資金甚至嘗試。

大數據工具用途廣泛。如果您有時間,您幾乎肯定可以使用智慧資料清理、儲存和提取技術編寫更快的解決方案。最終歸結為成本效益分析。

我學到的是:

  • 沒有廉價的方法可以一次解析 25 TB;
  • 請注意 Parquet 檔案的大小及其組織;
  • Spark中的分區必須是平衡的;
  • 一般來說,永遠不要嘗試創建 2,5 萬個分區;
  • 排序仍然很困難,設定 Spark 也很困難;
  • 有時特殊的數據需要特殊的解決方案;
  • Spark聚合很快,但分區仍然很昂貴;
  • 當他們教你基礎知識時不要睡覺,早在 1980 世紀 XNUMX 年代就有人可能已經解決了你的問題;
  • gnu parallel - 這是一個神奇的東西,每個人都應該使用它;
  • Spark喜歡未壓縮的數據,不喜歡合併分割區;
  • Spark在解決簡單問題時開銷太大;
  • AWK 的關聯數組非常有效率;
  • 你可以聯繫 stdin и stdout 來自 R 腳本,因此在管道中使用它;
  • 由於智慧路徑的實現,S3可以處理許多文件;
  • 浪費時間的主要原因是過早優化你的儲存方法;
  • 不要嘗試手動優化任務,讓計算機來完成;
  • API應該簡單,以便使用方便和靈活;
  • 如果您的資料準備充分,快取就會很容易!

來源: www.habr.com

添加評論