Streamování dat sloupců pomocí Apache Arrow

Překlad článku byl připraven speciálně pro studenty kurzu datový inženýr.

Streamování dat sloupců pomocí Apache Arrow

Během několika posledních týdnů jsme Nong Li přidáno k Apache Arrow binární formát streamování, který doplňuje stávající formát souborů s náhodným přístupem/IPC. Máme implementace Java a C++ a vazby Pythonu. V tomto článku vysvětlím, jak formát funguje, a ukážu, jak můžete dosáhnout velmi vysoké datové propustnosti pro pandas DataFrame.

Streamování dat sloupců

Častou otázkou, kterou dostávám od uživatelů Arrow, jsou vysoké náklady na migraci velkých sad tabulkových dat z formátu orientovaného na řádky nebo záznamy do formátu orientovaného na sloupce. U vícegigabajtových datových sad může být transpozice v paměti nebo na disku ohromujícím úkolem.

Chcete-li streamovat data, ať už jsou zdrojová data řádek nebo sloupec, jednou z možností je odeslat malé dávky řádků, z nichž každý obsahuje uvnitř sloupcové rozvržení.

V Apache Arrow se kolekce polí sloupců v paměti představujících blok tabulky nazývá dávka záznamů. Pro reprezentaci jediné datové struktury logické tabulky lze shromáždit několik dávek záznamů.

Ve stávajícím formátu souboru „random access“ zaznamenáváme metadata obsahující schéma tabulky a umístění bloků na konci souboru, což vám umožňuje extrémně levně vybrat libovolnou dávku záznamů nebo libovolný sloupec z datové sady. Ve formátu streamování posíláme řadu zpráv: osnovu a poté jednu nebo více dávek záznamů.

Různé formáty vypadají asi takto:

Streamování dat sloupců pomocí Apache Arrow

Streamování dat v PyArrow: Aplikace

Abych vám ukázal, jak to funguje, vytvořím příklad datové sady představující jeden blok proudu:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Nyní řekněme, že chceme zapsat 1 GB dat, skládajících se z kousků po 1 MB, celkem tedy 1024 kousků. Pro začátek vytvořte první 1 MB datový rámec se 16 sloupci:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Pak je převedu na pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Nyní vytvořím výstupní proud, který bude zapisovat do RAM a vytvářet StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Poté zapíšeme 1024 kousků, což nakonec bude činit 1GB datovou sadu:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Protože jsme zapisovali do RAM, můžeme získat celý stream v jedné vyrovnávací paměti:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Protože jsou tato data v paměti, čtení dávek záznamů Arrow je operace s nulovým kopírováním. Otevřu StreamReader, načtu data pyarrow.Tablea poté je převést na DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

To vše je samozřejmě dobré, ale můžete mít otázky. Jak rychle se to stane? Jak velikost bloku ovlivňuje výkon načítání datových rámců pandas?

Výkon streamování

Jak se velikost streamovaného bloku snižuje, náklady na rekonstrukci souvislého sloupcového DataFrame v pandách rostou kvůli neefektivním vzorcům přístupu do mezipaměti. Existuje také určitá režie práce s datovými strukturami a poli C++ a jejich vyrovnávací paměti.

Pro 1 MB, jak je uvedeno výše, na mém notebooku (čtyřjádrový Xeon E3-1505M) to vypadá:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Ukazuje se, že efektivní propustnost je 7.75 GB/s pro obnovu 1GB DataFrame z 1024 1MB bloků. Co se stane, když použijeme větší nebo menší kousky? Toto jsou výsledky:

Streamování dat sloupců pomocí Apache Arrow

Výkon výrazně klesá z 256K na 64K bloků. Překvapilo mě, že 1 MB bloky byly zpracovány rychleji než 16 MB bloky. Stojí za to provést důkladnější studii a pochopit, zda se jedná o normální rozdělení, nebo zda je ve hře něco jiného.

V současné implementaci formátu nejsou data v zásadě komprimována, takže velikost v paměti a „ve drátech“ je přibližně stejná. V budoucnu se může komprese stát další možností.

Celkový

Streamování sloupcových dat může být efektivním způsobem, jak vložit velké soubory dat do nástrojů pro sloupcovou analýzu, jako jsou pandy, v malých kouscích. Datové služby, které využívají úložiště orientované na řádky, mohou přenášet a transponovat malé části dat, které jsou pohodlnější pro mezipaměti L2 a L3 vašeho procesoru.

Celý kód

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Zdroj: www.habr.com

Přidat komentář