Terjemahan artikel disiapake khusus kanggo siswa kursus kasebut
Sajrone sawetara minggu kepungkur, kita
Streaming Data Kolom
Pitakonan umum sing daktampa saka pangguna Arrow yaiku biaya dhuwur kanggo migrasi set data tabular sing gedhe saka format berorientasi baris utawa rekaman menyang format berorientasi kolom. Kanggo dataset multi-gigabyte, transposing ing memori utawa ing disk bisa dadi tugas sing akeh banget.
Kanggo stream data, manawa data sumber baris utawa kolom, siji pilihan kanggo ngirim kumpulan cilik saka larik, saben ngemot tata letak columnar nang.
Ing Apache Arrow, kumpulan array kolom ing memori sing makili potongan tabel diarani kumpulan rekaman. Kanggo makili struktur data siji saka tabel logis, sawetara kumpulan cathetan bisa diklumpukake.
Ing format file "akses acak" sing wis ana, kita ngrekam metadata sing ngemot skema tabel lan lokasi blok ing mburi file, ngidini sampeyan milih kanthi murah wae kumpulan rekaman utawa kolom saka kumpulan data. Ing format streaming, kita ngirim seri pesen: outline, banjur siji utawa luwih kumpulan rekaman.
Format sing beda katon kaya gambar iki:
Streaming data ing PyArrow: aplikasi
Kanggo nuduhake sampeyan cara kerjane, aku bakal nggawe conto dataset sing makili potongan stream siji:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Saiki, ayo ngomong yen kita pengin nulis 1 GB data, sing dumadi saka potongan 1 MB saben, kanthi total 1024 potongan. Kanggo miwiti, ayo nggawe pigura data 1 MB pisanan kanthi 16 kolom:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Banjur aku ngowahi menyang pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Saiki aku bakal nggawe stream output sing bakal nulis menyang RAM lan nggawe StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Banjur kita bakal nulis potongan 1024, sing pungkasane bakal dadi set data 1GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Awit kita nulis ing RAM, kita bisa entuk kabeh stream ing siji buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Wiwit data iki ana ing memori, maca kumpulan rekaman panah minangka operasi nul salinan. Aku mbukak StreamReader, maca data menyang pyarrow.Table
, banjur ngowahi dadi DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Kabeh iki, mesthi, apik, nanging sampeyan bisa uga duwe pitakonan. Sepira cepet kedadeyan iki? Kepiye ukuran potongan mengaruhi kinerja pengambilan DataFrame panda?
Kinerja streaming
Nalika ukuran potongan streaming suda, biaya mbangun maneh DataFrame kolom sing cedhak ing panda mundhak amarga pola akses cache sing ora efisien. Ana uga sawetara overhead saka nggarap C ++ struktur data lan susunan lan buffer memori sing.
Kanggo 1 MB, kaya ing ndhuwur, ing laptopku (Quad-core Xeon E3-1505M) ternyata:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Pranyata metu sing throughput efektif 7.75 GB / s kanggo mulihake 1GB DataFrame saka 1024 1MB potongan. Apa sing kedadeyan yen kita nggunakake potongan sing luwih gedhe utawa luwih cilik? Iki minangka asil:
Kinerja mudhun banget saka 256K dadi potongan 64K. Aku kaget yen potongan 1MB diproses luwih cepet tinimbang potongan 16MB. Iku worth nganakake sinau luwih pepek lan ngerti apa iki distribusi normal utawa apa ana liyane ing muter.
Ing implementasine format saiki, data ora dikompres ing prinsip, supaya ukuran ing memori lan "ing kabel" kira-kira padha. Ing mangsa ngarep, kompresi bisa dadi pilihan tambahan.
Asile
Streaming data columnar bisa dadi cara sing efektif kanggo feed set data gedhe menyang alat analytics columnar kaya panda ing potongan cilik. Layanan data sing nggunakake panyimpenan berorientasi baris bisa nransfer lan transpose potongan cilik data sing luwih trep kanggo cache L2 lan L3 prosesor sampeyan.
Kode lengkap
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Source: www.habr.com