Kolomgegevens streamen met Apache Arrow

De vertaling van het artikel is speciaal gemaakt voor de studenten van de cursus Gegevens ingenieur.

Kolomgegevens streamen met Apache Arrow

De afgelopen weken hebben wij Nong Li toegevoegd aan Apache-pijl binaire stream-indeling, als aanvulling op de reeds bestaande willekeurige toegang/IPC-bestandsindeling. We hebben Java- en C++-implementaties en Python-bindingen. In dit artikel leg ik uit hoe het formaat werkt en laat ik zien hoe je een zeer hoge datadoorvoer kunt bereiken voor een Panda's DataFrame.

Kolomgegevens streamen

Een veelgestelde vraag die ik van Arrow-gebruikers krijg, is de hoge kosten van het migreren van grote datasets in tabelvorm van een rij- of recordgeoriënteerd formaat naar een kolomformaat. Voor datasets van meerdere gigabytes kan transponeren in het geheugen of op schijf overweldigend zijn.

Voor streaming gegevens, ongeacht of de brongegevens rijen of kolommen zijn, is een optie om kleine batches rijen te verzenden, die elk een interne kolomindeling bevatten.

In Apache Arrow wordt een verzameling kolomarrays in het geheugen die een tabelblok vertegenwoordigen, een recordbatch genoemd. Om een ​​enkele gegevensstructuur van een logische tabel weer te geven, kunt u verschillende sets records verzamelen.

In het bestaande "random access" bestandsformaat schrijven we metadata met het tabelschema en de bloklay-out aan het einde van het bestand, waardoor u zeer goedkoop elke batch records of elke kolom uit de dataset kunt selecteren. In een streaming-indeling sturen we een reeks berichten: een schema en vervolgens een of meer batches records.

De verschillende formaten zien er ongeveer zo uit als op deze afbeelding:

Kolomgegevens streamen met Apache Arrow

Gegevens streamen in PyArrow: applicatie

Om u te laten zien hoe het werkt, maak ik een voorbeeld van een dataset die een enkele stream-brok vertegenwoordigt:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Stel nu dat we 1 GB aan gegevens willen schrijven, bestaande uit elk 1 MB chunks, voor een totaal van 1024 chunks. Laten we eerst het eerste dataframe van 1 MB maken met 16 kolommen:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Dan zet ik ze om naar pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Nu zal ik een uitvoerstroom maken die naar RAM zal schrijven en creëren StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Vervolgens schrijven we 1024 chunks, die uiteindelijk een dataset van 1 GB zullen vormen:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Omdat we in RAM schreven, kunnen we de hele stream in één buffer krijgen:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Omdat deze gegevens zich in het geheugen bevinden, wordt het lezen van batches van Arrow-records verkregen door een kopieervrije bewerking. Ik open StreamReader, lees gegevens in pyarrow.Tableen zet ze vervolgens om naar DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Dit alles is natuurlijk goed, maar het kan zijn dat u vragen heeft. Hoe snel gaat het? Welke invloed heeft de chunk-grootte op de prestaties van het ophalen van panda's DataFrame?

Streamingprestaties

Naarmate de streaming chunk-grootte afneemt, nemen de kosten van het reconstrueren van een aaneengesloten kolomvormig DataFrame in panda's toe als gevolg van inefficiënte schema's voor cachetoegang. Er is ook enige overhead bij het werken met C++-gegevensstructuren en -arrays en hun geheugenbuffers.

Voor 1 MB zoals hierboven, op mijn laptop (Quad-core Xeon E3-1505M) blijkt:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Het blijkt dat de effectieve doorvoer 7.75 Gb / s is voor het herstellen van een DataFrame van 1 GB uit 1024 brokken van 1 MB. Wat gebeurt er als we grotere of kleinere brokken gebruiken? Dit zijn de resultaten die u krijgt:

Kolomgegevens streamen met Apache Arrow

De prestaties dalen aanzienlijk van 256K naar 64K brokken. Ik was verrast dat brokken van 1 MB sneller werden verwerkt dan brokken van 16 MB. Het is de moeite waard om een ​​meer grondige studie te doen en te begrijpen of dit een normale verdeling is of dat er iets anders bij betrokken is.

In de huidige implementatie van het formaat worden de gegevens in principe niet gecomprimeerd, dus de grootte in het geheugen en "on the wire" is ongeveer hetzelfde. Compressie kan in de toekomst een optie worden.

Totaal

Het streamen van kolomgegevens kan een efficiënte manier zijn om grote gegevenssets in kleine stukjes over te dragen naar hulpmiddelen voor kolomanalyse, zoals panda's. Gegevensservices die rijgeoriënteerde opslag gebruiken, kunnen kleine stukjes gegevens overdragen en transponeren die handiger zijn voor de L2- en L3-cache van uw processor.

Volledige code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Bron: www.habr.com

Voeg een reactie