Streaming podataka kolone pomoću Apache Arrow

Prijevod članka pripremljen je posebno za studente predmeta Data Engineer.

Streaming podataka kolone pomoću Apache Arrow

U proteklih nekoliko sedmica jesmo Nong Li dodano Apache Arrow binarni format strimovanja, koji dopunjuje postojeći format datoteke sa slučajnim pristupom/IPC. Imamo Java i C++ implementacije i Python veze. U ovom članku ću objasniti kako format funkcionira i pokazati kako možete postići vrlo visoku propusnost podataka za pandas DataFrame.

Streaming podataka kolone

Uobičajeno pitanje koje dobijam od korisnika Arrow-a je visoka cena migracije velikih skupova tabelarnih podataka iz formata orijentisanog na redove ili zapise u format orijentisan na kolone. Za skupove podataka od više gigabajta, transponovanje u memoriju ili na disk može biti ogroman zadatak.

Da biste strimovali podatke, bilo da su izvorni podaci red ili kolona, ​​jedna od opcija je slanje malih serija redova, od kojih svaki sadrži kolonarski raspored.

U Apache Arrow-u, kolekcija nizova stupaca u memoriji koji predstavljaju dio tablice naziva se skup zapisa. Za predstavljanje jedne strukture podataka logičke tabele, može se prikupiti nekoliko serija zapisa.

U postojećem formatu datoteke "slučajnog pristupa" snimamo metapodatke koji sadrže shemu tablice i lokacije blokova na kraju datoteke, što vam omogućava da izuzetno jeftino odaberete bilo koju grupu zapisa ili bilo koju kolonu iz skupa podataka. U streaming formatu šaljemo niz poruka: nacrt, a zatim jednu ili više serija zapisa.

Različiti formati izgledaju otprilike ovako:

Streaming podataka kolone pomoću Apache Arrow

Streaming podataka u PyArrow: Aplikacija

Da vam pokažem kako ovo funkcionira, kreirat ću primjer skupa podataka koji predstavlja jedan komad toka:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Sada, recimo da želimo da zapišemo 1 GB podataka, koji se sastoje od komada od 1 MB svaki, za ukupno 1024 komada. Za početak, napravimo prvi okvir podataka od 1 MB sa 16 kolona:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Onda ih pretvaram u pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Sada ću kreirati izlazni tok koji će pisati u RAM i kreirati StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Zatim ćemo napisati 1024 komada, što će na kraju iznositi skup podataka od 1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Pošto smo pisali u RAM, možemo dobiti cijeli stream u jednom baferu:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Pošto su ovi podaci u memoriji, čitanje serija zapisa strelica je operacija nulte kopije. Otvaram StreamReader, čitam podatke pyarrow.Tablea zatim ih pretvoriti u DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Sve je ovo, naravno, dobro, ali možda imate pitanja. Koliko brzo se to dešava? Kako veličina komada utiče na performanse preuzimanja pandas DataFrame-a?

Streaming Performance

Kako se veličina streaming komada smanjuje, cijena rekonstrukcije kontinuiranog stupnog DataFrame-a u pandas raste zbog neefikasnih obrazaca pristupa kešu. Također postoje određeni troškovi rada sa C++ strukturama podataka i nizovima i njihovim memorijskim baferima.

Za 1 MB, kao gore, na mom laptopu (quad-core Xeon E3-1505M) ispada:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Ispostavilo se da je efektivna propusnost 7.75 GB/s za vraćanje 1GB DataFrame-a iz 1024 komada od 1MB. Šta se događa ako koristimo veće ili manje komade? Ovo su rezultati:

Streaming podataka kolone pomoću Apache Arrow

Performanse značajno opadaju sa 256K na 64K komada. Iznenadilo me je da su dijelovi od 1 MB obrađeni brže od dijelova od 16 MB. Vrijedno je provesti detaljniju studiju i razumjeti da li je ovo normalna distribucija ili je nešto drugo u igri.

U trenutnoj implementaciji formata, podaci se u principu ne komprimiraju, pa je veličina u memoriji i "u žicama" približno ista. U budućnosti, kompresija može postati dodatna opcija.

Rezultat

Streaming podataka u kolonama može biti efikasan način za unos velikih skupova podataka u alate za kolumnu analitiku kao što su pande u malim komadima. Usluge podataka koje koriste pohranu orijentiranu na redove mogu prenijeti i transponirati male komade podataka koji su pogodniji za L2 i L3 keš memorije vašeg procesora.

Pun kod

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

izvor: www.habr.com

Dodajte komentar