Streaming Kolonnendaten mat Apache Arrow

D'Iwwersetzung vum Artikel gouf speziell fir d'Schüler vum Cours virbereet Daten Ingenieur.

Streaming Kolonnendaten mat Apache Arrow

An de leschte Wochen hu mir Nong Li dobäi Apache Pfeil binäre Stream-Format, ergänzt dat scho existent zoufälleg Zougang / IPC Dateiformat. Mir hunn Java an C ++ Implementatiounen a Python Bindungen. An dësem Artikel wäert ech erklären wéi d'Format Wierker a weisen, wéi Dir ganz héich Daten Débit fir e pandas DataFrame erreechen kann.

Streaming Kolonnendaten

Eng gemeinsam Fro, déi ech vu Pfeil Benotzer kréien, sinn déi héich Käschte fir grouss tabulär Datesätz vun engem Zeil- oder Rekordorientéierten Format an e Kolonnformat ze migréieren. Fir Multi-Gigabyte Datesätz, Transposéiere an Erënnerung oder op Disk kann iwwerwältegend sinn.

Fir Streamingdaten, egal ob d'Quelldaten Zeil oder Kolonn sinn, ass eng Optioun kleng Chargen vu Reihen ze schécken, all mat engem Kolonnlayout intern.

An Apache Arrow gëtt eng Sammlung vun In-Memory columnar Arrays, déi en Dëschstéck representéieren, e Rekordbatch genannt. Fir eng eenzeg Datestruktur vun enger logescher Tabell ze representéieren, kënnt Dir verschidde Pakete vu Rekorder sammelen.

Am existente "zoufälleg Zougang" Dateiformat schreiwen mir Metadaten déi den Tabellschema an de Blocklayout um Enn vun der Datei enthalen, wat Iech erlaabt Iech all Batch vu Rekorder oder all Kolonn aus der Datesaz ganz bëlleg ze wielen. An engem Streaming Format schéckt mir eng Serie vu Messagen: e Schema, an dann een oder méi Chargen vun records.

Déi verschidde Formater kucken eppes wéi dëst Bild:

Streaming Kolonnendaten mat Apache Arrow

Streaming Daten an PyArrow: Applikatioun

Fir Iech ze weisen wéi et funktionnéiert, erstellen ech e Beispill Datesaz deen en eenzege Stream Stéck representéiert:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Elo, ugeholl, mir wëllen 1 GB vun Daten schreiwen, déi all aus 1 MB Stécker besteet, fir insgesamt 1024 Stécker. Als éischt, loosst eis den éischten 1MB Dateframe mat 16 Kolonnen erstellen:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Da konvertéieren ech se op pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Elo wäert ech en Output Stream erstellen deen op RAM schreift an erstellt StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Da schreiwen mir 1024 Stécker, déi schliisslech en 1GB Dataset ausmaachen:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Well mir am RAM geschriwwen hunn, kënne mir de ganze Stream an engem Puffer kréien:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Well dës Donnéeën an der Erënnerung sinn, gëtt d'Liesen vu Pfeilrecords duerch eng Nullkopie-Operatioun kritt. Ech oppen StreamReader, liesen Daten an pyarrow.Tablea konvertéiert se dann op DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

All dëst ass natierlech gutt, awer Dir hutt vläicht Froen. Wéi séier geschitt et? Wéi beaflosst d'Stéckgréisst d'Pandas DataFrame Retrieverleistung?

Streaming Leeschtung

Wéi d'Streaming Chunk Gréisst erofgeet, klammen d'Käschte fir e kontinuéierleche Kolumnär DataFrame a Pandas ze rekonstruéieren wéinst ineffizienten Cache Zougang Schemaen. Et gëtt och e puer Overhead vun der Aarbecht mat C ++ Datestrukturen an Arrays an hiren Erënnerungsbuffer.

Fir 1 MB wéi uewen, op mengem Laptop (Quad-Core Xeon E3-1505M) stellt sech eraus:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Et stellt sech eraus datt den effektiven Duerchgang 7.75 Gb / s ass fir en 1 GB DataFrame vun 1024 1 MB Stécker ze restauréieren. Wat geschitt wa mir méi grouss oder méi kleng Stécker benotzen? Hei sinn d'Resultater déi Dir kritt:

Streaming Kolonnendaten mat Apache Arrow

D'Performance fällt wesentlech vun 256K op 64K Stécker. Ech war iwwerrascht datt 1MB Stécker méi séier veraarbecht goufen wéi 16MB Stécker. Et ass derwäert eng méi grëndlech Etude ze maachen a verstoen ob dëst eng normal Verdeelung ass oder soss eppes involvéiert ass.

An der aktueller Ëmsetzung vum Format sinn d'Donnéeën am Prinzip net kompriméiert, sou datt d'Gréisst an der Erënnerung an "um Drot" ongeféier d'selwecht ass. Kompressioun kann an Zukunft eng Optioun ginn.

D 'Resultat

Streaming Kolonndaten kënnen en effiziente Wee sinn fir grouss Datesätz op Kolonnanalyse-Tools wéi Pandas a klenge Stécker ze transferéieren. Dateservicer, déi Rei-orientéiert Späichere benotzen, kënne kleng Stécker vun Daten transferéieren an transposéieren, déi méi praktesch sinn fir de L2- a L3-Cache vun Ärem Prozessor.

Voll Code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: will.com

Setzt e Commentaire