A traduzzione di l'articulu hè stata preparata apposta per i studienti di u corsu
In l'ultime settimane, avemu
Streaming di dati di colonna
Una quistione cumuna ch'e aghju da l'utilizatori di Arrow hè l'altu costu di a migrazione di grandi datasets tabulari da un formatu orientatu à fila o registru à un formatu di colonna. Per i datasets multi-gigabyte, a trasposizione in memoria o nantu à u discu pò esse abbastanti.
Per i dati in streaming, sia chì i dati fonte sò fila o colonna, una opzione hè di mandà picculi lotti di fila, ognuna chì cuntene un layout di colonna internamente.
In Apache Arrow, una cullizzioni di matrici di colonna in memoria chì rapprisentanu un pezzu di tavula hè chjamata batch record. Per rapprisintà una sola struttura di dati di una tavola logica, pudete cullà parechji setti di registri.
In u formatu di schedariu esistenti "accessu aleatoriu", scrivemu metadata chì cuntene u schema di a tavula è u schema di bloccu à a fine di u schedariu, chì permette di selezziunà qualsiasi batch of records or any column from the dataset assai prezzu. In un formatu di streaming, mandemu una seria di missaghji: un schema, è dopu unu o più batch of records.
I diversi formati pareanu qualcosa cum'è sta stampa:
Streaming di dati in PyArrow: applicazione
Per mostrà cumu funziona, aghju da creà un esempiu di dataset chì rapprisenta un unicu pezzu di flussu:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Avà, supponi chì vulemu scrive 1 GB di dati, custituiti da 1 MB chunks ognunu, per un totale di 1024 chunks. Prima, creemu u primu quadru di dati 1MB cù 16 colonne:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Allora li cunvertisce pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Avà criaraghju un flussu di output chì scriverà in RAM è creà StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Allora scriveremu 1024 pezzi, chì eventualmente custituiscenu un dataset di 1GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Siccomu avemu scrittu in RAM, pudemu uttene tuttu u flussu in un buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Perchè sta dati hè in memoria, a lettura di batchs di registri Arrow hè ottenuta da una operazione zero-copia. Apertu StreamReader, leghje e dati in pyarrow.Table
e poi li cunvertisce à DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Tuttu chistu, sicuru, hè bonu, ma pudete avè dumande. Quantu veloce succede? Cumu a dimensione di chunk influenza u rendiment di ricuperazione di DataFrame di panda?
Prestazione di streaming
Cume a dimensione di u chunk di streaming diminuisce, u costu di ricustruisce un DataFrame columnar contiguu in panda aumenta per via di schemi d'accessu di cache inefficienti. Ci hè ancu qualchì overhead da travaglià cù strutture di dati C++ è array è i so buffer di memoria.
Per 1 MB cum'è sopra, nantu à u mo laptop (Quad-core Xeon E3-1505M) risulta:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Risulta chì u throughput effettivu hè di 7.75 Gb / s per restaurà un DataFrame 1 GB da 1024 pezzi di 1 MB. Chì succede se usemu pezzi più grande o più chjucu? Eccu i risultati chì avete:
U rendiment scende significativamente da 256K à 64K chunks. Eru surprised chì i pezzi 1MB sò stati processati più veloce di i pezzi 16MB. Hè vale a pena di fà un studiu più approfonditu è capisce s'ellu hè una distribuzione normale o qualcosa altru hè implicatu.
In l'implementazione attuale di u furmatu, i dati ùn sò micca cumpressi in principiu, cusì a dimensione in memoria è "nantu à u filu" hè apprussimatamente a stessa. A cumpressione pò diventà una opzione in u futuru.
U risultatu
Streaming di dati di colonna pò esse un modu efficaci per trasfiriri grandi datasets à strumenti di analisi di colonna cum'è panda in picculi pezzi. I servizii di dati chì utilizanu l'almacenamiento orientatu à a fila ponu trasferisce è traspone picculi pezzi di dati chì sò più convenienti per a cache L2 è L3 di u processatore.
Codice cumpletu
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Source: www.habr.com