Streaming dei dati delle colonne con Apache Arrow

La traduzione dell'articolo è stata preparata appositamente per gli studenti del corso Ingegnere dei dati.

Streaming dei dati delle colonne con Apache Arrow

Nelle ultime settimane abbiamo Nong Li aggiunto a Freccia di Apache formato di streaming binario, che integra il formato di file ad accesso casuale/IPC esistente. Abbiamo implementazioni Java e C++ e collegamenti Python. In questo articolo spiegherò come funziona il formato e mostrerò come ottenere un throughput di dati molto elevato per un DataFrame panda.

Streaming dei dati della colonna

Una domanda comune che ricevo dagli utenti di Arrow riguarda il costo elevato della migrazione di grandi set di dati tabulari da un formato orientato alle righe o ai record a un formato orientato alle colonne. Per set di dati multi-gigabyte, la trasposizione in memoria o su disco può essere un compito arduo.

Per eseguire lo streaming dei dati, indipendentemente dal fatto che i dati di origine siano righe o colonne, un'opzione è inviare piccoli batch di righe, ciascuna contenente un layout colonnare al suo interno.

In Apache Arrow, la raccolta di array di colonne in memoria che rappresentano un blocco di tabella è chiamata batch di record. Per rappresentare un'unica struttura dati di una tabella logica, è possibile raccogliere diversi lotti di record.

Nel formato file esistente ad "accesso casuale", registriamo i metadati contenenti lo schema della tabella e le posizioni dei blocchi alla fine del file, consentendoti di selezionare in modo estremamente economico qualsiasi batch di record o qualsiasi colonna da un set di dati. In formato streaming, inviamo una serie di messaggi: una bozza e poi uno o più batch di record.

I diversi formati assomigliano a questo:

Streaming dei dati delle colonne con Apache Arrow

Streaming di dati in PyArrow: applicazione

Per mostrarti come funziona, creerò un set di dati di esempio che rappresenta un singolo blocco di flusso:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Ora, supponiamo di voler scrivere 1 GB di dati, costituiti da blocchi da 1 MB ciascuno, per un totale di 1024 blocchi. Per iniziare, creiamo il primo frame di dati da 1 MB con 16 colonne:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Poi li converto in pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Ora creerò un flusso di output che scriverà nella RAM e creerà StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Quindi scriveremo 1024 blocchi, che alla fine equivarranno a un set di dati da 1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Dato che abbiamo scritto nella RAM, possiamo ottenere l'intero flusso in un buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Poiché questi dati sono in memoria, la lettura di batch di record Arrow è un'operazione a copia zero. Apro StreamReader, leggo i dati pyarrow.Table, quindi convertirli in DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tutto questo è, ovviamente, buono, ma potresti avere delle domande. Quanto velocemente accade? In che modo la dimensione del blocco influisce sulle prestazioni di recupero di DataFrame dei panda?

Prestazioni in streaming

Man mano che la dimensione del blocco di streaming diminuisce, il costo di ricostruzione di un DataFrame colonnare contiguo nei panda aumenta a causa di modelli di accesso alla cache inefficienti. C'è anche un certo sovraccarico derivante dall'utilizzo delle strutture di dati e degli array C++ e dei relativi buffer di memoria.

Per 1 MB, come sopra, sul mio laptop (Quad-core Xeon E3-1505M) risulta:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Risulta che il throughput effettivo è di 7.75 GB/s per ripristinare un DataFrame da 1 GB da 1024 blocchi da 1 MB. Cosa succede se usiamo pezzi più grandi o più piccoli? Questi sono i risultati:

Streaming dei dati delle colonne con Apache Arrow

Le prestazioni scendono in modo significativo da blocchi da 256K a 64K. Sono rimasto sorpreso dal fatto che i blocchi da 1 MB siano stati elaborati più velocemente dei blocchi da 16 MB. Vale la pena condurre uno studio più approfondito e capire se si tratta di una distribuzione normale o se c'è qualcos'altro in gioco.

Nell'attuale implementazione del formato, i dati in linea di principio non sono compressi, quindi la dimensione in memoria e "nei cavi" è approssimativamente la stessa. In futuro, la compressione potrebbe diventare un'opzione aggiuntiva.

risultato

Lo streaming di dati colonnari può essere un modo efficace per alimentare grandi set di dati in strumenti di analisi colonnare come i panda in piccoli blocchi. I servizi dati che utilizzano l'archiviazione orientata alle righe possono trasferire e trasporre piccole porzioni di dati più convenienti per le cache L2 e L3 del processore.

Codice completo

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Fonte: habr.com

Aggiungi un commento