Streame kolonnedata med Apache Arrow

Oversettelsen av artikkelen ble utarbeidet spesielt for studentene på kurset Dataingeniør.

Streame kolonnedata med Apache Arrow

I løpet av de siste ukene har vi Nong Li lagt til Apache pil binært strømformat, som supplerer det allerede eksisterende random access/IPC-filformatet. Vi har Java- og C++-implementeringer og Python-bindinger. I denne artikkelen skal jeg forklare hvordan formatet fungerer og vise hvordan du kan oppnå svært høy datagjennomstrømning for en pandas DataFrame.

Streaming av kolonnedata

Et vanlig spørsmål jeg får fra Arrow-brukere er de høye kostnadene ved å migrere store tabellformede datasett fra et rad- eller postorientert format til et kolonneformat. For multi-gigabyte datasett kan transponering i minne eller på disk være overveldende.

For strømmedata, enten kildedataene er rad eller kolonne, er ett alternativ å sende små grupper med rader, som hver inneholder et kolonneoppsett internt.

I Apache Arrow kalles en samling av kolonneformede arrays i minnet som representerer en tabelldel en postbatch. For å representere en enkelt datastruktur i en logisk tabell, kan du samle flere sett med poster.

I det eksisterende "random access"-filformatet skriver vi metadata som inneholder tabellskjemaet og blokkoppsettet på slutten av filen, som lar deg velge en hvilken som helst batch av poster eller hvilken som helst kolonne fra datasettet veldig billig. I et streamingformat sender vi en serie meldinger: et skjema og deretter en eller flere grupper med poster.

De forskjellige formatene ser omtrent slik ut som dette bildet:

Streame kolonnedata med Apache Arrow

Streame data i PyArrow: applikasjon

For å vise deg hvordan det fungerer, lager jeg et eksempeldatasett som representerer en enkelt strømklump:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Anta nå at vi ønsker å skrive 1 GB data, bestående av 1 MB biter hver, for totalt 1024 biter. Først, la oss lage den første 1 MB dataramme med 16 kolonner:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Så konverterer jeg dem til pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Nå skal jeg lage en utdatastrøm som vil skrive til RAM og lage StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Deretter vil vi skrive 1024 biter, som til slutt vil utgjøre et 1 GB datasett:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Siden vi skrev i RAM, kan vi få hele strømmen i en buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Fordi disse dataene er i minnet, oppnås lesing av grupper med pilposter ved en nullkopioperasjon. Jeg åpner StreamReader, leser data inn pyarrow.Tableog deretter konvertere dem til DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Alt dette er selvfølgelig bra, men du kan ha spørsmål. Hvor fort skjer det? Hvordan påvirker chunk-størrelsen pandas DataFrame-henting?

Strømmeytelse

Etter hvert som størrelsen på strømmingsdelen reduseres, øker kostnadene ved å rekonstruere en sammenhengende kolonneformet DataFrame i pandaer på grunn av ineffektive hurtigbuffertilgangsordninger. Det er også noen overhead ved å jobbe med C++-datastrukturer og -matriser og deres minnebuffere.

For 1 MB som ovenfor, på min bærbare datamaskin (Quad-core Xeon E3-1505M) viser det seg:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Det viser seg at den effektive gjennomstrømningen er 7.75 Gb/s for å gjenopprette en 1 GB DataFrame fra 1024 1 MB biter. Hva skjer hvis vi bruker større eller mindre biter? Her er resultatene du får:

Streame kolonnedata med Apache Arrow

Ytelsen synker betydelig fra 256K til 64K biter. Jeg ble overrasket over at 1MB-biter ble behandlet raskere enn 16MB-biter. Det er verdt å gjøre en grundigere undersøkelse og forstå om dette er en normalfordeling eller noe annet er involvert.

I den nåværende implementeringen av formatet er dataene i prinsippet ikke komprimert, så størrelsen i minnet og "på ledningen" er omtrent den samme. Komprimering kan bli et alternativ i fremtiden.

Total

Streaming av kolonnedata kan være en effektiv måte å overføre store datasett til kolonneanalyseverktøy som pandaer i små biter. Datatjenester som bruker radorientert lagring kan overføre og transponere små biter av data som er mer praktisk for prosessorens L2- og L3-cache.

Full kode

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Kilde: www.habr.com

Legg til en kommentar