Streaming di dati di colonna cù Apache Arrow

A traduzzione di l'articulu hè stata preparata apposta per i studienti di u corsu Ingegnere di dati.

Streaming di dati di colonna cù Apache Arrow

In l'ultime settimane, avemu Nong Li aghjuntu à Apache Arrow formatu di flussu binariu, chì cumplementa l'accessu aleatoriu esistente / furmatu di u schedariu IPC. Avemu implementazioni Java è C++ è associazioni Python. In questu articulu, vi spiegheraghju cumu u formatu travaglia è mostra cumu pudete ottene un throughput di dati assai altu per un panda DataFrame.

Streaming di dati di colonna

Una quistione cumuna ch'e aghju da l'utilizatori di Arrow hè l'altu costu di a migrazione di grandi datasets tabulari da un formatu orientatu à fila o registru à un formatu di colonna. Per i datasets multi-gigabyte, a trasposizione in memoria o nantu à u discu pò esse abbastanti.

Per i dati in streaming, sia chì i dati fonte sò fila o colonna, una opzione hè di mandà picculi lotti di fila, ognuna chì cuntene un layout di colonna internamente.

In Apache Arrow, una cullizzioni di matrici di colonna in memoria chì rapprisentanu un pezzu di tavula hè chjamata batch record. Per rapprisintà una sola struttura di dati di una tavola logica, pudete cullà parechji setti di registri.

In u formatu di schedariu esistenti "accessu aleatoriu", scrivemu metadata chì cuntene u schema di a tavula è u schema di bloccu à a fine di u schedariu, chì permette di selezziunà qualsiasi batch of records or any column from the dataset assai prezzu. In un formatu di streaming, mandemu una seria di missaghji: un schema, è dopu unu o più batch of records.

I diversi formati pareanu qualcosa cum'è sta stampa:

Streaming di dati di colonna cù Apache Arrow

Streaming di dati in PyArrow: applicazione

Per mostrà cumu funziona, aghju da creà un esempiu di dataset chì rapprisenta un unicu pezzu di flussu:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Avà, supponi chì vulemu scrive 1 GB di dati, custituiti da 1 MB chunks ognunu, per un totale di 1024 chunks. Prima, creemu u primu quadru di dati 1MB cù 16 colonne:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Allora li cunvertisce pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Avà criaraghju un flussu di output chì scriverà in RAM è creà StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Allora scriveremu 1024 pezzi, chì eventualmente custituiscenu un dataset di 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Siccomu avemu scrittu in RAM, pudemu uttene tuttu u flussu in un buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Perchè sta dati hè in memoria, a lettura di batchs di registri Arrow hè ottenuta da una operazione zero-copia. Apertu StreamReader, leghje e dati in pyarrow.Tablee poi li cunvertisce à DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tuttu chistu, sicuru, hè bonu, ma pudete avè dumande. Quantu veloce succede? Cumu a dimensione di chunk influenza u rendiment di ricuperazione di DataFrame di panda?

Prestazione di streaming

Cume a dimensione di u chunk di streaming diminuisce, u costu di ricustruisce un DataFrame columnar contiguu in panda aumenta per via di schemi d'accessu di cache inefficienti. Ci hè ancu qualchì overhead da travaglià cù strutture di dati C++ è array è i so buffer di memoria.

Per 1 MB cum'è sopra, nantu à u mo laptop (Quad-core Xeon E3-1505M) risulta:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Risulta chì u throughput effettivu hè di 7.75 Gb / s per restaurà un DataFrame 1 GB da 1024 pezzi di 1 MB. Chì succede se usemu pezzi più grande o più chjucu? Eccu i risultati chì avete:

Streaming di dati di colonna cù Apache Arrow

U rendiment scende significativamente da 256K à 64K chunks. Eru surprised chì i pezzi 1MB sò stati processati più veloce di i pezzi 16MB. Hè vale a pena di fà un studiu più approfonditu è ​​capisce s'ellu hè una distribuzione normale o qualcosa altru hè implicatu.

In l'implementazione attuale di u furmatu, i dati ùn sò micca cumpressi in principiu, cusì a dimensione in memoria è "nantu à u filu" hè apprussimatamente a stessa. A cumpressione pò diventà una opzione in u futuru.

U risultatu

Streaming di dati di colonna pò esse un modu efficaci per trasfiriri grandi datasets à strumenti di analisi di colonna cum'è panda in picculi pezzi. I servizii di dati chì utilizanu l'almacenamiento orientatu à a fila ponu trasferisce è traspone picculi pezzi di dati chì sò più convenienti per a cache L2 è L3 di u processatore.

Codice cumpletu

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com

Add a comment