Apache Arrow を使用した列データのストリーミング

記事の翻訳はコースの学生向けに特別に用意されました データエンジニア.

Apache Arrow を使用した列データのストリーミング

過去数週間にわたって、私たちは ノン・リー に追加 アパッチアロー 既存のランダム アクセス/IPC ファイル形式を補完するバイナリ ストリーミング形式。 Java と C++ の実装と Python バインディングがあります。 この記事では、この形式がどのように機能するかを説明し、pandas DataFrame で非常に高いデータ スループットを実現する方法を示します。

列データのストリーミング

Arrow ユーザーからよく寄せられる質問は、大規模な表形式データのセットを行またはレコード指向の形式から列指向の形式に移行するのにコストがかかるというものです。 マルチギガバイトのデータセットの場合、メモリ内またはディスク上での転置は膨大な作業になる可能性があります。

データをストリーミングするには、ソース データが行であるか列であるかに関係なく、行の小さなバッチを送信し、各行の内部に列状のレイアウトを含めることが XNUMX つのオプションです。

Apache Arrow では、テーブル チャンクを表すメモリ内の列配列のコレクションをレコード バッチと呼びます。 論理テーブルの単一のデータ構造を表すために、レコードのいくつかのバッチを収集できます。

既存の「ランダム アクセス」ファイル形式では、テーブル スキーマとブロックの位置を含むメタデータがファイルの最後に記録されるため、データ セットからレコードのバッチや列を非常に安価に選択できます。 ストリーミング形式では、アウトラインと XNUMX つ以上のレコードのバッチという一連のメッセージを送信します。

さまざまな形式は次のようになります。

Apache Arrow を使用した列データのストリーミング

PyArrow でのデータのストリーミング: アプリケーション

これがどのように機能するかを示すために、単一のストリーム チャンクを表すサンプル データセットを作成します。

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

ここで、それぞれ 1 MB のチャンク、合計 1 のチャンクで構成される 1024 GB のデータを書き込むとします。 まず、1 列を持つ最初の 16 MB データ フレームを作成しましょう。

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

次に、それらを次のように変換します pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

ここで、RAM に書き込み、 StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

次に、1024 個のチャンクを書き込み、最終的には 1 GB のデータ セットになります。

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

RAM に書き込んだため、ストリーム全体を XNUMX つのバッファーで取得できます。

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

このデータはメモリ内にあるため、Arrow レコードのバッチの読み取りはゼロコピー操作です。 StreamReaderを開き、データを読み取ります pyarrow.Table、そしてそれらをに変換します DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

もちろん、これはすべて良いことですが、疑問があるかもしれません。 これはどのくらいの速さで起こりますか? チャンク サイズは pandas DataFrame の取得パフォーマンスにどのように影響しますか?

ストリーミングパフォーマンス

ストリーミング チャンク サイズが減少すると、キャッシュ アクセス パターンが非効率になるため、pandas で連続した列形式の DataFrame を再構築するコストが増加します。 また、C++ データ構造、配列、およびそれらのメモリ バッファーの操作によるオーバーヘッドも発生します。

上記のように、1 MB の場合、私のラップトップ (クアッドコア Xeon E3-1505M) では次のようになります。

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

7.75 個の 1MB チャンクから 1024GB データフレームを復元する場合の実効スループットは 1 GB/秒であることがわかります。 より大きなチャンクまたはより小さなチャンクを使用するとどうなりますか? 結果は次のとおりです。

Apache Arrow を使用した列データのストリーミング

パフォーマンスは 256K チャンクから 64K チャンクに大幅に低下します。 1 MB チャンクの方が 16 MB チャンクよりも高速に処理されたことに驚きました。 より徹底的な調査を実施し、これが正規分布なのか、それとも何か別の要因があるのか​​を理解する価値があります。

このフォーマットの現在の実装では、原則としてデータは圧縮されていないため、メモリ内のサイズと「ワイヤ上の」サイズはほぼ同じです。 将来的には、圧縮が追加のオプションになる可能性があります。

合計

列形式データのストリーミングは、大規模なデータ セットを小さなチャンクでパンダなどの列形式分析ツールにフィードする効果的な方法となります。 行指向ストレージを使用するデータ サービスは、プロセッサの L2 および L3 キャッシュにとってより便利な小さなデータ チャンクを転送および転置できます。

完全なコード

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

出所: habr.com

コメントを追加します