Strömma kolumndata med Apache Arrow

Översättningen av artikeln förbereddes speciellt för kursens studenter Dataingenjör.

Strömma kolumndata med Apache Arrow

Under de senaste veckorna har vi Nong Li lagt till Apache -pil binärt strömformat, som kompletterar det redan befintliga random access/IPC-filformatet. Vi har Java- och C++-implementationer och Python-bindningar. I den här artikeln kommer jag att förklara hur formatet fungerar och visa hur du kan uppnå mycket hög datagenomströmning för en pandas DataFrame.

Strömmande kolumndata

En vanlig fråga jag får från Arrow-användare är den höga kostnaden för att migrera stora tabelluppsättningar från ett rad- eller postorienterat format till ett kolumnformat. För multi-gigabyte datamängder kan transponering i minne eller på disk vara överväldigande.

För strömmande data, oavsett om källdata är rad eller kolumn, är ett alternativ att skicka små satser av rader, som var och en innehåller en kolumnlayout internt.

I Apache Arrow kallas en samling kolumnära arrayer i minnet som representerar en tabellbit en postbatch. För att representera en enda datastruktur i en logisk tabell kan du samla flera uppsättningar poster.

I det befintliga "random access"-filformatet skriver vi metadata som innehåller tabellschemat och blocklayouten i slutet av filen, vilket gör att du kan välja valfri batch av poster eller vilken kolumn som helst från datasetet mycket billigt. I ett streamingformat skickar vi en serie meddelanden: ett schema och sedan en eller flera poster.

De olika formaten ser ut ungefär som den här bilden:

Strömma kolumndata med Apache Arrow

Strömmande data i PyArrow: applikation

För att visa dig hur det fungerar skapar jag ett exempel på en datauppsättning som representerar en enskild strömbit:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Anta nu att vi vill skriva 1 GB data, bestående av 1 MB bitar vardera, för totalt 1024 bitar. Låt oss först skapa den första 1MB dataramen med 16 kolumner:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Sedan konverterar jag dem till pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Nu ska jag skapa en utdataström som kommer att skriva till RAM och skapa StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Sedan kommer vi att skriva 1024 bitar, som så småningom kommer att utgöra en datauppsättning på 1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Eftersom vi skrev i RAM, kan vi få hela strömmen i en buffert:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Eftersom dessa data finns i minnet erhålls läsning av batcher av pilposter genom en nollkopieringsoperation. Jag öppnar StreamReader, läser in data pyarrow.Tableoch sedan konvertera dem till DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Allt detta är naturligtvis bra, men du kanske har frågor. Hur snabbt händer det? Hur påverkar chunk-storleken pandas DataFrame-hämtningsprestanda?

Streamingprestanda

När storleken på strömmande bitar minskar, ökar kostnaden för att rekonstruera en sammanhängande kolumnär DataFrame i pandor på grund av ineffektiva cacheåtkomstscheman. Det finns också en del omkostnader från att arbeta med C++-datastrukturer och arrayer och deras minnesbuffertar.

För 1 MB enligt ovan, på min bärbara dator (Quad-core Xeon E3-1505M) visar det sig:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Det visar sig att den effektiva genomströmningen är 7.75 Gb/s för att återställa en 1 GB DataFrame från 1024 1 MB bitar. Vad händer om vi använder större eller mindre bitar? Här är resultaten du får:

Strömma kolumndata med Apache Arrow

Prestanda sjunker avsevärt från 256K till 64K bitar. Jag blev förvånad över att 1MB-bitar bearbetades snabbare än 16MB-bitar. Det är värt att göra en mer grundlig studie och förstå om det är en normalfördelning eller om något annat är inblandat.

I den nuvarande implementeringen av formatet komprimeras inte data i princip, så storleken i minnet och "på tråden" är ungefär densamma. Komprimering kan bli ett alternativ i framtiden.

Totalt

Strömmande kolumndata kan vara ett effektivt sätt att överföra stora datamängder till kolumnanalysverktyg som pandor i små bitar. Datatjänster som använder radorienterad lagring kan överföra och överföra små bitar av data som är mer bekväma för din processors L2- och L3-cache.

Fullständig kod

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Källa: will.com

Lägg en kommentar