Streaming data kolom dengan Apache Arrow

Terjemahan artikel disiapkan khusus untuk mahasiswa kursus tersebut Insinyur Data.

Streaming data kolom dengan Apache Arrow

Selama beberapa minggu terakhir, kami Nong Li ditambahkan ke Apache Panah format aliran biner, melengkapi format file akses acak/IPC yang sudah ada. Kami memiliki implementasi Java dan C++ serta binding Python. Pada artikel ini, saya akan menjelaskan cara kerja format dan menunjukkan bagaimana Anda dapat mencapai throughput data yang sangat tinggi untuk pandas DataFrame.

Data kolom streaming

Pertanyaan umum yang saya terima dari pengguna Arrow adalah tingginya biaya migrasi kumpulan data tabular dalam jumlah besar dari format berorientasi baris atau rekaman ke format berorientasi kolom. Untuk kumpulan data multi-gigabyte, melakukan transposisi di memori atau disk bisa menjadi tugas yang berat.

Untuk data streaming, baik data sumber berupa baris atau kolom, salah satu opsinya adalah mengirim sejumlah kecil baris, masing-masing berisi tata letak kolom secara internal.

Di Apache Arrow, kumpulan array kolom dalam memori yang mewakili potongan tabel disebut kumpulan rekaman. Untuk mewakili struktur data tunggal dari tabel logis, Anda dapat mengumpulkan beberapa paket rekaman.

Dalam format file "akses acak" yang ada, kami menulis metadata yang berisi skema tabel dan tata letak blok di akhir file, yang memungkinkan Anda memilih kumpulan catatan atau kolom apa pun dari kumpulan data dengan sangat murah. Dalam format streaming, kami mengirimkan serangkaian pesan: skema, dan kemudian satu atau lebih kumpulan rekaman.

Format yang berbeda terlihat seperti gambar ini:

Streaming data kolom dengan Apache Arrow

Streaming data di PyArrow: aplikasi

Untuk menunjukkan cara kerjanya, saya akan membuat contoh kumpulan data yang mewakili satu potongan aliran:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Sekarang, katakanlah kita ingin menulis 1 GB data, yang terdiri dari potongan masing-masing 1 MB, dengan total 1024 potongan. Pertama, mari buat bingkai data 1MB pertama dengan 16 kolom:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Lalu saya mengonversinya menjadi pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Sekarang saya akan membuat aliran keluaran yang akan menulis ke RAM dan membuat StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Kemudian kita akan menulis 1024 potongan, yang pada akhirnya akan menjadi kumpulan data 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Karena kita menulis dalam RAM, kita bisa mendapatkan seluruh aliran dalam satu buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Karena data ini ada di memori, membaca kumpulan rekaman Arrow adalah operasi tanpa penyalinan. Saya membuka StreamReader, membaca data ke dalamnya pyarrow.Table, lalu konversikan menjadi DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Semua ini, tentu saja, bagus, tetapi Anda mungkin memiliki pertanyaan. Seberapa cepat hal ini terjadi? Bagaimana ukuran potongan memengaruhi kinerja pengambilan DataFrame panda?

Performa streaming

Ketika ukuran potongan streaming berkurang, biaya rekonstruksi DataFrame kolom yang berdekatan di pandas meningkat karena skema akses cache yang tidak efisien. Ada juga beberapa overhead saat bekerja dengan struktur dan array data C++ serta buffer memorinya.

Untuk 1 MB seperti diatas, di laptop saya (Quad-core Xeon E3-1505M) ternyata:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Ternyata throughput efektifnya adalah 7.75 Gb/s untuk memulihkan DataFrame 1 GB dari 1024 potongan 1 MB. Apa yang terjadi jika kita menggunakan potongan yang lebih besar atau lebih kecil? Berikut hasil yang Anda dapatkan:

Streaming data kolom dengan Apache Arrow

Performa turun secara signifikan dari 256 ribu menjadi 64 ribu. Saya terkejut karena potongan 1MB diproses lebih cepat daripada potongan 16MB. Ada baiknya melakukan studi yang lebih menyeluruh dan memahami apakah ini merupakan distribusi normal atau ada hal lain yang terlibat.

Dalam implementasi format saat ini, pada prinsipnya data tidak dikompresi, sehingga ukuran di memori dan "on the wire" kira-kira sama. Kompresi mungkin bisa menjadi pilihan di masa depan.

Total

Streaming data kolom dapat menjadi cara yang efisien untuk mentransfer kumpulan data besar ke alat analisis kolom seperti panda dalam jumlah kecil. Layanan data yang menggunakan penyimpanan berorientasi baris dapat mentransfer dan mengubah urutan potongan kecil data yang lebih sesuai untuk cache L2 dan L3 prosesor Anda.

Kode lengkap

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Sumber: www.habr.com

Tambah komentar