A cikk fordítása kifejezetten a kurzus hallgatói számára készült
Az elmúlt hetekben mi
Oszlopadatok streamelése
Gyakori kérdés, amit az Arrow felhasználóktól kapok, a táblázatos adatok nagy készleteinek sor- vagy rekordorientált formátumból oszloporientált formátumba való migrálásának magas költségei. A több gigabájtos adatkészletek esetében a memóriába vagy a lemezre történő transzponálás nehéz feladat lehet.
Az adatok streameléséhez, függetlenül attól, hogy a forrásadatok sorok vagy oszlopok, az egyik lehetőség a sorok kis kötegeinek küldése, amelyek mindegyike oszlopos elrendezést tartalmaz.
Az Apache Arrow programban a tábladarabot reprezentáló, memórián belüli oszloptömbök gyűjteményét rekord kötegnek nevezzük. Egy logikai tábla egyetlen adatszerkezetének megjelenítéséhez több rekord köteg gyűjthető.
A meglévő "véletlen hozzáférésű" fájlformátumban rögzítjük a tábla sémát és a blokkhelyeket tartalmazó metaadatokat a fájl végén, így rendkívül olcsón kiválasztható egy adathalmazból tetszőleges rekordköteg vagy oszlop. Streaming formátumban üzenetek sorozatát küldjük: egy vázlatot, majd egy vagy több rekord köteget.
A különböző formátumok valahogy így néznek ki:
Adatok streamelése a PyArrow-ban: Alkalmazás
Hogy megmutassam ennek működését, létrehozok egy példaadatkészletet, amely egyetlen adatfolyam-darabot reprezentál:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Tegyük fel, hogy 1 GB adatot szeretnénk írni, amely egyenként 1 MB-os darabokból áll, összesen 1024 darabot. Kezdésként hozzuk létre az első 1 MB-os adatkeretet 16 oszloppal:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Aztán átalakítom őket pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Most létrehozok egy kimeneti adatfolyamot, amely a RAM-ba ír és létrehoz StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Ezután 1024 darabot írunk, ami végül egy 1 GB-os adatkészletet jelent:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Mivel a RAM-ba írtunk, a teljes adatfolyamot egy pufferben kaphatjuk meg:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Mivel ezek az adatok a memóriában vannak, az Arrow rekordok kötegeinek olvasása nulla másolati művelet. Megnyitom a StreamReader-t, beolvasom az adatokat pyarrow.Table
, majd konvertálja őket a következőre DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Mindez természetesen jó, de lehetnek kérdései. Milyen gyorsan történik ez? Hogyan befolyásolja a darab mérete a pandák DataFrame visszakeresési teljesítményét?
Streaming teljesítmény
Ahogy a streaming csonk mérete csökken, a nem hatékony gyorsítótár-hozzáférési minták miatt nő az összefüggő oszlopos DataFrame rekonstrukciós költsége pandákban. A C++ adatstruktúrákkal és tömbökkel, valamint azok memóriapuffereivel való munka némi többletköltséggel is jár.
1 MB-ért, mint fent, a laptopomon (négymagos Xeon E3-1505M) kiderül:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Kiderült, hogy az effektív átviteli sebesség 7.75 GB/s az 1 GB-os DataFrame visszaállításához 1024 1 MB-os darabból. Mi történik, ha nagyobb vagy kisebb darabokat használunk? Ezek az eredmények:
A teljesítmény jelentősen csökken 256 64-ról 1 16 darabra. Meglepett, hogy az XNUMX MB-os darabokat gyorsabban dolgozták fel, mint a XNUMX MB-os darabokat. Érdemes alaposabb vizsgálatot végezni, és megérteni, hogy ez normális eloszlásról van-e szó, vagy valami másról van szó.
A formátum jelenlegi megvalósításában az adatok elvileg nincsenek tömörítve, így a méret a memóriában és a „vezetékekben” megközelítőleg azonos. A jövőben a tömörítés további lehetőséggé válhat.
Teljes
Az oszlopos adatok streamelése hatékony módja lehet nagy adatkészletek oszlopos elemző eszközökbe, például pandákba való betáplálásának kis darabokban. A sororientált tárhelyet használó adatszolgáltatások kis adatdarabokat tudnak átvinni és transzponálni, amelyek kényelmesebbek a processzor L2 és L3 gyorsítótárai számára.
Teljes kód
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Forrás: will.com