使用 Apache Arrow 串流傳輸列數據

文章的翻譯是專門為課程的學生準備的 數據工程師.

使用 Apache Arrow 串流傳輸列數據

過去幾週,我們 農力 添加到 阿帕奇箭 二進位流格式,補充現有的隨機存取/IPC檔案格式。我們有 Java 和 C++ 實作以及 Python 綁定。在本文中,我將解釋該格式的工作原理,並展示如何為 pandas DataFrame 實現非常高的資料吞吐量。

列式資料流

Arrow 使用者經常問我的一個問題是,將大型表格資料集從行式或記錄式格式遷移到列式格式的成本很高。對於數 GB 的資料集,在記憶體或磁碟上轉置可能是一項艱鉅的任務。

對於流數據,無論來源資料是面向行還是面向列,一種選擇是發送小批量的行,每行內部包含面向列的佈局。

在 Apache Arrow 中,表示表格塊的記憶體中列式陣列的集合稱為記錄批次。為了表示邏輯表的單一資料結構,可以收集多批記錄。

在現有的「隨機存取」文件格式中,我們在文件末尾寫入包含表模式和塊位置的元數據,從而允許您以極其低廉的成本從數據集中選擇任何一批記錄或任何列。在串流格式中,我們傳送一系列訊息:一個模式,然後是一批或多批記錄。

不同的格式看起來像這樣:

使用 Apache Arrow 串流傳輸列數據

PyArrow 中的串流資料:應用程式

為了向您展示這是如何運作的,我將建立一個代表一個流塊的範例資料集:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

現在,假設我們要寫入 1GB 的數據,每個區塊由 1MB 組成,總共 1024 個區塊。首先,讓我們建立第一個資料框,大小為 1MB,有 16 個欄位:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

然後我將它們轉換為 pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

現在我將創建一個輸出流,它將寫入 RAM 並創建 StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

然後我們將寫入 1024 個區塊,最終組成 1GB 的資料集:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

由於我們寫入了 RAM,因此我們可以在一個緩衝區中獲取整個流:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

由於這些資料位於記憶體中,因此讀取 Arrow 記錄包是零拷貝操作。我打開StreamReader,讀取數據 pyarrow.Table,然後將其轉換為 DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

當然,這一切都是好的,但您可能會有疑問。這發生得有多快?區塊大小如何影響 Pandas DataFrame 檢索效能?

串流媒體效能

隨著流塊大小的減小,由於快取存取模式低效,在 Pandas 中重建連續列式 DataFrame 的成本會增加。使用 C++ 資料結構和陣列及其記憶體緩衝區也會產生一些開銷。

如上所述,對於 1 MB,在我的筆記型電腦(四核心 Xeon E3-1505M)上,我得到:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

這使得從 7.75 個 1MB 區塊重建 1024GB DataFrame 的有效吞吐量達到 1 Gbps。如果我們使用更大或更小的塊會發生什麼?這些是您將獲得的結果:

使用 Apache Arrow 串流傳輸列數據

性能從 256K 到 64K 塊顯著下降。令我驚訝的是,1MB 區塊的處理速度比 16MB 區塊的處理速度更快。值得進行更徹底的調查,看看這是否是正常分佈,或者是否有其他因素在起作用。

在該格式的目前實作中,資料原則上不被壓縮,因此記憶體中和「線路上」的大小大致相同。壓縮將來可能會成為可選功能。

流式列式資料可以成為一種有效的方法,以小塊形式將大型資料集輸入到 Pandas 等列式分析工具中。使用行式儲存的資料服務可以傳輸和轉置小塊數據,這對於 CPU 的 L2 和 L3 快取來說更方便。

完整代碼

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

來源: www.habr.com