Oszlopos adatok streamelése Apache Arrow segítségével

A cikk fordítása kifejezetten a kurzus hallgatói számára készült Adatmérnök.

Oszlopos adatok streamelése Apache Arrow segítségével

Az elmúlt hetekben mi Nong Li hozzáadott Apache nyíl bináris streaming formátum, amely kiegészíti a meglévő véletlen hozzáférésű/IPC fájlformátumot. Java és C++ implementációink és Python kötéseink vannak. Ebben a cikkben elmagyarázom a formátum működését, és megmutatom, hogyan érhet el nagyon nagy adatátviteli sebességet egy pandas DataFrame-en.

Oszlopadatok streamelése

Gyakori kérdés, amit az Arrow felhasználóktól kapok, a táblázatos adatok nagy készleteinek sor- vagy rekordorientált formátumból oszloporientált formátumba való migrálásának magas költségei. A több gigabájtos adatkészletek esetében a memóriába vagy a lemezre történő transzponálás nehéz feladat lehet.

Az adatok streameléséhez, függetlenül attól, hogy a forrásadatok sorok vagy oszlopok, az egyik lehetőség a sorok kis kötegeinek küldése, amelyek mindegyike oszlopos elrendezést tartalmaz.

Az Apache Arrow programban a tábladarabot reprezentáló, memórián belüli oszloptömbök gyűjteményét rekord kötegnek nevezzük. Egy logikai tábla egyetlen adatszerkezetének megjelenítéséhez több rekord köteg gyűjthető.

A meglévő "véletlen hozzáférésű" fájlformátumban rögzítjük a tábla sémát és a blokkhelyeket tartalmazó metaadatokat a fájl végén, így rendkívül olcsón kiválasztható egy adathalmazból tetszőleges rekordköteg vagy oszlop. Streaming formátumban üzenetek sorozatát küldjük: egy vázlatot, majd egy vagy több rekord köteget.

A különböző formátumok valahogy így néznek ki:

Oszlopos adatok streamelése Apache Arrow segítségével

Adatok streamelése a PyArrow-ban: Alkalmazás

Hogy megmutassam ennek működését, létrehozok egy példaadatkészletet, amely egyetlen adatfolyam-darabot reprezentál:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Tegyük fel, hogy 1 GB adatot szeretnénk írni, amely egyenként 1 MB-os darabokból áll, összesen 1024 darabot. Kezdésként hozzuk létre az első 1 MB-os adatkeretet 16 oszloppal:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Aztán átalakítom őket pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Most létrehozok egy kimeneti adatfolyamot, amely a RAM-ba ír és létrehoz StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Ezután 1024 darabot írunk, ami végül egy 1 GB-os adatkészletet jelent:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Mivel a RAM-ba írtunk, a teljes adatfolyamot egy pufferben kaphatjuk meg:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Mivel ezek az adatok a memóriában vannak, az Arrow rekordok kötegeinek olvasása nulla másolati művelet. Megnyitom a StreamReader-t, beolvasom az adatokat pyarrow.Table, majd konvertálja őket a következőre DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Mindez természetesen jó, de lehetnek kérdései. Milyen gyorsan történik ez? Hogyan befolyásolja a darab mérete a pandák DataFrame visszakeresési teljesítményét?

Streaming teljesítmény

Ahogy a streaming csonk mérete csökken, a nem hatékony gyorsítótár-hozzáférési minták miatt nő az összefüggő oszlopos DataFrame rekonstrukciós költsége pandákban. A C++ adatstruktúrákkal és tömbökkel, valamint azok memóriapuffereivel való munka némi többletköltséggel is jár.

1 MB-ért, mint fent, a laptopomon (négymagos Xeon E3-1505M) kiderül:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Kiderült, hogy az effektív átviteli sebesség 7.75 GB/s az 1 GB-os DataFrame visszaállításához 1024 1 MB-os darabból. Mi történik, ha nagyobb vagy kisebb darabokat használunk? Ezek az eredmények:

Oszlopos adatok streamelése Apache Arrow segítségével

A teljesítmény jelentősen csökken 256 64-ról 1 16 darabra. Meglepett, hogy az XNUMX MB-os darabokat gyorsabban dolgozták fel, mint a XNUMX MB-os darabokat. Érdemes alaposabb vizsgálatot végezni, és megérteni, hogy ez normális eloszlásról van-e szó, vagy valami másról van szó.

A formátum jelenlegi megvalósításában az adatok elvileg nincsenek tömörítve, így a méret a memóriában és a „vezetékekben” megközelítőleg azonos. A jövőben a tömörítés további lehetőséggé válhat.

Teljes

Az oszlopos adatok streamelése hatékony módja lehet nagy adatkészletek oszlopos elemző eszközökbe, például pandákba való betáplálásának kis darabokban. A sororientált tárhelyet használó adatszolgáltatások kis adatdarabokat tudnak átvinni és transzponálni, amelyek kényelmesebbek a processzor L2 és L3 gyorsítótárai számára.

Teljes kód

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Forrás: will.com

Hozzászólás