De vertaling van het artikel is speciaal gemaakt voor de studenten van de cursus
De afgelopen weken hebben wij
Kolomgegevens streamen
Een veelgestelde vraag die ik van Arrow-gebruikers krijg, is de hoge kosten van het migreren van grote datasets in tabelvorm van een rij- of recordgeoriënteerd formaat naar een kolomformaat. Voor datasets van meerdere gigabytes kan transponeren in het geheugen of op schijf overweldigend zijn.
Voor streaming gegevens, ongeacht of de brongegevens rijen of kolommen zijn, is een optie om kleine batches rijen te verzenden, die elk een interne kolomindeling bevatten.
In Apache Arrow wordt een verzameling kolomarrays in het geheugen die een tabelblok vertegenwoordigen, een recordbatch genoemd. Om een enkele gegevensstructuur van een logische tabel weer te geven, kunt u verschillende sets records verzamelen.
In het bestaande "random access" bestandsformaat schrijven we metadata met het tabelschema en de bloklay-out aan het einde van het bestand, waardoor u zeer goedkoop elke batch records of elke kolom uit de dataset kunt selecteren. In een streaming-indeling sturen we een reeks berichten: een schema en vervolgens een of meer batches records.
De verschillende formaten zien er ongeveer zo uit als op deze afbeelding:
Gegevens streamen in PyArrow: applicatie
Om u te laten zien hoe het werkt, maak ik een voorbeeld van een dataset die een enkele stream-brok vertegenwoordigt:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Stel nu dat we 1 GB aan gegevens willen schrijven, bestaande uit elk 1 MB chunks, voor een totaal van 1024 chunks. Laten we eerst het eerste dataframe van 1 MB maken met 16 kolommen:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Dan zet ik ze om naar pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Nu zal ik een uitvoerstroom maken die naar RAM zal schrijven en creëren StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Vervolgens schrijven we 1024 chunks, die uiteindelijk een dataset van 1 GB zullen vormen:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Omdat we in RAM schreven, kunnen we de hele stream in één buffer krijgen:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Omdat deze gegevens zich in het geheugen bevinden, wordt het lezen van batches van Arrow-records verkregen door een kopieervrije bewerking. Ik open StreamReader, lees gegevens in pyarrow.Table
en zet ze vervolgens om naar DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Dit alles is natuurlijk goed, maar het kan zijn dat u vragen heeft. Hoe snel gaat het? Welke invloed heeft de chunk-grootte op de prestaties van het ophalen van panda's DataFrame?
Streamingprestaties
Naarmate de streaming chunk-grootte afneemt, nemen de kosten van het reconstrueren van een aaneengesloten kolomvormig DataFrame in panda's toe als gevolg van inefficiënte schema's voor cachetoegang. Er is ook enige overhead bij het werken met C++-gegevensstructuren en -arrays en hun geheugenbuffers.
Voor 1 MB zoals hierboven, op mijn laptop (Quad-core Xeon E3-1505M) blijkt:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Het blijkt dat de effectieve doorvoer 7.75 Gb / s is voor het herstellen van een DataFrame van 1 GB uit 1024 brokken van 1 MB. Wat gebeurt er als we grotere of kleinere brokken gebruiken? Dit zijn de resultaten die u krijgt:
De prestaties dalen aanzienlijk van 256K naar 64K brokken. Ik was verrast dat brokken van 1 MB sneller werden verwerkt dan brokken van 16 MB. Het is de moeite waard om een meer grondige studie te doen en te begrijpen of dit een normale verdeling is of dat er iets anders bij betrokken is.
In de huidige implementatie van het formaat worden de gegevens in principe niet gecomprimeerd, dus de grootte in het geheugen en "on the wire" is ongeveer hetzelfde. Compressie kan in de toekomst een optie worden.
Totaal
Het streamen van kolomgegevens kan een efficiënte manier zijn om grote gegevenssets in kleine stukjes over te dragen naar hulpmiddelen voor kolomanalyse, zoals panda's. Gegevensservices die rijgeoriënteerde opslag gebruiken, kunnen kleine stukjes gegevens overdragen en transponeren die handiger zijn voor de L2- en L3-cache van uw processor.
Volledige code
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Bron: www.habr.com