使用 Apache Arrow 流式傳輸列數據

文章的翻譯是專門為課程的學生準備的 數據工程師.

使用 Apache Arrow 流式傳輸列數據

在過去的幾周里,我們 李農 添加到 阿帕奇箭 二進制流格式,補充現有的隨機訪問/IPC 文件格式。 我們有 Java 和 C++ 實現以及 Python 綁定。 在本文中,我將解釋該格式的工作原理,並展示如何為 pandas DataFrame 實現非常高的數據吞吐量。

流式傳輸列數據

我從 Arrow 用戶那裡得到的一個常見問題是,將大型表格數據集從面向行或面向記錄的格式遷移到列格式的成本很高。 對於數 GB 的數據集,在內存或磁盤上進行轉置可能會讓人不知所措。

對於流數據,無論源數據是行還是列,一種選擇是發送小批量的行,每行內部包含一個列佈局。

在 Apache Arrow 中,表示表塊的內存中柱狀數組的集合稱為記錄批處理。 要表示邏輯表的單個數據結構,您可以收集多個記錄包。

在現有的“隨機訪問”文件格式中,我們在文件末尾寫入包含表架構和塊佈局的元數據,這使您可以非常便宜地從數據集中選擇任何一批記錄或任何列。 在流格式中,我們發送一系列消息:一個模式,然後是一批或多批記錄。

不同的格式看起來像這張圖片:

使用 Apache Arrow 流式傳輸列數據

PyArrow 中的流式數據:應用程序

為了向您展示它是如何工作的,我將創建一個代表單個流塊的示例數據集:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

現在,假設我們要寫入 1 GB 的數據,每個數據塊由 1 MB 的塊組成,總共有 1024 個塊。 首先,讓我們創建第一個 1MB 的 16 列數據框:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

然後我將它們轉換為 pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

現在我將創建一個輸出流,它將寫入 RAM 並創建 StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

然後我們將寫入 1024 個塊,最終將組成一個 1GB 的數據集:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

由於我們在 RAM 中寫入,我們可以在一個緩衝區中獲取整個流:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

由於此數據在內存中,因此讀取批次的 Arrow 記錄是通過零複製操作獲得的。 我打開 StreamReader,將數據讀入 pyarrow.Table然後將它們轉換為 DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

當然,這一切都很好,但您可能會有疑問。 它發生的速度有多快? 塊大小如何影響 pandas DataFrame 檢索性能?

流媒體性能

隨著流塊大小的減小,由於低效的緩存訪問方案,在 pandas 中重建連續的列式 DataFrame 的成本會增加。 使用 C++ 數據結構和數組及其內存緩衝區也會產生一些開銷。

對於上述 1 MB,在我的筆記本電腦(四核 Xeon E3-1505M)上結果如下:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

結果表明,從 7.75 個 1 MB 塊中恢復 1024 GB DataFrame 的有效吞吐量為 1 Gb/s。 如果我們使用更大或更小的塊會發生什麼? 以下是您得到的結果:

使用 Apache Arrow 流式傳輸列數據

性能從 256K 顯著下降到 64K 塊。 我很驚訝 1MB 塊的處理速度比 16MB 塊快。 值得進行更徹底的研究並了解這是正態分佈還是涉及其他因素。

在目前的格式實現中,原則上數據沒有被壓縮,因此內存中的大小和“線上”的大小大致相同。 壓縮在未來可能成為一種選擇。

流式傳輸列數據是一種將大型數據集以小塊形式傳輸到列分析工具(如 pandas)的有效方式。 使用面向行存儲的數據服務可以傳輸和轉置更方便處理器的 L2 和 L3 緩存的小數據塊。

完整代碼

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

來源: www.habr.com

添加評論