D'Iwwersetzung vum Artikel gouf speziell fir d'Schüler vum Cours virbereet
An de leschte Wochen hu mir
Streaming Kolonnendaten
Eng gemeinsam Fro, déi ech vu Pfeil Benotzer kréien, sinn déi héich Käschte fir grouss tabulär Datesätz vun engem Zeil- oder Rekordorientéierten Format an e Kolonnformat ze migréieren. Fir Multi-Gigabyte Datesätz, Transposéiere an Erënnerung oder op Disk kann iwwerwältegend sinn.
Fir Streamingdaten, egal ob d'Quelldaten Zeil oder Kolonn sinn, ass eng Optioun kleng Chargen vu Reihen ze schécken, all mat engem Kolonnlayout intern.
An Apache Arrow gëtt eng Sammlung vun In-Memory columnar Arrays, déi en Dëschstéck representéieren, e Rekordbatch genannt. Fir eng eenzeg Datestruktur vun enger logescher Tabell ze representéieren, kënnt Dir verschidde Pakete vu Rekorder sammelen.
Am existente "zoufälleg Zougang" Dateiformat schreiwen mir Metadaten déi den Tabellschema an de Blocklayout um Enn vun der Datei enthalen, wat Iech erlaabt Iech all Batch vu Rekorder oder all Kolonn aus der Datesaz ganz bëlleg ze wielen. An engem Streaming Format schéckt mir eng Serie vu Messagen: e Schema, an dann een oder méi Chargen vun records.
Déi verschidde Formater kucken eppes wéi dëst Bild:
Streaming Daten an PyArrow: Applikatioun
Fir Iech ze weisen wéi et funktionnéiert, erstellen ech e Beispill Datesaz deen en eenzege Stream Stéck representéiert:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Elo, ugeholl, mir wëllen 1 GB vun Daten schreiwen, déi all aus 1 MB Stécker besteet, fir insgesamt 1024 Stécker. Als éischt, loosst eis den éischten 1MB Dateframe mat 16 Kolonnen erstellen:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Da konvertéieren ech se op pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Elo wäert ech en Output Stream erstellen deen op RAM schreift an erstellt StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Da schreiwen mir 1024 Stécker, déi schliisslech en 1GB Dataset ausmaachen:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Well mir am RAM geschriwwen hunn, kënne mir de ganze Stream an engem Puffer kréien:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Well dës Donnéeën an der Erënnerung sinn, gëtt d'Liesen vu Pfeilrecords duerch eng Nullkopie-Operatioun kritt. Ech oppen StreamReader, liesen Daten an pyarrow.Table
a konvertéiert se dann op DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
All dëst ass natierlech gutt, awer Dir hutt vläicht Froen. Wéi séier geschitt et? Wéi beaflosst d'Stéckgréisst d'Pandas DataFrame Retrieverleistung?
Streaming Leeschtung
Wéi d'Streaming Chunk Gréisst erofgeet, klammen d'Käschte fir e kontinuéierleche Kolumnär DataFrame a Pandas ze rekonstruéieren wéinst ineffizienten Cache Zougang Schemaen. Et gëtt och e puer Overhead vun der Aarbecht mat C ++ Datestrukturen an Arrays an hiren Erënnerungsbuffer.
Fir 1 MB wéi uewen, op mengem Laptop (Quad-Core Xeon E3-1505M) stellt sech eraus:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Et stellt sech eraus datt den effektiven Duerchgang 7.75 Gb / s ass fir en 1 GB DataFrame vun 1024 1 MB Stécker ze restauréieren. Wat geschitt wa mir méi grouss oder méi kleng Stécker benotzen? Hei sinn d'Resultater déi Dir kritt:
D'Performance fällt wesentlech vun 256K op 64K Stécker. Ech war iwwerrascht datt 1MB Stécker méi séier veraarbecht goufen wéi 16MB Stécker. Et ass derwäert eng méi grëndlech Etude ze maachen a verstoen ob dëst eng normal Verdeelung ass oder soss eppes involvéiert ass.
An der aktueller Ëmsetzung vum Format sinn d'Donnéeën am Prinzip net kompriméiert, sou datt d'Gréisst an der Erënnerung an "um Drot" ongeféier d'selwecht ass. Kompressioun kann an Zukunft eng Optioun ginn.
D 'Resultat
Streaming Kolonndaten kënnen en effiziente Wee sinn fir grouss Datesätz op Kolonnanalyse-Tools wéi Pandas a klenge Stécker ze transferéieren. Dateservicer, déi Rei-orientéiert Späichere benotzen, kënne kleng Stécker vun Daten transferéieren an transposéieren, déi méi praktesch sinn fir de L2- a L3-Cache vun Ärem Prozessor.
Voll Code
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Source: will.com