使用 AWK 和 R 解析 25TB

使用 AWK 和 R 解析 25TB
如何阅读这篇文章:对于文字如此长和混乱,我深表歉意。 为了节省您的时间,我在每一章的开头都会介绍“我学到了什么”,用一两句话总结了本章的精髓。

“请告诉我解决方案!” 如果你只是想知道我从哪里来,那么请跳到“变得更有创造力”一章,但我认为阅读有关失败的内容更有趣、更有用。

我最近的任务是建立一个处理大量原始 DNA 序列(技术上是 SNP 芯片)的流程。 需要快速获取有关给定遗传位置(称为 SNP)的数据,以用于后续建模和其他任务。 使用 R 和 AWK,我能够以自然的方式清理和组织数据,从而大大加快查询处理速度。 这对我来说并不容易,需要多次迭代。 这篇文章将帮助您避免我的一些错误,并向您展示我的最终结果。

首先,一些介绍性的解释。

数据

我们大学的遗传信息处理中心为我们提供了25TB TSV形式的数据。 我收到的文件被分成 5 个包,通过 Gzip 压缩,每个包包含大约 240 个 2,5GB 的文件。 每一行包含来自一个人的一个 SNP 的数据。 总共传输了约 60 万个 SNP 和约 30 万人的数据。 除了 SNP 信息外,文件还包含许多列,其中的数字反映了各种特征,例如读取强度、不同等位基因的频率等。 总共大约有 XNUMX 个具有唯一值的列。

目标

与任何数据管理项目一样,最重要的是确定如何使用数据。 在这种情况下 我们主要会根据SNP选择SNP的模型和工作流程。 也就是说,我们一次只需要一个 SNP 的数据。 我必须学习如何尽可能轻松、快速且廉价地检索与 2,5 万个 SNP 之一相关的所有记录。

如何不这样做

引用一句合适的陈词滥调:

我并没有失败一千次,我只是发现了一千种方法来避免以查询友好的格式解析一堆数据。

先试试

我学到了什么:没有便宜的方法可以一次解析 25 TB。

在范德比尔特大学学习了“大数据处理高级方法”课程后,我确信这个窍门已经掌握了。 设置 Hive 服务器来运行所有数据并报告结果可能需要一两个小时。 由于我们的数据存储在AWS S3中,因此我使用了该服务 雅典娜,它允许您将 Hive SQL 查询应用于 S3 数据。 您不需要设置/启动 Hive 集群,而且您也只需为您要查找的数据付费。

在向 Athena 展示我的数据及其格式后,我使用如下查询运行了一些测试:

select * from intensityData limit 10;

并很快收到了结构良好的结果。 准备好。

直到我们尝试在工作中使用这些数据......

我被要求提取所有 SNP 信息来测试模型。 我运行了查询:


select * from intensityData 
where snp = 'rs123456';

……然后开始等待。 经过 4 分钟和超过 5 TB 的请求数据,我收到了结果。 Athena 按发现的数据量收费,每 TB 20 美元。 因此,这个单一请求花费了 38 美元和 50 分钟的等待时间。 为了在所有数据上运行模型,我们等待了XNUMX年并支付了XNUMX万美元,显然这不适合我们。

有必要使用Parquet...

我学到了什么:请注意 Parquet 文件的大小及其组织。

我首先尝试通过将所有 TSV 转换为 镶木地板文件。 它们很方便处理大型数据集,因为其中的信息以列形式存储:每一列都位于其自己的内存/磁盘段中,这与文本文件不同,文本文件中的行包含每列的元素。 如果您需要查找某些内容,只需阅读所需的列即可。 另外,每个文件在列中存储一定范围的值,因此如果您要查找的值不在该列的范围内,Spark 不会浪费时间扫描整个文件。

我运行了一个简单的任务 AWS胶水 将我们的 TSV 转换为 Parquet 并将新文件放入 Athena。 大约花了5个小时。 但当我运行该请求时,完成该请求所需的时间大约相同,但花费的资金却少了一些。 事实上,Spark 试图优化该任务​​,只是简单地解压了一个 TSV 块并将其放入自己的 Parquet 块中。 而且由于每个块都足够大,可以包含许多人的全部记录,因此每个文件都包含所有 SNP,因此 Spark 必须打开所有文件才能提取所需的信息。

有趣的是,Parquet 的默认(也是推荐的)压缩类型 snappy 是不可分割的。 因此,每个执行器都专注于解压和下载完整的 3,5 GB 数据集的任务。

使用 AWK 和 R 解析 25TB

让我们了解一下问题

我学到了什么:排序很困难,尤其是数据是分布式的。

我觉得现在我明白了问题的本质。 我只需要按 SNP 列对数据进行排序,而不是按人排序。 然后几个SNP将被存储在一个单独的数据块中,然后Parquet的“仅当值在范围内时才打开”的“智能”功能将大显身手。 不幸的是,对分散在集群中的数十亿行进行排序被证明是一项艰巨的任务。

AWS 绝对不想因为“我是一个注意力不集中的学生”的原因而退款。 我在 Amazon Glue 上运行排序后,它运行了 2 天然后崩溃了。

那分区呢?

我学到了什么:Spark中的分区必须是平衡的。

然后我就想到了在染色体上划分数据的想法。 其中有 23 个(如果考虑线粒体 DNA 和未映射的区域,还有更多)。
这将允许您将数据分割成更小的块。 如果只在 Glue 脚本中的 Spark 导出函数中添加一行 partition_by = "chr",那么数据应该被分成桶。

使用 AWK 和 R 解析 25TB
基因组由许多称为染色体的片段组成。

不幸的是,它没有起作用。 染色体有不同的大小,这意味着不同的信息量。 这意味着 Spark 发送给工作人员的任务不平衡并且完成缓慢,因为某些节点提前完成并处于空闲状态。 不过,任务还是完成了。 但当要求获取一个 SNP 时,这种不平衡再次引发了问题。 在较大染色体(即我们想要获取数据的位置)上处理 SNP 的成本仅降低了约 10 倍。 很多,但还不够。

如果我们把它分成更小的部分呢?

我学到了什么:根本不要尝试进行 2,5 万个分区。

我决定全力以赴,对每个SNP进行分区。 这确保了分区的大小相同。 这是一个坏主意。 我用了胶水并添加了一条无辜的线 partition_by = 'snp'。 任务开始并开始执行。 一天后我检查发现S3仍然没有任何写入,所以我终止了该任务。 看起来 Glue 正在将中间文件写入 S3 中的隐藏位置,很多文件,可能有几百万个。 结果,我的错误造成了一千多美元的损失,而且我的导师也不高兴。

分区+排序

我学到了什么:排序仍然很困难,调优 Spark 也很困难。

我上次的分区尝试涉及对染色体进行分区,然后对每个分区进行排序。 理论上,这会加快每个查询的速度,因为所需的 SNP 数据必须位于给定范围内的几个 Parquet 块内。 不幸的是,即使对分区数据进行排序也是一项艰巨的任务。 因此,我转而使用 EMR 来构建自定义集群,并使用八个功能强大的实例 (C5.4xl) 和 Sparklyr 来创建更灵活的工作流程...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

……然而,任务还没有完成。 我用不同的方式进行了配置:增加每个查询执行器的内存分配,使用内存量大的节点,使用广播变量(broadcastingvariables),但每次这些结果都是半途而废,逐渐执行器开始失败,直到一切都停止。

我变得更有创意

我学到了什么:有时特殊的数据需要特殊的解决方案。

每个 SNP 都有一个位置值。 这是一个对应于其染色体上的碱基数量的数字。 这是组织数据的一种很好且自然的方式。 起初我想按每个染色体的区域进行分区。 例如,位置 1 - 2000、2001 - 4000 等。 但问题是 SNP 并不是均匀分布在染色体上,因此群体大小会有很大差异。

使用 AWK 和 R 解析 25TB

结果,我将职位细分为类别(排名)。 使用已经下载的数据,我发出了获取唯一 SNP、它们的位置和染色体列表的请求。 然后,我对每条染色体内的数据进行排序,并将 SNP 收集到给定大小的组(箱)中。 假设每个有 1000 个 SNP。 这给了我 SNP 与每个染色体组的关系。

最后,我制作了75个SNP的组(bin),原因将在下面解释。

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

第一次尝试使用 Spark

我学到了什么:Spark聚合很快,但分区仍然很昂贵。

我想将这个小(2,5 万行)数据帧读入 Spark,将其与原始数据合并,然后按新添加的列对其进行分区 bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

我用了 sdf_broadcast(),因此 Spark 知道它应该将数据帧发送到所有节点。 如果数据量较小且所有任务都需要,这非常有用。 否则,Spark 会尝试变得智能并根据需要分发数据,这可能会导致速度变慢。

再说一次,我的想法不起作用:任务工作了一段时间,完成了联合,然后,就像分区启动的执行器一样,它们开始失败。

添加AWK

我学到了什么:在教授基础知识时不要睡觉。 当然,早在 1980 世纪 XNUMX 年代就有人已经解决了您的问题。

到目前为止,我所有 Spark 失败的原因都是集群中的数据混乱。 也许通过预处理可以改善这种情况。 我决定尝试将原始文本数据拆分为染色体列,因此我希望为 Spark 提供“预分区”数据。

我在StackOverflow上搜索了如何按列值拆分,发现 这么好的答案。 使用 AWK,您可以通过将文本文件写入脚本中来按列值拆分文本文件,而不是将结果发送到 stdout.

我写了一个 Bash 脚本来尝试一下。 下载了其中一个打包的 TSV,然后使用 gzip 并发送至 awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

有效!

填充核心

我学到了什么: gnu parallel - 这是一个神奇的东西,每个人都应该使用它。

分离的过程非常缓慢,当我开始时 htop为了检查功能强大(且昂贵)的 EC2 实例的使用情况,结果发现我只使用了一个核心和大约 200 MB 的内存。 为了解决问题并且不损失很多钱,我们必须弄清楚如何并行化工作。 幸运的是,在一本绝对精彩的书中 命令行中的数据科学 我找到了 Jeron Janssens 写的关于并行化的一章。 从中我了解到 gnu parallel,一种在 Unix 中实现多线程的非常灵活的方法。

使用 AWK 和 R 解析 25TB
当我开始使用新进程进行分区时,一切都很好,但仍然存在瓶颈 - 将 S3 对象下载到磁盘的速度不是很快,并且没有完全并行化。 为了解决这个问题,我这样做了:

  1. 我发现可以直接在管道中实现 S3 下载阶段,完全消除磁盘上的中间存储。 这意味着我可以避免将原始数据写入磁盘,并在 AWS 上使用更小、因此更便宜的存储。
  2. 团队 aws configure set default.s3.max_concurrent_requests 50 大大增加了 AWS CLI 使用的线程数量(默认情况下有 10 个)。
  3. 我切换到针对网络速度进行了优化的 EC2 实例,名称中带有字母 n。 我发现使用 n 实例时处理能力的损失可以通过加载速度的提高得到补偿。 对于大多数任务,我使用 c5n.4xl。
  4. 改变了 gzippigz,这是一个 gzip 工具,可以做很酷的事情来并行化最初非并行的解压缩文件任务(这帮助最少)。

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

这些步骤相互结合,可以使一切工作非常快速。 通过提高下载速度并消除磁盘​​写入,我现在可以在短短几个小时内处理 5 TB 的包。

这条推文应该提到“TSV”。 唉。

使用新解析的数据

我学到了什么:Spark喜欢未压缩的数据,不喜欢合并分区。

现在数据以解包(读:共享)和半有序格式存储在 S3 中,我可以再次返回 Spark。 等待我的是一个惊喜:我再次未能达到我想要的目标! 很难准确地告诉 Spark 数据是如何分区的。 即使当我这样做时,结果发现分区太多(95),当我使用 coalesce 将它们的数量减少到合理的限度,这破坏了我的分区。 我确信这个问题可以解决,但经过几天的搜索,我找不到解决方案。 我最终完成了 Spark 中的所有任务,尽管这花了一些时间,而且我分割的 Parquet 文件也不是很小(~200 KB)。 然而,数据正是需要的地方。

使用 AWK 和 R 解析 25TB
太小了,凹凸不平,太棒了!

测试本地 Spark 查询

我学到了什么:Spark 在解决简单问题时开销太大。

通过以巧妙的格式下载数据,我能够测试速度。 设置一个 R 脚本来运行本地 Spark 服务器,然后从指定的 Parquet 组存储(bin)加载 Spark 数据帧。 我尝试加载所有数据,但无法让 Sparklyr 识别分区。

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

执行耗时 29,415 秒。 好多了,但对于任何东西的大规模测试来说都不太好。 此外,我无法通过缓存来加快速度,因为当我尝试在内存中缓存数据帧时,Spark 总是崩溃,即使我为重量小于 50 的数据集分配了超过 15 GB 的内存。

返回 AWK

我学到了什么:AWK 中的关联数组非常高效。

我意识到我可以达到更高的速度。 我记得在一次美妙的 Bruce Barnett 的 AWK 教程 我读到了一个很酷的功能,叫做“关联数组” 本质上,这些是键值对,由于某种原因,它们在 AWK 中的调用方式不同,因此我对它们没有多想。 罗曼·切普利亚卡 回想起术语“关联数组”比术语“键值对”要古老得多。 即使你 在 Google Ngram 中查找键值,你不会在那里看到这个术语,但你会发现关联数组! 此外,“键值对”通常与数据库相关联,因此将其与哈希图进行比较更有意义。 我意识到我可以使用这些关联数组将我的 SNP 与 bin 表和原始数据关联起来,而无需使用 Spark。

为此,我在 AWK 脚本中使用了块 BEGIN。 这是在第一行数据传递到脚本主体之前执行的一段代码。

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

团队 while(getline...) 加载 CSV 组 (bin) 中的所有行,将第一列(SNP 名称)设置为关联数组的键 bin 并将第二个值(组)作为值。 然后在区块中 { },在主文件的所有行上执行,每一行都被发送到输出文件,该文件根据其组(bin)接收唯一的名称: ..._bin_"bin[$1]"_....

变量 batch_num и chunk_id 匹配管道提供的数据,避免竞争条件,并且每个执行线程都在运行 parallel,写入其自己独特的文件。

由于我将所有原始数据分散到之前使用 AWK 进行实验时留下的染色体上的文件夹中,现在我可以编写另一个 Bash 脚本来一次处理一个染色体,并将更深层次的分区数据发送到 S3。

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

该脚本有两个部分 parallel.

在第一部分中,从包含所需染色体信息的所有文件中读取数据,然后将该数据分布在线程中,线程将文件分布到适当的组(bin)中。 为了避免多个线程写入同一文件时出现竞争情况,AWK 传递文件名以将数据写入不同的位置,例如 chr_10_bin_52_batch_2_aa.csv。 结果,在磁盘上创建了许多小文件(为此我使用了 TB 级 EBS 卷)。

第二段输送机 parallel 遍历组 (bin) 并将其各个文件合并到通用 CSV c 中 cat然后将它们发送出去。

用 R 广播?

我学到了什么: 可以联系 stdin и stdout 来自 R 脚本,因此在管道中使用它。

您可能已经注意到 Bash 脚本中的这一行: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R...。 它将所有串联的组文件 (bin) 转换为下面的 R 脚本。 {} 是一种特殊的技术 parallel,它将发送到指定流的任何数据直接插入命令本身。 选项 {#} 提供唯一的线程ID,并且 {%} 代表作业槽号(重复,但绝不同时)。 所有选项的列表可以在以下位置找到 文档。

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

当一个变量 file("stdin") 传送至 readr::read_csv,翻译成 R 脚本的数据被加载到一个框架中,然后采用以下形式 .rds- 文件使用 aws.s3 直接写入S3。

RDS 类似于 Parquet 的初级版本,没有扬声器存储的装饰。

完成 Bash 脚本后我得到了一个包 .rds- 位于 S3 中的文件,这使我能够使用高效的压缩和内置类型。

尽管使用了R刹车,一切都进展得很快。 毫不奇怪,R 中读取和写入数据的部分经过了高度优化。 在对一条中等大小的染色体进行测试后,该工作在 C5n.4xl 实例上在大约两个小时内完成。

S3 的限制

我学到了什么:得益于智能路径实施,S3 可以处理许多文件。

我担心 S3 是否能够处理传输到它的大量文件。 我可以使文件名有意义,但 S3 将如何查找它们?

使用 AWK 和 R 解析 25TB
S3中的文件夹只是为了显示,实际上系统对符号不感兴趣 /. 来自 S3 常见问题解答页面。

看起来,S3 将特定文件的路径表示为某种哈希表或基于文档的数据库中的简单键。 存储桶可以被认为是一张表,文件可以被认为是该表中的记录。

由于速度和效率对于亚马逊的盈利非常重要,因此这种“密钥即文件路径”系统得到了极大的优化也就不足为奇了。 我试图找到一个平衡点:这样我就不必发出大量的 get 请求,但又可以快速执行请求。 事实证明,最好制作20万个左右的bin文件。 我认为如果我们继续优化,我们可以实现速度的提高(例如,为数据制作一个特殊的桶,从而减少查找表的大小)。 但没有时间或金钱进行进一步的实验。

交叉兼容性怎么样?

我学到的知识:浪费时间的首要原因是过早地优化存储方法。

此时,问自己非常重要:“为什么使用专有文件格式?” 原因在于加载速度(gzip 压缩的 CSV 文件加载时间延长了 7 倍)以及与我们工作流程的兼容性。 我可能会重新考虑 R 是否可以在没有 Spark 加载的情况下轻松加载 Parquet(或 Arrow)文件。 我们实验室中的每个人都使用 R,如果我需要将数据转换为另一种格式,我仍然有原始文本数据,因此我可以再次运行管道。

分工

我学到了什么:不要尝试手动优化作业,让计算机来完成。

我已经在一条染色体上调试了工作流程,现在我需要处理所有其他数据。
我想提高几个 EC2 实例进行转换,但同时我担心不同处理作业之间的负载非常不平衡(就像 Spark 遭受不平衡分区的困扰一样)。 此外,我对为每条染色体增加一个实例不感兴趣,因为对于 AWS 账户来说,默认限制为 10 个实例。

然后我决定用 R 编写一个脚本来优化处理作业。

首先,我让S3计算每条染色体占用了多少存储空间。

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

然后我编写了一个函数,它获取总大小,打乱染色体的顺序,将它们分组 num_jobs 并告诉您所有加工作业的尺寸有何不同。

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

然后我使用 purrr 进行了一千次洗牌,并选择了最好的。

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

所以我最终完成了一组大小非常相似的任务。 然后剩下的就是将我之前的 Bash 脚本包装在一个大循环中 for。 这个优化写了大概10分钟。 如果任务不平衡,这比我手动创建任务的花费要少得多。 因此,我认为我的初步优化是正确的。

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

最后我添加关闭命令:

sudo shutdown -h now

...一切顺利! 使用 AWS CLI,我使用以下选项引发了实例 user_data 给他们提供任务的 Bash 脚本进行处理。 它们会自动运行和关闭,因此我无需为额外的处理能力付费。

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

我们打包吧!

我学到了什么:为了使用的方便性和灵活性,API应该简单。

最后我得到了正确位置和形式的数据。 剩下的就是尽可能简化使用数据的过程,让我的同事更容易使用。 我想制作一个简单的 API 来创建请求。 如果将来我决定从 .rds 到 Parquet 文件,那么这应该是我的问题,而不是我的同事的问题。 为此,我决定制作一个内部 R 包。

构建并记录一个非常简单的包,其中仅包含一些围绕函数组织的数据访问函数 get_snp。 我还为同事做了一个网站 pkgdown,以便他们可以轻松查看示例和文档。

使用 AWK 和 R 解析 25TB

智能缓存

我学到了什么:如果你的数据准备充分,缓存就会很容易!

由于主要工作流程之一将相同的分析模型应用于 SNP 包,因此我决定使用分箱来发挥我的优势。 通过 SNP 传输数据时,组 (bin) 中的所有信息都会附加到返回的对象中。 也就是说,旧查询(理论上)可以加快新查询的处理速度。

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

在构建包时,我运行了许多基准测试来比较使用不同方法时的速度。 我建议不要忽视这一点,因为有时结果是意想不到的。 例如, dplyr::filter 比使用基于索引的过滤捕获行要快得多,并且从过滤后的数据帧中检索单个列比使用索引语法要快得多。

请注意该对象 prev_snp_results 包含密钥 snps_in_bin。 这是一个组 (bin) 中所有唯一 SNP 的数组,可让您快速检查是否已拥有先前查询的数据。 它还可以轻松地使用以下代码循环遍历组(bin)中的所有 SNP:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

结果

现在我们可以(并且已经开始认真地)运行我们以前无法访问的模型和场景。 最好的事情是我的实验室同事不必考虑任何并发症。 他们只是有一个有效的功能。

虽然这个包没有告诉他们细节,但我试图让数据格式足够简单,以便如果我明天突然消失,他们可以弄清楚......

速度明显提高了。 我们通常扫描具有重要功能的基因组片段。 以前我们做不到这一点(结果发现成本太高),但现在,得益于组(bin)结构和缓存,请求一个SNP平均需要不到0,1秒,并且数据使用量如此之多。 S3 的成本很低。

结论

本文根本不是指南。 事实证明,该解决方案是因人而异的,而且几乎肯定不是最佳的。 相反,它是一篇游记。 我希望其他人明白,这样的决定似乎并不是在头脑中完全形成的,它们是反复试验的结果。 另外,如果您正在寻找数据科学家,请记住,有效使用这些工具需要经验,而经验需要花钱。 我很高兴我有能力支付费用,但许多其他能比我做得更好的人将永远没有机会,因为缺乏资金甚至尝试。

大数据工具用途广泛。 如果您有时间,您几乎肯定可以使用智能数据清理、存储和提取技术编写更快的解决方案。 最终归结为成本效益分析。

我学到的是:

  • 没有廉价的方法可以一次解析 25 TB;
  • 请注意 Parquet 文件的大小及其组织;
  • Spark中的分区必须是平衡的;
  • 一般来说,永远不要尝试创建 2,5 万个分区;
  • 排序仍然很困难,设置 Spark 也很困难;
  • 有时特殊的数据需要特殊的解决方案;
  • Spark聚合很快,但分区仍然很昂贵;
  • 当他们教你基础知识时不要睡觉,早在 1980 世纪 XNUMX 年代就有人可能已经解决了你的问题;
  • gnu parallel - 这是一个神奇的东西,每个人都应该使用它;
  • Spark喜欢未压缩的数据,不喜欢合并分区;
  • Spark在解决简单问题时开销太大;
  • AWK 的关联数组非常高效;
  • 你可以联系 stdin и stdout 来自 R 脚本,因此在管道中使用它;
  • 由于智能路径的实现,S3可以处理许多文件;
  • 浪费时间的主要原因是过早地优化你的存储方法;
  • 不要尝试手动优化任务,让计算机来完成;
  • API应该简单,以便使用方便和灵活;
  • 如果您的数据准备充分,缓存就会很容易!

来源: habr.com

添加评论