Spaltendaten mit Apache Arrow streamen

Die Übersetzung des Artikels wurde speziell für die Studierenden des Kurses erstellt Dateningenieur.

Spaltendaten mit Apache Arrow streamen

In den letzten Wochen haben wir Nong Li hinzugefügt zu Apache-Pfeil binäres Streaming-Format, das das bestehende Direktzugriffs-/IPC-Dateiformat ergänzt. Wir verfügen über Java- und C++-Implementierungen sowie Python-Anbindungen. In diesem Artikel erkläre ich die Funktionsweise des Formats und zeige, wie man einen sehr hohen Datendurchsatz für einen Pandas DataFrame erreichen kann.

Spaltendaten streamen

Eine häufige Frage, die ich von Arrow-Benutzern erhalte, betrifft die hohen Kosten für die Migration großer Mengen tabellarischer Daten von einem zeilen- oder datensatzorientierten Format in ein spaltenorientiertes Format. Bei Datensätzen mit mehreren Gigabyte kann die Transponierung im Speicher oder auf der Festplatte eine überwältigende Aufgabe sein.

Um Daten zu streamen, unabhängig davon, ob es sich bei den Quelldaten um Zeilen- oder Spaltendaten handelt, besteht eine Möglichkeit darin, kleine Zeilenstapel zu senden, die jeweils ein Spaltenlayout enthalten.

In Apache Arrow wird die Sammlung von speicherinternen Spaltenarrays, die einen Tabellenblock darstellen, als Datensatzstapel bezeichnet. Um eine einzelne Datenstruktur einer logischen Tabelle darzustellen, können mehrere Datensatzstapel gesammelt werden.

Im bestehenden „Random Access“-Dateiformat zeichnen wir Metadaten mit dem Tabellenschema und den Blockspeicherorten am Ende der Datei auf, sodass Sie äußerst kostengünstig jeden Stapel von Datensätzen oder jede Spalte aus einem Datensatz auswählen können. Im Streaming-Format senden wir eine Reihe von Nachrichten: eine Gliederung und dann einen oder mehrere Datensatzstapel.

Die verschiedenen Formate sehen etwa so aus:

Spaltendaten mit Apache Arrow streamen

Streaming-Daten in PyArrow: Anwendung

Um Ihnen zu zeigen, wie das funktioniert, erstelle ich einen Beispieldatensatz, der einen einzelnen Stream-Block darstellt:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Nehmen wir nun an, wir möchten 1 GB Daten schreiben, die aus Blöcken von jeweils 1 MB bestehen, also insgesamt 1024 Blöcke. Erstellen wir zunächst den ersten 1-MB-Datenrahmen mit 16 Spalten:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Dann konvertiere ich sie in pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Jetzt werde ich einen Ausgabestream erstellen, der in den RAM schreibt und erstellt StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Dann schreiben wir 1024 Chunks, was letztendlich einem 1-GB-Datensatz entspricht:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Da wir in den RAM geschrieben haben, können wir den gesamten Stream in einem Puffer abrufen:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Da sich diese Daten im Speicher befinden, ist das Lesen von Stapeln von Arrow-Datensätzen ein Vorgang ohne Kopie. Ich öffne StreamReader und lese Daten ein pyarrow.Table, und konvertieren Sie sie dann in DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Das alles ist natürlich gut, aber Sie haben vielleicht Fragen. Wie schnell geht das? Wie wirkt sich die Blockgröße auf die Abrufleistung von Pandas DataFrame aus?

Streaming-Leistung

Wenn die Größe des Streaming-Blocks abnimmt, steigen die Kosten für die Rekonstruktion eines zusammenhängenden spaltenförmigen DataFrames in Pandas aufgrund ineffizienter Cache-Zugriffsmuster. Außerdem entsteht durch die Arbeit mit C++-Datenstrukturen und -Arrays sowie deren Speicherpuffern ein gewisser Mehraufwand.

Für 1 MB, wie oben, auf meinem Laptop (Quad-Core Xeon E3-1505M) ergibt sich:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Es stellt sich heraus, dass der effektive Durchsatz 7.75 GB/s beträgt, um einen 1-GB-DataFrame aus 1024 1-MB-Blöcken wiederherzustellen. Was passiert, wenn wir größere oder kleinere Stücke verwenden? Das sind die Ergebnisse:

Spaltendaten mit Apache Arrow streamen

Die Leistung sinkt erheblich von 256 auf 64 Blöcke. Ich war überrascht, dass 1-MB-Blöcke schneller verarbeitet wurden als 16-MB-Blöcke. Es lohnt sich, eine gründlichere Untersuchung durchzuführen und zu verstehen, ob es sich um eine Normalverteilung handelt oder ob etwas anderes im Spiel ist.

In der aktuellen Implementierung des Formats werden die Daten grundsätzlich nicht komprimiert, sodass die Größe im Speicher und „in den Leitungen“ ungefähr gleich ist. In Zukunft könnte die Komprimierung eine zusätzliche Option sein.

Ergebnis

Das Streamen von spaltenorientierten Daten kann eine effektive Möglichkeit sein, große Datensätze in kleinen Teilen in spaltenorientierte Analysetools wie Pandas einzuspeisen. Datendienste, die zeilenorientierten Speicher verwenden, können kleine Datenblöcke übertragen und transponieren, die für die L2- und L3-Caches Ihres Prozessors besser geeignet sind.

Vollständiger Code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: habr.com

Kommentar hinzufügen