Pretakanje stolpčnih podatkov s puščico Apache

Prevod članka je bil pripravljen posebej za študente tečaja Podatkovni inženir.

Pretakanje stolpčnih podatkov s puščico Apache

V zadnjih nekaj tednih smo Nong Li dodano Puščica Apache binarni pretočni format, ki dopolnjuje obstoječi format datoteke z naključnim dostopom/IPC. Imamo izvedbe Java in C++ ter vezave Python. V tem članku bom razložil, kako format deluje, in pokazal, kako lahko dosežete zelo visoko prepustnost podatkov za pandas DataFrame.

Pretakanje podatkov stolpca

Pogosto vprašanje, ki ga prejmem od uporabnikov Arrowa, je visok strošek selitve velikih nizov tabelaričnih podatkov iz formata, usmerjenega v vrstico ali zapis, v format, usmerjen v stolpec. Pri naborih podatkov z več gigabajti je lahko prenos v pomnilnik ali na disk težka naloga.

Za pretakanje podatkov, ne glede na to, ali so izvorni podatki vrstica ali stolpec, je ena možnost pošiljanje majhnih skupin vrstic, od katerih vsaka vsebuje stolpično postavitev.

V Apache Arrow se zbirka nizov stolpcev v pomnilniku, ki predstavlja kos tabele, imenuje paket zapisov. Za predstavitev ene same podatkovne strukture logične tabele je mogoče zbrati več paketov zapisov.

V obstoječem formatu datoteke "naključni dostop" beležimo metapodatke, ki vsebujejo shemo tabele in postavitev bloka na koncu datoteke, kar vam omogoča izjemno poceni izbiro katere koli serije zapisov ali katerega koli stolpca iz nabora podatkov. V pretočnem formatu pošljemo niz sporočil: oris in nato enega ali več sklopov zapisov.

Različni formati izgledajo nekako takole:

Pretakanje stolpčnih podatkov s puščico Apache

Pretakanje podatkov v PyArrow: Aplikacija

Da bi vam pokazal, kako to deluje, bom ustvaril primer nabora podatkov, ki predstavlja en kos toka:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Zdaj pa recimo, da želimo zapisati 1 GB podatkov, sestavljenih iz kosov po 1 MB, kar je skupno 1024 kosov. Za začetek ustvarimo prvi podatkovni okvir velikosti 1 MB s 16 stolpci:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Nato jih pretvorim v pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Zdaj bom ustvaril izhodni tok, ki bo pisal v RAM in ustvarjal StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Nato bomo zapisali 1024 kosov, kar bo na koncu pomenilo nabor podatkov 1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Ker smo pisali v RAM, lahko dobimo celoten tok v enem medpomnilniku:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Ker so ti podatki v pomnilniku, je branje paketov zapisov Arrow operacija brez kopiranja. Odprem StreamReader, preberem podatke pyarrow.Tablein jih nato pretvorite v DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Vse to je seveda dobro, a morda imate vprašanja. Kako hitro se to zgodi? Kako velikost kosa vpliva na zmogljivost iskanja pandas DataFrame?

Pretočna zmogljivost

Ko se velikost pretočnega kosa zmanjšuje, se stroški rekonstrukcije sosednjega stolpičnega DataFrame v pandah povečujejo zaradi neučinkovitih vzorcev dostopa do predpomnilnika. Delo s podatkovnimi strukturami in nizi C++ ter njihovimi pomnilniškimi medpomnilniki povzroča tudi nekaj dodatnih stroškov.

Za 1 MB, kot zgoraj, na mojem prenosniku (štirijedrni Xeon E3-1505M) se izkaže:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Izkazalo se je, da je efektivna prepustnost 7.75 GB/s za obnovitev 1GB DataFrame iz 1024 kosov 1MB. Kaj se zgodi, če uporabimo večje ali manjše kose? To so rezultati:

Pretakanje stolpčnih podatkov s puščico Apache

Zmogljivost znatno pade od 256K do 64K kosov. Presenetilo me je, da so bili 1 MB kosi obdelani hitreje kot 16 MB kosi. Vredno je opraviti temeljitejšo študijo in razumeti, ali je to normalna porazdelitev ali je v igri kaj drugega.

Pri trenutni izvedbi formata podatki načeloma niso stisnjeni, zato je velikost v pomnilniku in »v žicah« približno enaka. V prihodnosti lahko kompresija postane dodatna možnost.

Skupaj

Pretakanje stolpčnih podatkov je lahko učinkovit način za vnos velikih podatkovnih nizov v stolpčna analitična orodja, kot so pande v majhnih kosih. Podatkovne storitve, ki uporabljajo vrstično usmerjeno shranjevanje, lahko prenašajo in prenašajo majhne koščke podatkov, ki so primernejši za predpomnilnik L2 in L3 vašega procesorja.

Polna koda

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Vir: www.habr.com

Dodaj komentar