Streamovanie údajov stĺpcov pomocou Apache Arrow

Preklad článku bol pripravený špeciálne pre študentov kurzu dátový inžinier.

Streamovanie údajov stĺpcov pomocou Apache Arrow

Za posledných pár týždňov máme Nong Li pridané do Apache Arrow binárny formát streamovania, ktorý dopĺňa existujúci formát súborov s náhodným prístupom/IPC. Máme implementácie Java a C++ a väzby Pythonu. V tomto článku vysvetlím, ako formát funguje, a ukážem, ako môžete dosiahnuť veľmi vysokú dátovú priepustnosť pre pandas DataFrame.

Streamovanie údajov stĺpca

Častou otázkou, ktorú dostávam od používateľov Arrow, sú vysoké náklady na migráciu veľkých súborov tabuľkových údajov z formátu orientovaného na riadky alebo záznamy do formátu orientovaného na stĺpce. Pre viacgigabajtové súbory údajov môže byť transpozícia v pamäti alebo na disku zdrvujúcou úlohou.

Ak chcete streamovať údaje, či už sú zdrojové údaje riadkové alebo stĺpcové, jednou z možností je odoslať malé dávky riadkov, z ktorých každý obsahuje stĺpcové rozloženie.

V Apache Arrow sa kolekcia polí stĺpcov v pamäti reprezentujúcich časť tabuľky nazýva záznamová dávka. Na reprezentáciu jednej dátovej štruktúry logickej tabuľky je možné zhromaždiť niekoľko dávok záznamov.

V existujúcom formáte súboru s „náhodným prístupom“ zaznamenávame metadáta obsahujúce schému tabuľky a umiestnenie blokov na konci súboru, čo vám umožňuje mimoriadne lacno vybrať ľubovoľnú dávku záznamov alebo ľubovoľný stĺpec z množiny údajov. Vo formáte streamingu posielame sériu správ: prehľad a potom jednu alebo viac dávok záznamov.

Rôzne formáty vyzerajú asi takto:

Streamovanie údajov stĺpcov pomocou Apache Arrow

Streamovanie údajov v PyArrow: Aplikácia

Aby som vám ukázal, ako to funguje, vytvorím vzorovú množinu údajov predstavujúcu jeden kus streamu:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Teraz povedzme, že chceme zapísať 1 GB dát, pozostávajúcich z kúskov po 1 MB, teda spolu 1024 kúskov. Na začiatok vytvorte prvý dátový rámec s veľkosťou 1 MB so 16 stĺpcami:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Potom ich prevediem na pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Teraz vytvorím výstupný prúd, ktorý bude zapisovať do RAM a vytvárať StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Potom napíšeme 1024 kusov, čo bude v konečnom dôsledku predstavovať 1 GB dátový súbor:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Keďže sme zapisovali do RAM, môžeme získať celý stream v jednej vyrovnávacej pamäti:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Keďže tieto údaje sú v pamäti, čítanie dávok záznamov Arrow je operácia s nulovým kopírovaním. Otvorím StreamReader, načítam dáta pyarrow.Tablea potom ich skonvertujte na DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

To všetko je, samozrejme, dobré, ale môžete mať otázky. Ako rýchlo sa to stane? Ako veľkosť časti ovplyvňuje výkon získavania dátových rámcov pandas?

Výkon streamovania

Keď sa veľkosť streamovaného bloku znižuje, náklady na rekonštrukciu súvislého stĺpcového dátového rámca v pandách sa zvyšujú v dôsledku neefektívnych vzorov prístupu do vyrovnávacej pamäte. Existuje tiež určitá réžia pri práci s dátovými štruktúrami a poliami C++ a ich vyrovnávacími pamäťami.

Pre 1 MB, ako je uvedené vyššie, na mojom notebooku (štvorjadrový Xeon E3-1505M) to vyzerá:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Ukázalo sa, že efektívna priepustnosť je 7.75 GB/s na obnovenie 1GB DataFrame z 1024 1MB kúskov. Čo sa stane, ak použijeme väčšie alebo menšie kúsky? Toto sú výsledky:

Streamovanie údajov stĺpcov pomocou Apache Arrow

Výkon výrazne klesá z 256 64 na 1 16 kusov. Prekvapilo ma, že XNUMX MB časti boli spracované rýchlejšie ako XNUMX MB časti. Stojí za to vykonať dôkladnejšiu štúdiu a pochopiť, či ide o normálne rozdelenie alebo či je v hre niečo iné.

V súčasnej implementácii formátu nie sú údaje v zásade komprimované, takže veľkosť v pamäti a „v drôtoch“ je približne rovnaká. V budúcnosti sa kompresia môže stať ďalšou možnosťou.

Celkový

Streamovanie stĺpcových údajov môže byť efektívnym spôsobom, ako vložiť veľké súbory údajov do nástrojov na stĺpcovú analýzu, ako sú napríklad pandy, v malých kúskoch. Dátové služby, ktoré používajú riadkovo orientované úložisko, môžu prenášať a transponovať malé časti údajov, ktoré sú vhodnejšie pre vyrovnávacie pamäte L2 a L3 vášho procesora.

Úplný kód

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Zdroj: hab.com

Pridať komentár