Terjemahan artikel disiapkan khusus untuk mahasiswa kursus tersebut
Selama beberapa minggu terakhir, kami
Data kolom streaming
Pertanyaan umum yang saya terima dari pengguna Arrow adalah tingginya biaya migrasi kumpulan data tabular dalam jumlah besar dari format berorientasi baris atau rekaman ke format berorientasi kolom. Untuk kumpulan data multi-gigabyte, melakukan transposisi di memori atau disk bisa menjadi tugas yang berat.
Untuk data streaming, baik data sumber berupa baris atau kolom, salah satu opsinya adalah mengirim sejumlah kecil baris, masing-masing berisi tata letak kolom secara internal.
Di Apache Arrow, kumpulan array kolom dalam memori yang mewakili potongan tabel disebut kumpulan rekaman. Untuk mewakili struktur data tunggal dari tabel logis, Anda dapat mengumpulkan beberapa paket rekaman.
Dalam format file "akses acak" yang ada, kami menulis metadata yang berisi skema tabel dan tata letak blok di akhir file, yang memungkinkan Anda memilih kumpulan catatan atau kolom apa pun dari kumpulan data dengan sangat murah. Dalam format streaming, kami mengirimkan serangkaian pesan: skema, dan kemudian satu atau lebih kumpulan rekaman.
Format yang berbeda terlihat seperti gambar ini:
Streaming data di PyArrow: aplikasi
Untuk menunjukkan cara kerjanya, saya akan membuat contoh kumpulan data yang mewakili satu potongan aliran:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Sekarang, katakanlah kita ingin menulis 1 GB data, yang terdiri dari potongan masing-masing 1 MB, dengan total 1024 potongan. Pertama, mari buat bingkai data 1MB pertama dengan 16 kolom:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Lalu saya mengonversinya menjadi pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Sekarang saya akan membuat aliran keluaran yang akan menulis ke RAM dan membuat StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Kemudian kita akan menulis 1024 potongan, yang pada akhirnya akan menjadi kumpulan data 1GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Karena kita menulis dalam RAM, kita bisa mendapatkan seluruh aliran dalam satu buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Karena data ini ada di memori, membaca kumpulan rekaman Arrow adalah operasi tanpa penyalinan. Saya membuka StreamReader, membaca data ke dalamnya pyarrow.Table
, lalu konversikan menjadi DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Semua ini, tentu saja, bagus, tetapi Anda mungkin memiliki pertanyaan. Seberapa cepat hal ini terjadi? Bagaimana ukuran potongan memengaruhi kinerja pengambilan DataFrame panda?
Performa streaming
Ketika ukuran potongan streaming berkurang, biaya rekonstruksi DataFrame kolom yang berdekatan di pandas meningkat karena skema akses cache yang tidak efisien. Ada juga beberapa overhead saat bekerja dengan struktur dan array data C++ serta buffer memorinya.
Untuk 1 MB seperti diatas, di laptop saya (Quad-core Xeon E3-1505M) ternyata:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Ternyata throughput efektifnya adalah 7.75 Gb/s untuk memulihkan DataFrame 1 GB dari 1024 potongan 1 MB. Apa yang terjadi jika kita menggunakan potongan yang lebih besar atau lebih kecil? Berikut hasil yang Anda dapatkan:
Performa turun secara signifikan dari 256 ribu menjadi 64 ribu. Saya terkejut karena potongan 1MB diproses lebih cepat daripada potongan 16MB. Ada baiknya melakukan studi yang lebih menyeluruh dan memahami apakah ini merupakan distribusi normal atau ada hal lain yang terlibat.
Dalam implementasi format saat ini, pada prinsipnya data tidak dikompresi, sehingga ukuran di memori dan "on the wire" kira-kira sama. Kompresi mungkin bisa menjadi pilihan di masa depan.
Total
Streaming data kolom dapat menjadi cara yang efisien untuk mentransfer kumpulan data besar ke alat analisis kolom seperti panda dalam jumlah kecil. Layanan data yang menggunakan penyimpanan berorientasi baris dapat mentransfer dan mengubah urutan potongan kecil data yang lebih sesuai untuk cache L2 dan L3 prosesor Anda.
Kode lengkap
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Sumber: www.habr.com