使用 Apache Arrow 流式传输列数据

文章的翻译是专门为课程的学生准备的 数据工程师.

使用 Apache Arrow 流式传输列数据

在过去的几周里,我们 李农 添加到 阿帕奇箭 二进制流格式,补充现有的随机访问/IPC 文件格式。 我们有 Java 和 C++ 实现以及 Python 绑定。 在本文中,我将解释该格式的工作原理,并展示如何为 pandas DataFrame 实现非常高的数据吞吐量。

流式列数据

我从 Arrow 用户那里得到的一个常见问题是,将大型表格数据集从面向行或面向记录的格式迁移到列格式的成本很高。 对于数 GB 的数据集,在内存或磁盘上进行转置可能会让人不知所措。

对于流数据,无论源数据是行还是列,一种选择是发送小批量的行,每行内部包含一个列布局。

在 Apache Arrow 中,表示表块的内存中柱状数组的集合称为记录批处理。 要表示逻辑表的单个数据结构,您可以收集多个记录包。

在现有的“随机访问”文件格式中,我们在文件末尾写入包含表架构和块布局的元数据,这使您可以非常便宜地从数据集中选择任何一批记录或任何列。 在流格式中,我们发送一系列消息:一个模式,然后是一批或多批记录。

不同的格式看起来像这张图片:

使用 Apache Arrow 流式传输列数据

PyArrow 中的流式数据:应用程序

为了向您展示它是如何工作的,我将创建一个代表单个流块的示例数据集:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

现在,假设我们要写入 1 GB 的数据,每个数据块由 1 MB 的块组成,总共有 1024 个块。 首先,让我们创建第一个 1MB 的 16 列数据框:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

然后我将它们转换为 pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

现在我将创建一个输出流,它将写入 RAM 并创建 StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

然后我们将写入 1024 个块,最终将组成一个 1GB 的数据集:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

由于我们在 RAM 中写入,我们可以在一个缓冲区中获取整个流:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

由于此数据在内存中,因此读取批次的 Arrow 记录是通过零复制操作获得的。 我打开 StreamReader,将数据读入 pyarrow.Table然后将它们转换为 DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

当然,这一切都很好,但您可能会有疑问。 它发生的速度有多快? 块大小如何影响 pandas DataFrame 检索性能?

流媒体性能

随着流块大小的减小,由于低效的缓存访问方案,在 pandas 中重建连续的列式 DataFrame 的成本会增加。 使用 C++ 数据结构和数组及其内存缓冲区也会产生一些开销。

对于上述 1 MB,在我的笔记本电脑(四核 Xeon E3-1505M)上结果如下:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

结果表明,从 7.75 个 1 MB 块中恢复 1024 GB DataFrame 的有效吞吐量为 1 Gb/s。 如果我们使用更大或更小的块会发生什么? 以下是您得到的结果:

使用 Apache Arrow 流式传输列数据

性能从 256K 显着下降到 64K 块。 我很惊讶 1MB 块的处理速度比 16MB 块快。 值得进行更彻底的研究并了解这是正态分布还是涉及其他因素。

在目前的格式实现中,原则上数据没有被压缩,因此内存中的大小和“线上”的大小大致相同。 压缩在未来可能成为一种选择。

流式传输列数据是一种将大型数据集以小块形式传输到列分析工具(如 pandas)的有效方式。 使用面向行存储的数据服务可以传输和转置更方便处理器的 L2 和 L3 缓存的小数据块。

完整代码

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

来源: habr.com

添加评论