AWK ず R を䜿甚しお 25TB を解析する

AWK ず R を䜿甚しお 25TB を解析する
この蚘事の読み方: 非垞に長くお混沌ずした文章になっおしたい申し蚳ありたせん。 時間を節玄するために、各章は「孊んだこず」の導入郚から始たり、その章の芁点を XNUMX  XNUMX 文で芁玄しおいたす。

「解決策を教えおください」 私の原点を知りたいだけなら、「もっず創意工倫する」の章たで飛ばしおください。ただし、倱敗に぀いお読んだほうが興味深く、圹に立぀ず思いたす。

私は最近、倧量の生の DNA 配列 (技術的には SNP チップ) を凊理するプロセスをセットアップする任務を負っおいたした。 必芁なのは、埌続のモデリングやその他のタスクのために、特定の遺䌝子䜍眮 (SNP ず呌ばれる) に関するデヌタを迅速に取埗するこずでした。 R ず AWK を䜿甚するず、自然な方法でデヌタを敎理しお敎理するこずができ、ク゚リ凊理が倧幅に高速化されたした。 これは私にずっお簡単ではなく、䜕床も繰り返す必芁がありたした。 この蚘事は、私の間違いのいく぀かを回避し、私が最終的にどうなったかを瀺すのに圹立ちたす。

たず、入門的な説明をしたす。

デヌタ

私たちの倧孊の遺䌝情報凊理センタヌからは、25 TB TSV の圢匏でデヌタが提䟛されたした。 私はそれらを 5 ぀のパッケヌゞに分割しお受け取り、Gzip で圧瞮したした。各パッケヌゞには玄 240 の 2,5 GB のファむルが含たれおいたした。 各行には、60 人の個人からの 30 ぀の SNP のデヌタが含たれおいたした。 合蚈で、玄 XNUMX 䞇の SNP ず玄 XNUMX 䞇人に関するデヌタが送信されたした。 ファむルには、SNP 情報に加えお、読み取り匷床、さたざたな察立遺䌝子の頻床など、さたざたな特性を反映する数倀が蚘茉された倚数の列が含たれおいたした。 䞀意の倀を持぀列は合蚈で玄 XNUMX 列ありたした。

目暙

他のデヌタ管理プロゞェクトず同様に、最も重芁なこずは、デヌタがどのように䜿甚されるかを決定するこずでした。 この堎合 䞻に SNP に基づいお SNP のモデルずワヌクフロヌを遞択したす。。 ぀たり、䞀床に 2,5 ぀の SNP に関するデヌタのみが必芁になりたす。 私は、XNUMX 䞇の SNP の XNUMX ぀に関連付けられたすべおのレコヌドを、できるだけ簡単、迅速、そしお安䟡に取埗する方法を孊ばなければなりたせんでした。

これをしない方法

適切な決たり文句を匕甚するず、次のようになりたす。

私は䜕千回も倱敗したわけではありたせん。ク゚リに適した圢匏で倧量のデヌタを解析するこずを回避する千の方法を発芋しただけです。

最初に詊す

䜕を孊んだのか: 䞀床に 25 TB を解析する安䟡な方法はありたせん。

ノァンダヌビルト倧孊で「ビッグ デヌタ凊理の高床な手法」コヌスを受講したので、私はそのコツがバッグの䞭にあるず確信しおいたした。 Hive サヌバヌをセットアップしおすべおのデヌタを実行し、結果をレポヌトするには、おそらく 3  XNUMX 時間かかりたす。 デヌタは AWS SXNUMX に保存されおいるため、サヌビスを䜿甚したした アテナこれにより、Hive SQL ク゚リを S3 デヌタに適甚できるようになりたす。 Hive クラスタヌをセットアップ/構築する必芁はなく、探しおいるデヌタに察しおのみ料金を支払いたす。

Athena にデヌタずその圢匏を瀺した埌、次のようなク゚リを䜿甚しおいく぀かのテストを実行したした。

select * from intensityData limit 10;

そしお、すぐによく構造化された結果を受け取りたした。 準備ができお。

デヌタを仕事に䜿おうずするたでは...

モデルをテストするためにすべおの SNP 情報を匕き出すように求められたした。 ク゚リを実行したした:


select * from intensityData 
where snp = 'rs123456';

...そしお埅ち始めたした。 4 分埌、5 TB を超えるデヌタが芁求された埌、結果を受け取りたした。 Athena は、芋぀かったデヌタの量に応じお 20 テラバむトあたり 38 ドルを請求したす。 ぀たり、この 50 回のリク゚ストには XNUMX ドルのコストず XNUMX 分の埅機時間がかかりたした。 すべおのデヌタに察しおモデルを実行するには、XNUMX 幎間埅機し、XNUMX 䞇ドルを支払わなければなりたせんでしたが、これは明らかに私たちには適しおいたせんでした。

寄朚现工を䜿甚する必芁がありたした...

䜕を孊んだのか: Parquet ファむルのサむズずその構成には泚意しおください。

私は最初に、すべおの TSV を次の圢匏に倉換するこずで状況を修正しようずしたした。 寄朚现工のファむル。 これらは、その䞭の情報が列圢匏で保存されるため、倧芏暡なデヌタ セットを扱うのに䟿利です。行に各列の芁玠が含たれるテキスト ファむルずは察照的に、各列は独自のメモリ/ディスク セグメントに存圚したす。 䜕かを芋぀ける必芁がある堎合は、必芁な列を読んでください。 さらに、各ファむルは列に倀の範囲を保存するため、探しおいる倀が列の範囲にない堎合でも、Spark はファむル党䜓のスキャンに時間を無駄にするこずはありたせん。

簡単なタスクを実行したした AWSグルヌ TSV を Parquet に倉換し、新しいファむルを Athena にドロップしたした。 箄5時間かかりたした。 しかし、リク゚ストを実行しおみるず、完了たでにかかる時間はほが同じで、費甚は少し少なくなりたした。 実際のずころ、Spark はタスクを最適化しようずしお、XNUMX ぀の TSV チャンクを解凍し、それを独自の Parquet チャンクに配眮しただけです。 たた、各チャンクは倚くの人の蚘録党䜓を含めるのに十分な倧きさで、各ファむルにはすべおの SNP が含たれおいたため、Spark は必芁な情報を抜出するためにすべおのファむルを開く必芁がありたした。

興味深いこずに、Parquet のデフォルト (および掚奚) 圧瞮タむプである snappy は分割可胜ではありたせん。 そのため、各実行者は、3,5 GB の完党なデヌタセットを解凍しおダりンロヌドするずいう䜜業に远われおいたした。

AWK ず R を䜿甚しお 25TB を解析する

問題を理解したしょう

䜕を孊んだのか: 特にデヌタが分散されおいる堎合、䞊べ替えは困難です。

今、私は問題の本質を理解したように思えたした。 デヌタを人ごずではなく、SNP 列ごずに䞊べ替えるだけで枈みたした。 次に、いく぀かの SNP が別のデヌタ チャンクに保存され、Parquet の「スマヌト」機胜「倀が範囲内にある堎合にのみオヌプン」がその栄光を発揮したす。 残念ながら、クラスタヌ内に散圚する数十億行を䞊べ替えるのは困難な䜜業であるこずが刀明したした。

AWS は、「集䞭力のない孊生だから」ずいう理由で返金を行うこずを絶察に望んでいたせん。 Amazon Glue で䞊べ替えを実行した埌、2 日間実行されおクラッシュしたした。

パヌティション分割に぀いおはどうですか?

䜕を孊んだのか: Spark のパヌティションはバランスをずる必芁がありたす。

そこで私は、デヌタを染色䜓で分割するずいうアむデアを思い぀きたした。 それらは 23 個ありたす (ミトコンドリア DNA ずマッピングされおいない領域を考慮するずさらにいく぀かありたす)。
これにより、デヌタをより小さなチャンクに分割できるようになりたす。 Glue スクリプトの Spark ゚クスポヌト関数に XNUMX 行だけ远加するず、 partition_by = "chr"、その埌、デヌタをバケットに分割する必芁がありたす。

AWK ず R を䜿甚しお 25TB を解析する
ゲノムは染色䜓ず呌ばれる倚数の断片から構成されおいたす。

残念ながら、うたくいきたせんでした。 染色䜓のサむズが異なるずいうこずは、情報量が異なるこずを意味したす。 これは、䞀郚のノヌドが早く終了しおアむドル状態になったため、Spark がワヌカヌに送信したタスクのバランスがずれず、完了に時間がかかるこずを意味したす。 ただし、タスクは完了したした。 しかし、10 ぀の SNP を芁求するず、䞍均衡が再び問題を匕き起こしたした。 より倧きな染色䜓 (぀たり、デヌタを取埗したい堎所) 䞊の SNP を凊理するコストは、玄 XNUMX 分の XNUMX しか枛少したせん。 たくさんありたすが、十分ではありたせん。

さらに现かく分割するずどうなるでしょうか

䜕を孊んだのか: 2,5 䞇パヌティションを決しお実行しないでください。

私は思い切っお各 SNP を分割するこずにしたした。 これにより、パヌティションのサむズが確実に同じになりたす。 それは悪い考えでした。 接着剀を䜿っお無邪気なラむンを远加したした partition_by = 'snp'。 タスクが開始され、実行が開始されたした。 3 日埌確認したずころ、ただ S3 に䜕も曞き蟌たれおいなかったので、タスクを匷制終了したした。 Glue が SXNUMX の隠し堎所に䞭間ファむルを曞き蟌んでいたようです。おそらく数癟䞇もの倧量のファむルが曞き蟌たれおいたした。 結果ずしお、私の間違いは千ドル以䞊の損害をもたらし、私の指導者を喜ばせるこずはできたせんでした。

パヌティショニング + ゜ヌト

䜕を孊んだのか: Spark のチュヌニングず同様に、䞊べ替えは䟝然ずしお困難です。

前回のパヌティション化の詊みでは、染色䜓をパヌティション化し、各パヌティションを䞊べ替えたした。 理論的には、必芁な SNP デヌタは指定された範囲内のいく぀かの Parquet チャンク内にある必芁があるため、これにより各ク゚リが高速化されたす。 残念ながら、分割されたデヌタであっおも䞊べ替えるのは困難な䜜業であるこずが刀明したした。 その結果、カスタム クラスタヌの EMR に切り替え、5.4 ぀の匷力なむンスタンス (CXNUMXxl) ず Sparklyr を䜿甚しお、より柔軟なワヌクフロヌを䜜成したした。

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...しかし、タスクはただ完了しおいたせんでした。 各ク゚リ実行プログラムのメモリ割り圓おを増やしたり、倧量のメモリを持぀ノヌドを䜿甚したり、ブロヌドキャスト倉数 (ブロヌドキャスト倉数) を䜿甚したり、さたざたな方法で構成したしたが、毎回これらは䞭途半端であるこずが刀明し、埐々に実行プログラムが起動し始めたした。すべおが止たるたで倱敗するこず。

より創造的になっおいきたす

䜕を孊んだのか: 特殊なデヌタには特殊な゜リュヌションが必芁な堎合がありたす。

各 SNP には䜍眮の倀がありたす。 これは、染色䜓䞊の塩基の数に察応する数倀です。 これはデヌタを敎理するための自然で優れた方法です。 最初は各染色䜓の領域ごずに分割したいず考えおいたした。 たずえば、䜍眮 1  2000、2001  4000 などです。 しかし問題は、SNP が染色䜓党䜓に均䞀に分垃しおいないため、グルヌプのサむズが倧きく異なるこずです。

AWK ず R を䜿甚しお 25TB を解析する

その結果、圹職をカテゎリヌランクに分類するこずにたどり着きたした。 すでにダりンロヌドしたデヌタを䜿甚しお、固有の SNP、その䜍眮、および染色䜓のリストを取埗するリク゚ストを実行したした。 次に、各染色䜓内のデヌタを䞊べ替え、SNP を所定のサむズのグルヌプ (ビン) に収集したした。 それぞれ 1000 の SNP があるずしたす。 これにより、SNP ず染色䜓ごずのグルヌプの関係がわかりたした。

最終的に 75 個の SNP のグルヌプ (bin) を䜜成したした。その理由は以䞋で説明したす。

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

たずはSparkで詊しおみる

䜕を孊んだのか: Spark の集玄は高速ですが、パヌティショニングには䟝然ずしおコストがかかりたす。

この小さな (2,5 䞇行) デヌタ フレヌムを Spark に読み蟌み、生デヌタず結合しお、新しく远加された列でパヌティション化したいず考えおいたした。 bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

䜿甚したした sdf_broadcast()したがっお、Spark はデヌタ フレヌムをすべおのノヌドに送信する必芁があるこずを認識したす。 これは、デヌタのサむズが小さく、すべおのタスクに必芁な堎合に䟿利です。 そうしないず、Spark は必芁に応じお賢くデヌタを配垃しようずするため、速床が䜎䞋する可胜性がありたす。

そしお再び、私のアむデアはうたくいきたせんでした。タスクはしばらくの間機胜し、結合を完了したしたが、その埌、パヌティショニングによっお起動された゚グれキュヌタず同様に、倱敗し始めたした。

AWKの远加

䜕を孊んだのか基本を教えおもらっおいる間は寝ないでください。 きっず誰かが 1980 幎代にすでにあなたの問題を解決しおいたした。

この時点たで、Spark でのすべおの倱敗の原因はクラスタヌ内のデヌタのごちゃ混ぜでした。 おそらく、前凊理によっお状況が改善される可胜性がありたす。 私は生のテキスト デヌタを染色䜓の列に分割しおみるこずにしたので、Spark に「事前に分割された」デヌタを提䟛したいず考えたした。

StackOverflowで列の倀で分割する方法を怜玢したずころ、 ずおも玠晎らしい答えです。 AWK を䜿甚するず、結果を送信するのではなく、スクリプトにテキスト ファむルを蚘述するこずで、列の倀ごずにテキスト ファむルを分割できたす。 stdout.

Bash スクリプトを曞いお詊しおみたした。 パッケヌゞ化された TSV の XNUMX ぀をダりンロヌドし、次のコマンドを䜿甚しお解凍したす。 gzip そしお送信されたした awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

動いた

コアを埋める

䜕を孊んだのか: gnu parallel - それは魔法のものです、誰もがそれを䜿うべきです。

別れはかなりゆっくりで、私が別れ始めたずき、 htop匷力な (そしお高䟡な) EC2 むンスタンスの䜿甚状況を確認するために、200 ぀のコアず玄 XNUMX MB のメモリのみを䜿甚しおいるこずが刀明したした。 倚額の損倱を出さずに問題を解決するには、䜜業を䞊列化する方法を芋぀ける必芁がありたした。 幞いなこずに、本圓に玠晎らしい本の䞭で コマンド ラむンでのデヌタ サむ゚ンス Jeron Janssens による䞊列化に関する章を芋぀けたした。 そこから私が孊んだこずは、 gnu parallel、Unix でマルチスレッドを実装するための非垞に柔軟な方法です。

AWK ず R を䜿甚しお 25TB を解析する
新しいプロセスを䜿甚しおパヌティショニングを開始したずきは、すべお問題ありたせんでしたが、ただボトルネックがありたした。S3 オブゞェクトのディスクぞのダりンロヌドはそれほど高速ではなく、完党に䞊列化されおいたせんでした。 これを修正するために、次のようにしたした。

  1. S3 ダりンロヌド ステヌゞをパむプラむンに盎接実装し、ディスク䞊の䞭間ストレヌゞを完党に排陀できるこずがわかりたした。 これは、生デヌタをディスクに曞き蟌む必芁がなく、AWS 䞊のさらに小さい、したがっお安䟡なストレヌゞを䜿甚できるこずを意味したす。
  2. チヌム aws configure set default.s3.max_concurrent_requests 50 AWS CLI が䜿甚するスレッドの数が倧幅に増加したした (デフォルトでは 10 です)。
  3. 名前に文字 n が含たれる、ネットワヌク速床に最適化された EC2 むンスタンスに切り替えたした。 n むンスタンスを䜿甚する堎合の凊理​​胜力の損倱は、読み蟌み速床の向䞊によっお十分に補われるこずがわかりたした。 ほずんどのタスクでは c5n.4xl を䜿甚したした。
  4. かわった gzip Ма pigz、これは、ファむルを解凍するずいう最初は䞊列化されおいなかったタスクを䞊列化する玠晎らしいこずを実行できる gzip ツヌルです (これは最も圹に立ちたせんでした)。

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

これらの手順を盞互に組み合わせるこずで、すべおが非垞に迅速に機胜したす。 ダりンロヌド速床を向䞊させ、ディスクぞの曞き蟌みを排陀したこずで、5 テラバむトのパッケヌゞをわずか数時間で凊理できるようになりたした。

このツむヌトには「TSV」に぀いお蚀及する必芁がありたした。 ああ、ああ。

新しく解析されたデヌタの䜿甚

䜕を孊んだのか: Spark は非圧瞮デヌタを奜み、パヌティションの結合を奜みたせん。

これで、デヌタがアンパック (共有: 共有) か぀半順序付けされた圢匏で S3 に保存され、再び Spark に戻るこずができたした。 驚きが私を埅っおいたした。私は再び望んでいたものを達成できたせんでした。 デヌタがどのように分割されおいるかを Spark に正確に䌝えるのは非垞に困難でした。 そしお、これを実行したずきでも、パヌティションが倚すぎるこずが刀明したした95。 coalesce その数を劥圓な制限たで枛らしたので、パヌティション分割が砎壊されたした。 これは修正できるず確信しおいたすが、数日間怜玢しおも解決策が芋぀かりたせんでした。 最終的には Spark ですべおのタスクを完了したしたが、時間がかかり、分割された Parquet ファむルはそれほど小さくありたせんでした (~200 KB)。 ただし、デヌタは必芁な堎所にありたした。

AWK ず R を䜿甚しお 25TB を解析する
小さすぎお凞凹しおお玠敵

ロヌカル Spark ク゚リのテスト

䜕を孊んだのか: 単玔な問題を解決する堎合、Spark にはオヌバヌヘッドが倚すぎたす。

デヌタを賢い圢匏でダりンロヌドするこずで、速床をテストするこずができたした。 ロヌカル Spark サヌバヌを実行するように R スクリプトを蚭定し、指定された Parquet グルヌプ ストレヌゞ (bin) から Spark デヌタ フレヌムをロヌドしたす。 すべおのデヌタをロヌドしようずしたしたが、Sparklyr にパヌティショニングを認識させるこずができたせんでした。

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

実行には 29,415 秒かかりたした。 はるかに優れおいたすが、䜕かを倧量にテストするにはあたり適しおいたせん。 さらに、デヌタ フレヌムをメモリにキャッシュしようずするず、重みが 50 未満のデヌタセットに 15 GB 以䞊のメモリを割り圓おた堎合でも、Spark が垞にクラッシュしたため、キャッシュを䜿甚しおも速床を䞊げるこずができたせんでした。

AWKに戻る

䜕を孊んだのか: AWK の連想配列は非垞に効率的です。

もっず高速に到達できるこずに気づきたした。 それを玠晎らしい圢で思い出したした Bruce Barnett による AWK チュヌトリアル 「」ずいう玠晎らしい機胜に぀いお読みたした。連想配列」 基本的に、これらはキヌず倀のペアですが、䜕らかの理由で AWK では異なる呌び方をしおいたので、私はそれらに぀いおあたり考えおいたせんでした。 ロヌマン・チェプリャカ 「連想配列」ずいう甚語は「キヌず倀のペア」ずいう甚語よりもはるかに叀いこずを思い出したした。 たずえあなたが Google Ngram で Key-Value を怜玢するこの甚語はそこにはありたせんが、連想配列は芋぀かりたす。 さらに、「キヌず倀のペア」はデヌタベヌスに関連付けられるこずが倚いため、ハッシュマップず比范するこずは非垞に合理的です。 これらの連想配列を䜿甚するず、Spark を䜿甚せずに SNP をビン テヌブルおよび生デヌタに関連付けるこずができるこずに気付きたした。

これを行うには、AWK スクリプトで次のブロックを䜿甚したした。 BEGIN。 これは、デヌタの最初の行がスクリプトの本䜓に枡される前に実行されるコヌドです。

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

チヌム while(getline...) CSV グルヌプ (bin) からすべおの行をロヌドし、最初の列 (SNP 名) を連想配列のキヌずしお蚭定したす bin 倀ずしお XNUMX 番目の倀 (グルヌプ) を指定したす。 それからブロックで { }これはメむン ファむルのすべおの行で実行され、各行は出力ファむルに送信され、グルヌプ (bin) に応じお䞀意の名前が付けられたす。 ..._bin_"bin[$1]"_....

倉数 batch_num О chunk_id パむプラむンによっお提䟛されるデヌタず䞀臎し、競合状態が回避され、各実行スレッドが実行されたす。 parallel、独自の䞀意のファむルに曞き蟌みたす。

AWK を䜿甚した前回の実隓で残った染色䜓䞊のフォルダヌにすべおの生デヌタを分散したため、䞀床に 3 ぀の染色䜓を凊理し、より深く分割されたデヌタを SXNUMX に送信する別の Bash スクリプトを䜜成できるようになりたした。

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

スクリプトには XNUMX ぀のセクションがありたす parallel.

最初のセクションでは、目的の染色䜓に関する情報を含むすべおのファむルからデヌタが読み取られ、次にこのデヌタがスレッド間で分散され、ファむルが適切なグルヌプ (bin) に分散されたす。 耇数のスレッドが同じファむルに曞き蟌むずきの競合状態を回避するために、AWK はファむル名を枡しおデヌタを別の堎所に曞き蟌みたす。 chr_10_bin_52_batch_2_aa.csv。 その結果、ディスク䞊に小さなファむルが倚数䜜成されたす (このために、テラバむトの EBS ボリュヌムを䜿甚したした)。

第二セクションからのコンベダヌ parallel グルヌプ (bin) を調べお、個々のファむルを共通の CSV c に結合したす。 catそしお゚クスポヌトのために送信したす。

Rで攟送

䜕を孊んだのか 連絡できる stdin О stdout R スクリプトから取埗するため、パむプラむンで䜿甚したす。

Bash スクリプトに次の行があるこずに気付いたかもしれたせん。 ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R...。 すべおの連結されたグルヌプ ファむル (bin) が以䞋の R スクリプトに倉換されたす。 {} 特殊なテクニックです parallel、指定されたストリヌムに送信するデヌタをコマンド自䜓に盎接挿入したす。 オプション {#} 䞀意のスレッド ID を提䟛し、 {%} ゞョブ スロット番号を衚したす (繰り返されたすが、同時には実行されたせん)。 すべおのオプションのリストは次の堎所にありたす。 ドキュメンテヌション。

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

倉数の堎合 file("stdin") に送信される readr::read_csv、R スクリプトに倉換されたデヌタはフレヌムにロヌドされ、次の圢匏になりたす。 .rds-ファむルを䜿甚しお aws.s3 S3 に盎接曞き蟌たれたす。

RDS は、スピヌカヌ ストレヌゞの䜙分な芁玠を省いた、Parquet のゞュニア バヌゞョンのようなものです。

Bash スクリプトを完了するず、バンドルが埗られたした .rds-ファむルは S3 にあり、効率的な圧瞮ず組み蟌みタむプを䜿甚できるようになりたした。

ブレヌキ R を䜿甚したにもかかわらず、すべおが非垞に迅速に機胜したした。 圓然のこずながら、デヌタの読み取りず曞き蟌みを行う R の郚分は高床に最適化されおいたす。 5 ぀の䞭型染色䜓でテストした埌、C4n.XNUMXxl むンスタンスでのゞョブは玄 XNUMX 時間で完了したした。

S3 の制限事項

䜕を孊んだのか: スマヌト パスの実装のおかげで、S3 は倚くのファむルを凊理できたす。

S3 に転送される倚数のファむルを S3 が凊理できるかどうかが心配でした。 意味のあるファむル名にするこずはできたすが、SXNUMX はどのようにファむル名を探すのでしょうか?

AWK ず R を䜿甚しお 25TB を解析する
S3 のフォルダヌは単なる芋せ物であり、実際にはシステムはこのシンボルには興味がありたせん。 /. S3 FAQ ペヌゞより。

S3 は、特定のファむルぞのパスを、䞀皮のハッシュ テヌブルたたはドキュメント ベヌスのデヌタベヌス内の単玔なキヌずしお衚しおいるようです。 バケットはテヌブルずしお考えるこずができ、ファむルはそのテヌブル内のレコヌドずしお考えるこずができたす。

Amazon で利益を䞊げるにはスピヌドず効率が重芁であるため、このファむルパスずしおのキヌシステムが異垞に最適化されおいるこずは驚くべきこずではありたせん。 倧量の取埗リク゚ストを䜜成する必芁がなく、リク゚ストが迅速に実行されるように、バランスを芋぀けようずしたした。 箄20䞇個のbinファむルを䜜成するのが最適であるこずが刀明したした。 最適化を続ければ、速床の向䞊が達成できるず思いたす (たずえば、デヌタ専甚の特別なバケットを䜜成しお、ルックアップ テヌブルのサむズを削枛するなど)。 しかし、さらなる実隓を行うための時間もお金もありたせんでした。

盞互互換性に぀いおはどうですか?

孊んだこず: 時間を無駄にする最倧の原因は、保管方法を時期尚早に最適化するこずです。

この時点で、「なぜ独自のファむル圢匏を䜿甚するのか?」ず自問するこずが非垞に重芁です。 その理由は、読み蟌み速床 (gzip 圧瞮された CSV ファむルの読み蟌みに 7 倍の時間がかかりたした) ずワヌクフロヌずの互換性にありたす。 R が Spark をロヌドせずに Parquet (たたは Arrow) ファむルを簡単にロヌドできるかどうか、再考するかもしれたせん。 私たちの研究宀では党員が R を䜿甚しおおり、デヌタを別の圢匏に倉換する必芁がある堎合でも、元のテキスト デヌタがただ残っおいるので、パむプラむンを再床実行するだけで枈みたす。

仕事の分割

䜕を孊んだのか: ゞョブを手動で最適化しようずせず、コンピュヌタヌに任せおください。

XNUMX ぀の染色䜓でワヌクフロヌをデバッグしたしたが、今床は他のすべおのデヌタを凊理する必芁がありたす。
倉換のために耇数の EC2 むンスタンスを起動したいず考えおいたしたが、同時に、(Spark が䞍均衡なパヌティションに悩たされたのず同じように) 異なる凊理ゞョブ間で非垞に䞍均衡な負荷が発生するのではないかず心配しおいたした。 さらに、AWS アカりントにはデフォルトでむンスタンス数が 10 個に制限されおいるため、染色䜓ごずに XNUMX ぀のむンスタンスを生成するこずに興味がありたせんでした。

そこで、凊理ゞョブを最適化するスクリプトを R で䜜成するこずにしたした。

たず、S3 に各染色䜓が占めるストレヌゞ容量を蚈算するように䟝頌したした。

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# 
 with 17 more rows

次に、合蚈サむズを取埗し、染色䜓の順序をシャッフルし、それらをグルヌプに分割する関数を曞きたした。 num_jobs すべおの凊理ゞョブのサむズがどのように異なるかを瀺したす。

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

次に、purrr を䜿甚しお XNUMX 回のシャッフルを実行し、最良のものを遞択したした。

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

そのため、最終的にサむズが非垞に䌌た䞀連のタスクが䜜成されたした。 その埌、残ったのは、前の Bash スクリプトを倧きなルヌプでラップするこずだけでした for。 この最適化の䜜成には玄 10 分かかりたした。 これは、タスクのバランスが厩れおいる堎合に手動でタスクを䜜成する堎合にかかる費甚よりもはるかに少なくなりたす。 したがっお、この予備的な最適化は正しかったず思いたす。

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

最埌にシャットダりンコマンドを远加したす。

sudo shutdown -h now

...そしおすべおがうたくいきたした AWS CLI を䜿甚しお、次のオプションを䜿甚しおむンスタンスを起動したした。 user_data 凊理するタスクの Bash スクリプトを圌らに枡したした。 それらは自動的に実行されシャットダりンされるため、远加の凊理胜力にお金を払う必芁はありたせんでした。

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

梱包したしょう

䜕を孊んだのか: 䜿いやすさず柔軟性を高めるために、API はシンプルである必芁がありたす。

぀いに、デヌタを正しい堎所ず圢匏で取埗できたした。 あずは、同僚が䜿いやすいように、デヌタを䜿甚するプロセスを可胜な限り簡玠化するこずだけでした。 リク゚ストを䜜成するためのシンプルな API を䜜成したいず思いたした。 将来的に切り替えるこずにした堎合、 .rds Parquet ファむルに倉換する堎合、これは同僚ではなく私にずっお問題になるはずです。 このために、内郚 R パッケヌゞを䜜成するこずにしたした。

関数を䞭心に線成されたいく぀かのデヌタ アクセス関数のみを含む非垞に単玔なパッケヌゞを構築しお文曞化する get_snp。 同僚向けのりェブサむトも䜜りたした パッケヌゞダりン、䟋やドキュメントを簡単に芋るこずができたす。

AWK ず R を䜿甚しお 25TB を解析する

スマヌトキャッシング

䜕を孊んだのか: デヌタが十分に準備されおいれば、キャッシュは簡単です。

䞻芁なワヌクフロヌの XNUMX ぀が同じ分析モデルを SNP パッケヌゞに適甚したため、ビニングを有利に䜿甚するこずにしたした。 SNP 経由でデヌタを送信する堎合、グルヌプ (bin) からのすべおの情報が返されるオブゞェクトに添付されたす。 ぀たり、叀いク゚リは (理論䞊は) 新しいク゚リの凊理を高速化できたす。

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

パッケヌゞを構築するずきに、さたざたな方法を䜿甚した堎合の速床を比范するために倚くのベンチマヌクを実行したした。 予期せぬ結果が生じる堎合があるため、これを無芖しないこずをお勧めしたす。 䟋えば、 dplyr::filter むンデックスベヌスのフィルタリングを䜿甚しお行をキャプチャするよりもはるかに高速であり、フィルタリングされたデヌタ フレヌムから単䞀の列を取埗するのは、むンデックス構文を䜿甚するよりもはるかに高速でした。

オブゞェクトに泚意しおください prev_snp_results キヌが含たれおいたす snps_in_bin。 これはグルヌプ (ビン) 内のすべおの䞀意の SNP の配列であり、以前のク゚リからのデヌタが既に存圚するかどうかをすばやく確認できたす。 たた、次のコヌドを䜿甚するず、グルヌプ (ビン) 内のすべおの SNP を簡単にルヌプできたす。

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

結果

今では、以前はアクセスできなかったモデルやシナリオを実行できるようになりたした (そしお本栌的に実行し始めおいたす)。 䞀番良いのは、研究宀の同僚が耇雑なこずを考える必芁がないこずです。 機胜する機胜があるだけです。

パッケヌゞでは詳现は省いおいたすが、明日私が突然いなくなった堎合でも理解できるように、デヌタ圢匏をシンプルにするよう努めたした...

速床が著しく向䞊したした。 私たちは通垞、機胜的に重芁なゲノム断片をスキャンしたす。 以前はこれを実行できたせんでした (コストが高すぎるこずが刀明したした)。しかし、珟圚はグルヌプ (ビン) 構造ずキャッシュのおかげで、0,1 ぀の SNP のリク゚ストにかかる時間は平均 3 秒未満で、デヌタ䜿甚量も倧幅に削枛されおいたす。 SXNUMX のコストはピヌナッツほど䜎いです。

たずめ

この蚘事はたったくガむドではありたせん。 解決策は個別のものであり、ほが確実に最適ではないこずが刀明したした。 ずいうより旅行蚘です。 他の人たちには、そのような決定は頭の䞭で完党に圢成されたものではなく、詊行錯誀の結果であるこずを理解しおもらいたいず思いたす。 たた、デヌタ サむ゚ンティストを探しおいる堎合は、これらのツヌルを効果的に䜿甚するには経隓が必芁であり、経隓には費甚がかかるこずに留意しおください。 私にはお金を払う䜙裕があったので幞せですが、私よりも同じ仕事ができる他の倚くの人は、お金がないために挑戊する機䌚すらないでしょう。

ビッグデヌタツヌルは倚甚途です。 時間があれば、スマヌトなデヌタ クリヌニング、保存、抜出手法を䜿甚しお、より高速な゜リュヌションを䜜成するこずがほが確実に可胜です。 最終的には費甚察効果の分析に行き着きたす。

私が孊んだこず:

  • 䞀床に 25 TB を解析する安䟡な方法はありたせん。
  • Parquet ファむルのサむズずその構成には泚意しおください。
  • Spark のパヌティションはバランスをずる必芁がありたす。
  • 䞀般に、2,5 䞇個のパヌティションを䜜成しようずしないでください。
  • Spark のセットアップず同様に、䞊べ替えは䟝然ずしお困難です。
  • 特殊なデヌタには特殊な゜リュヌションが必芁な堎合がありたす。
  • Spark の集玄は高速ですが、パヌティショニングには䟝然ずしおコストがかかりたす。
  • 基本を教えるずきは眠らないでください。おそらく 1980 幎代に誰かがすでにあなたの問題を解決しおいたした。
  • gnu parallel - これは魔法のようなものです。誰もがそれを䜿うべきです。
  • Spark は非圧瞮デヌタを奜み、パヌティションの結合を奜みたせん。
  • 単玔な問題を解決する堎合、Spark のオヌバヌヘッドは倚すぎたす。
  • AWK の連想配列は非垞に効率的です。
  • 連絡できる stdin О stdout R スクリプトから取埗するため、パむプラむンで䜿甚したす。
  • スマヌト パスの実装のおかげで、S3 は倚くのファむルを凊理できたす。
  • 時間を無駄にする䞻な理由は、保管方法を時期尚早に最適化するこずです。
  • タスクを手動で最適化しようずせず、コンピュヌタに任せおください。
  • 䜿いやすさず柔軟性を高めるために、API はシンプルである必芁がありたす。
  • デヌタが十分に準備されおいれば、キャッシュは簡単です。

出所 habr.com

コメントを远加したす