La traducció de l'article s'ha elaborat expressament per als alumnes del curs
Durant les últimes setmanes hem fet
Transmissió de dades de la columna
Una pregunta habitual que rebo dels usuaris d'Arrow és l'alt cost de migrar grans conjunts de dades tabulars d'un format orientat a files o registres a un format orientat a columnes. Per a conjunts de dades de diversos gigabytes, la transposició a la memòria o al disc pot ser una tasca aclaparadora.
Per transmetre dades, independentment de si les dades d'origen són fila o columna, una opció és enviar petits lots de files, cadascuna conté un disseny de columna a l'interior.
A Apache Arrow, la col·lecció de matrius de columnes en memòria que representen un tros de taula s'anomena lot de registres. Per representar una única estructura de dades d'una taula lògica, es poden recollir diversos lots de registres.
En el format de fitxer d'"accés aleatori" existent, registrem les metadades que contenen l'esquema de la taula i la disposició del bloc al final del fitxer, cosa que us permet seleccionar de manera molt econòmica qualsevol lot de registres o qualsevol columna d'un conjunt de dades. En format de streaming, enviem una sèrie de missatges: un esquema i després un o més lots de registres.
Els diferents formats semblen així:
Transmissió de dades a PyArrow: aplicació
Per mostrar-vos com funciona això, crearé un exemple de conjunt de dades que representa un únic fragment de flux:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Ara, suposem que volem escriure 1 GB de dades, que consisteixen en fragments d'1 MB cadascun, per a un total de 1024 fragments. Per començar, creem el primer marc de dades d'1 MB amb 16 columnes:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Llavors els convertí pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Ara crearé un flux de sortida que escriurà a la memòria RAM i crearà StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
A continuació, escriurem 1024 fragments, que en última instància suposaran un conjunt de dades d'1 GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Com que vam escriure a la memòria RAM, podem obtenir tot el flux en un sol buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Com que aquestes dades es troben a la memòria, llegir lots de registres Arrow és una operació de còpia zero. Obro StreamReader, llegeixo dades pyarrow.Table
, i després convertir-los a DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Tot això, per descomptat, és bo, però és possible que tingueu preguntes. Amb quina rapidesa passa això? Com afecta la mida del tros al rendiment de la recuperació de Pandas DataFrame?
Rendiment de streaming
A mesura que la mida del fragment de transmissió disminueix, el cost de reconstruir un DataFrame columnar contigu en pandes augmenta a causa dels patrons d'accés a la memòria cau ineficients. També hi ha una mica de sobrecàrrega per treballar amb estructures i matrius de dades C++ i els seus buffers de memòria.
Per a 1 MB, com l'anterior, al meu ordinador portàtil (Quad-core Xeon E3-1505M) resulta:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Resulta que el rendiment efectiu és de 7.75 GB/s per restaurar un DataFrame d'1 GB a partir de 1024 fragments d'1 MB. Què passa si fem servir trossos més grans o més petits? Aquests són els resultats:
El rendiment baixa significativament de 256K a 64K trossos. Em va sorprendre que els trossos d'1 MB es processessin més ràpidament que els de 16 MB. Val la pena fer un estudi més exhaustiu i entendre si es tracta d'una distribució normal o si hi ha alguna cosa més en joc.
En la implementació actual del format, les dades no es comprimeixen en principi, de manera que la mida a la memòria i "en els cables" és aproximadament la mateixa. En el futur, la compressió pot convertir-se en una opció addicional.
Total
La transmissió de dades en columna pot ser una manera eficaç d'alimentar grans conjunts de dades a eines d'anàlisi de columnes com ara els pandes en petits trossos. Els serveis de dades que utilitzen emmagatzematge orientat a files poden transferir i transposar petits blocs de dades que són més convenients per a les memòries cau L2 i L3 del vostre processador.
Codi complet
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Font: www.habr.com