Transmissió de dades de columna amb Apache Arrow

La traducció de l'article s'ha elaborat expressament per als alumnes del curs Enginyer de dades.

Transmissió de dades de columna amb Apache Arrow

Durant les últimes setmanes hem fet Nong Li afegit a Apache Arrow format de streaming binari, que complementa el format de fitxer d'accés aleatori/IPC existent. Tenim implementacions Java i C++ i enllaços Python. En aquest article, explicaré com funciona el format i mostraré com podeu aconseguir un rendiment de dades molt elevat per a un DataFrame pandas.

Transmissió de dades de la columna

Una pregunta habitual que rebo dels usuaris d'Arrow és l'alt cost de migrar grans conjunts de dades tabulars d'un format orientat a files o registres a un format orientat a columnes. Per a conjunts de dades de diversos gigabytes, la transposició a la memòria o al disc pot ser una tasca aclaparadora.

Per transmetre dades, independentment de si les dades d'origen són fila o columna, una opció és enviar petits lots de files, cadascuna conté un disseny de columna a l'interior.

A Apache Arrow, la col·lecció de matrius de columnes en memòria que representen un tros de taula s'anomena lot de registres. Per representar una única estructura de dades d'una taula lògica, es poden recollir diversos lots de registres.

En el format de fitxer d'"accés aleatori" existent, registrem les metadades que contenen l'esquema de la taula i la disposició del bloc al final del fitxer, cosa que us permet seleccionar de manera molt econòmica qualsevol lot de registres o qualsevol columna d'un conjunt de dades. En format de streaming, enviem una sèrie de missatges: un esquema i després un o més lots de registres.

Els diferents formats semblen així:

Transmissió de dades de columna amb Apache Arrow

Transmissió de dades a PyArrow: aplicació

Per mostrar-vos com funciona això, crearé un exemple de conjunt de dades que representa un únic fragment de flux:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Ara, suposem que volem escriure 1 GB de dades, que consisteixen en fragments d'1 MB cadascun, per a un total de 1024 fragments. Per començar, creem el primer marc de dades d'1 MB amb 16 columnes:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Llavors els convertí pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Ara crearé un flux de sortida que escriurà a la memòria RAM i crearà StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

A continuació, escriurem 1024 fragments, que en última instància suposaran un conjunt de dades d'1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Com que vam escriure a la memòria RAM, podem obtenir tot el flux en un sol buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Com que aquestes dades es troben a la memòria, llegir lots de registres Arrow és una operació de còpia zero. Obro StreamReader, llegeixo dades pyarrow.Table, i després convertir-los a DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tot això, per descomptat, és bo, però és possible que tingueu preguntes. Amb quina rapidesa passa això? Com afecta la mida del tros al rendiment de la recuperació de Pandas DataFrame?

Rendiment de streaming

A mesura que la mida del fragment de transmissió disminueix, el cost de reconstruir un DataFrame columnar contigu en pandes augmenta a causa dels patrons d'accés a la memòria cau ineficients. També hi ha una mica de sobrecàrrega per treballar amb estructures i matrius de dades C++ i els seus buffers de memòria.

Per a 1 MB, com l'anterior, al meu ordinador portàtil (Quad-core Xeon E3-1505M) resulta:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Resulta que el rendiment efectiu és de 7.75 GB/s per restaurar un DataFrame d'1 GB a partir de 1024 fragments d'1 MB. Què passa si fem servir trossos més grans o més petits? Aquests són els resultats:

Transmissió de dades de columna amb Apache Arrow

El rendiment baixa significativament de 256K a 64K trossos. Em va sorprendre que els trossos d'1 MB es processessin més ràpidament que els de 16 MB. Val la pena fer un estudi més exhaustiu i entendre si es tracta d'una distribució normal o si hi ha alguna cosa més en joc.

En la implementació actual del format, les dades no es comprimeixen en principi, de manera que la mida a la memòria i "en els cables" és aproximadament la mateixa. En el futur, la compressió pot convertir-se en una opció addicional.

Total

La transmissió de dades en columna pot ser una manera eficaç d'alimentar grans conjunts de dades a eines d'anàlisi de columnes com ara els pandes en petits trossos. Els serveis de dades que utilitzen emmagatzematge orientat a files poden transferir i transposar petits blocs de dades que són més convenients per a les memòries cau L2 i L3 del vostre processador.

Codi complet

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Font: www.habr.com

Afegeix comentari