Prijevod članka pripremljen je posebno za studente predmeta
U proteklih nekoliko sedmica jesmo
Streaming podataka kolone
Uobičajeno pitanje koje dobijam od korisnika Arrow-a je visoka cena migracije velikih skupova tabelarnih podataka iz formata orijentisanog na redove ili zapise u format orijentisan na kolone. Za skupove podataka od više gigabajta, transponovanje u memoriju ili na disk može biti ogroman zadatak.
Da biste strimovali podatke, bilo da su izvorni podaci red ili kolona, jedna od opcija je slanje malih serija redova, od kojih svaki sadrži kolonarski raspored.
U Apache Arrow-u, kolekcija nizova stupaca u memoriji koji predstavljaju dio tablice naziva se skup zapisa. Za predstavljanje jedne strukture podataka logičke tabele, može se prikupiti nekoliko serija zapisa.
U postojećem formatu datoteke "slučajnog pristupa" snimamo metapodatke koji sadrže shemu tablice i lokacije blokova na kraju datoteke, što vam omogućava da izuzetno jeftino odaberete bilo koju grupu zapisa ili bilo koju kolonu iz skupa podataka. U streaming formatu šaljemo niz poruka: nacrt, a zatim jednu ili više serija zapisa.
Različiti formati izgledaju otprilike ovako:
Streaming podataka u PyArrow: Aplikacija
Da vam pokažem kako ovo funkcionira, kreirat ću primjer skupa podataka koji predstavlja jedan komad toka:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Sada, recimo da želimo da zapišemo 1 GB podataka, koji se sastoje od komada od 1 MB svaki, za ukupno 1024 komada. Za početak, napravimo prvi okvir podataka od 1 MB sa 16 kolona:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Onda ih pretvaram u pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Sada ću kreirati izlazni tok koji će pisati u RAM i kreirati StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Zatim ćemo napisati 1024 komada, što će na kraju iznositi skup podataka od 1 GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Pošto smo pisali u RAM, možemo dobiti cijeli stream u jednom baferu:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Pošto su ovi podaci u memoriji, čitanje serija zapisa strelica je operacija nulte kopije. Otvaram StreamReader, čitam podatke pyarrow.Table
a zatim ih pretvoriti u DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Sve je ovo, naravno, dobro, ali možda imate pitanja. Koliko brzo se to dešava? Kako veličina komada utiče na performanse preuzimanja pandas DataFrame-a?
Streaming Performance
Kako se veličina streaming komada smanjuje, cijena rekonstrukcije kontinuiranog stupnog DataFrame-a u pandas raste zbog neefikasnih obrazaca pristupa kešu. Također postoje određeni troškovi rada sa C++ strukturama podataka i nizovima i njihovim memorijskim baferima.
Za 1 MB, kao gore, na mom laptopu (quad-core Xeon E3-1505M) ispada:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Ispostavilo se da je efektivna propusnost 7.75 GB/s za vraćanje 1GB DataFrame-a iz 1024 komada od 1MB. Šta se događa ako koristimo veće ili manje komade? Ovo su rezultati:
Performanse značajno opadaju sa 256K na 64K komada. Iznenadilo me je da su dijelovi od 1 MB obrađeni brže od dijelova od 16 MB. Vrijedno je provesti detaljniju studiju i razumjeti da li je ovo normalna distribucija ili je nešto drugo u igri.
U trenutnoj implementaciji formata, podaci se u principu ne komprimiraju, pa je veličina u memoriji i "u žicama" približno ista. U budućnosti, kompresija može postati dodatna opcija.
Rezultat
Streaming podataka u kolonama može biti efikasan način za unos velikih skupova podataka u alate za kolumnu analitiku kao što su pande u malim komadima. Usluge podataka koje koriste pohranu orijentiranu na redove mogu prenijeti i transponirati male komade podataka koji su pogodniji za L2 i L3 keš memorije vašeg procesora.
Pun kod
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
izvor: www.habr.com