Die Übersetzung des Artikels wurde speziell für die Studierenden des Kurses erstellt
In den letzten Wochen haben wir
Spaltendaten streamen
Eine häufige Frage, die ich von Arrow-Benutzern erhalte, betrifft die hohen Kosten für die Migration großer Mengen tabellarischer Daten von einem zeilen- oder datensatzorientierten Format in ein spaltenorientiertes Format. Bei Datensätzen mit mehreren Gigabyte kann die Transponierung im Speicher oder auf der Festplatte eine überwältigende Aufgabe sein.
Um Daten zu streamen, unabhängig davon, ob es sich bei den Quelldaten um Zeilen- oder Spaltendaten handelt, besteht eine Möglichkeit darin, kleine Zeilenstapel zu senden, die jeweils ein Spaltenlayout enthalten.
In Apache Arrow wird die Sammlung von speicherinternen Spaltenarrays, die einen Tabellenblock darstellen, als Datensatzstapel bezeichnet. Um eine einzelne Datenstruktur einer logischen Tabelle darzustellen, können mehrere Datensatzstapel gesammelt werden.
Im bestehenden „Random Access“-Dateiformat zeichnen wir Metadaten mit dem Tabellenschema und den Blockspeicherorten am Ende der Datei auf, sodass Sie äußerst kostengünstig jeden Stapel von Datensätzen oder jede Spalte aus einem Datensatz auswählen können. Im Streaming-Format senden wir eine Reihe von Nachrichten: eine Gliederung und dann einen oder mehrere Datensatzstapel.
Die verschiedenen Formate sehen etwa so aus:
Streaming-Daten in PyArrow: Anwendung
Um Ihnen zu zeigen, wie das funktioniert, erstelle ich einen Beispieldatensatz, der einen einzelnen Stream-Block darstellt:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Nehmen wir nun an, wir möchten 1 GB Daten schreiben, die aus Blöcken von jeweils 1 MB bestehen, also insgesamt 1024 Blöcke. Erstellen wir zunächst den ersten 1-MB-Datenrahmen mit 16 Spalten:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Dann konvertiere ich sie in pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Jetzt werde ich einen Ausgabestream erstellen, der in den RAM schreibt und erstellt StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Dann schreiben wir 1024 Chunks, was letztendlich einem 1-GB-Datensatz entspricht:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Da wir in den RAM geschrieben haben, können wir den gesamten Stream in einem Puffer abrufen:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Da sich diese Daten im Speicher befinden, ist das Lesen von Stapeln von Arrow-Datensätzen ein Vorgang ohne Kopie. Ich öffne StreamReader und lese Daten ein pyarrow.Table
, und konvertieren Sie sie dann in DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Das alles ist natürlich gut, aber Sie haben vielleicht Fragen. Wie schnell geht das? Wie wirkt sich die Blockgröße auf die Abrufleistung von Pandas DataFrame aus?
Streaming-Leistung
Wenn die Größe des Streaming-Blocks abnimmt, steigen die Kosten für die Rekonstruktion eines zusammenhängenden spaltenförmigen DataFrames in Pandas aufgrund ineffizienter Cache-Zugriffsmuster. Außerdem entsteht durch die Arbeit mit C++-Datenstrukturen und -Arrays sowie deren Speicherpuffern ein gewisser Mehraufwand.
Für 1 MB, wie oben, auf meinem Laptop (Quad-Core Xeon E3-1505M) ergibt sich:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Es stellt sich heraus, dass der effektive Durchsatz 7.75 GB/s beträgt, um einen 1-GB-DataFrame aus 1024 1-MB-Blöcken wiederherzustellen. Was passiert, wenn wir größere oder kleinere Stücke verwenden? Das sind die Ergebnisse:
Die Leistung sinkt erheblich von 256 auf 64 Blöcke. Ich war überrascht, dass 1-MB-Blöcke schneller verarbeitet wurden als 16-MB-Blöcke. Es lohnt sich, eine gründlichere Untersuchung durchzuführen und zu verstehen, ob es sich um eine Normalverteilung handelt oder ob etwas anderes im Spiel ist.
In der aktuellen Implementierung des Formats werden die Daten grundsätzlich nicht komprimiert, sodass die Größe im Speicher und „in den Leitungen“ ungefähr gleich ist. In Zukunft könnte die Komprimierung eine zusätzliche Option sein.
Ergebnis
Das Streamen von spaltenorientierten Daten kann eine effektive Möglichkeit sein, große Datensätze in kleinen Teilen in spaltenorientierte Analysetools wie Pandas einzuspeisen. Datendienste, die zeilenorientierten Speicher verwenden, können kleine Datenblöcke übertragen und transponieren, die für die L2- und L3-Caches Ihres Prozessors besser geeignet sind.
Vollständiger Code
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Source: habr.com