Prevod članka je bil pripravljen posebej za študente tečaja
V zadnjih nekaj tednih smo
Pretakanje podatkov stolpca
Pogosto vprašanje, ki ga prejmem od uporabnikov Arrowa, je visok strošek selitve velikih nizov tabelaričnih podatkov iz formata, usmerjenega v vrstico ali zapis, v format, usmerjen v stolpec. Pri naborih podatkov z več gigabajti je lahko prenos v pomnilnik ali na disk težka naloga.
Za pretakanje podatkov, ne glede na to, ali so izvorni podatki vrstica ali stolpec, je ena možnost pošiljanje majhnih skupin vrstic, od katerih vsaka vsebuje stolpično postavitev.
V Apache Arrow se zbirka nizov stolpcev v pomnilniku, ki predstavlja kos tabele, imenuje paket zapisov. Za predstavitev ene same podatkovne strukture logične tabele je mogoče zbrati več paketov zapisov.
V obstoječem formatu datoteke "naključni dostop" beležimo metapodatke, ki vsebujejo shemo tabele in postavitev bloka na koncu datoteke, kar vam omogoča izjemno poceni izbiro katere koli serije zapisov ali katerega koli stolpca iz nabora podatkov. V pretočnem formatu pošljemo niz sporočil: oris in nato enega ali več sklopov zapisov.
Različni formati izgledajo nekako takole:
Pretakanje podatkov v PyArrow: Aplikacija
Da bi vam pokazal, kako to deluje, bom ustvaril primer nabora podatkov, ki predstavlja en kos toka:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Zdaj pa recimo, da želimo zapisati 1 GB podatkov, sestavljenih iz kosov po 1 MB, kar je skupno 1024 kosov. Za začetek ustvarimo prvi podatkovni okvir velikosti 1 MB s 16 stolpci:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Nato jih pretvorim v pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Zdaj bom ustvaril izhodni tok, ki bo pisal v RAM in ustvarjal StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Nato bomo zapisali 1024 kosov, kar bo na koncu pomenilo nabor podatkov 1 GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Ker smo pisali v RAM, lahko dobimo celoten tok v enem medpomnilniku:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Ker so ti podatki v pomnilniku, je branje paketov zapisov Arrow operacija brez kopiranja. Odprem StreamReader, preberem podatke pyarrow.Table
in jih nato pretvorite v DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Vse to je seveda dobro, a morda imate vprašanja. Kako hitro se to zgodi? Kako velikost kosa vpliva na zmogljivost iskanja pandas DataFrame?
Pretočna zmogljivost
Ko se velikost pretočnega kosa zmanjšuje, se stroški rekonstrukcije sosednjega stolpičnega DataFrame v pandah povečujejo zaradi neučinkovitih vzorcev dostopa do predpomnilnika. Delo s podatkovnimi strukturami in nizi C++ ter njihovimi pomnilniškimi medpomnilniki povzroča tudi nekaj dodatnih stroškov.
Za 1 MB, kot zgoraj, na mojem prenosniku (štirijedrni Xeon E3-1505M) se izkaže:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Izkazalo se je, da je efektivna prepustnost 7.75 GB/s za obnovitev 1GB DataFrame iz 1024 kosov 1MB. Kaj se zgodi, če uporabimo večje ali manjše kose? To so rezultati:
Zmogljivost znatno pade od 256K do 64K kosov. Presenetilo me je, da so bili 1 MB kosi obdelani hitreje kot 16 MB kosi. Vredno je opraviti temeljitejšo študijo in razumeti, ali je to normalna porazdelitev ali je v igri kaj drugega.
Pri trenutni izvedbi formata podatki načeloma niso stisnjeni, zato je velikost v pomnilniku in »v žicah« približno enaka. V prihodnosti lahko kompresija postane dodatna možnost.
Skupaj
Pretakanje stolpčnih podatkov je lahko učinkovit način za vnos velikih podatkovnih nizov v stolpčna analitična orodja, kot so pande v majhnih kosih. Podatkovne storitve, ki uporabljajo vrstično usmerjeno shranjevanje, lahko prenašajo in prenašajo majhne koščke podatkov, ki so primernejši za predpomnilnik L2 in L3 vašega procesorja.
Polna koda
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Vir: www.habr.com