Menstrim data lajur dengan Apache Arrow

Terjemahan artikel disediakan khusus untuk pelajar kursus tersebut Jurutera Data.

Menstrim data lajur dengan Apache Arrow

Sejak beberapa minggu lepas, kami Nong Li ditambah kepada Anak panah Apache format aliran binari, menambah akses rawak/format fail IPC yang sedia ada. Kami mempunyai pelaksanaan Java dan C++ dan pengikatan Python. Dalam artikel ini, saya akan menerangkan cara format berfungsi dan menunjukkan cara anda boleh mencapai daya pemprosesan data yang sangat tinggi untuk DataFrame panda.

Menstrim data lajur

Soalan biasa yang saya dapat daripada pengguna Arrow ialah kos tinggi untuk memindahkan set data jadual besar daripada format berorientasikan baris atau rekod kepada format lajur. Untuk set data berbilang gigabait, pemindahan dalam memori atau pada cakera boleh menjadi sangat menggembirakan.

Untuk penstriman data, sama ada data sumber ialah baris atau lajur, satu pilihan ialah menghantar kumpulan kecil baris, setiap satu mengandungi reka letak lajur secara dalaman.

Dalam Apache Arrow, koleksi tatasusunan lajur dalam memori yang mewakili bongkah jadual dipanggil kumpulan rekod. Untuk mewakili struktur data tunggal jadual logik, anda boleh mengumpul beberapa set rekod.

Dalam format fail "akses rawak" sedia ada, kami menulis metadata yang mengandungi skema jadual dan reka letak blok pada penghujung fail, yang membolehkan anda memilih mana-mana kumpulan rekod atau mana-mana lajur daripada set data dengan sangat murah. Dalam format penstriman, kami menghantar satu siri mesej: skema, dan kemudian satu atau lebih kumpulan rekod.

Format yang berbeza kelihatan seperti gambar ini:

Menstrim data lajur dengan Apache Arrow

Menstrim data dalam PyArrow: aplikasi

Untuk menunjukkan kepada anda cara ia berfungsi, saya akan membuat set data contoh yang mewakili satu bahagian aliran:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Sekarang, katakan kita ingin menulis 1 GB data, yang terdiri daripada 1 MB ketulan setiap satu, untuk sejumlah 1024 ketulan. Mula-mula, mari buat bingkai data 1MB pertama dengan 16 lajur:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Kemudian saya menukarkannya kepada pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Sekarang saya akan mencipta aliran keluaran yang akan menulis ke RAM dan mencipta StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Kemudian kami akan menulis 1024 ketulan, yang akhirnya akan membentuk set data 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Oleh kerana kami menulis dalam RAM, kami boleh mendapatkan keseluruhan aliran dalam satu penimbal:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Oleh kerana data ini berada dalam ingatan, kumpulan bacaan rekod Anak Panah diperoleh melalui operasi salinan sifar. Saya membuka StreamReader, membaca data ke dalam pyarrow.Tabledan kemudian menukarnya kepada DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Semua ini, sudah tentu, bagus, tetapi anda mungkin mempunyai soalan. Seberapa pantas ia berlaku? Bagaimanakah saiz ketulan mempengaruhi prestasi pengambilan DataFrame panda?

Prestasi penstriman

Apabila saiz bahagian penstriman berkurangan, kos untuk membina semula DataFrame kolumnar bersebelahan dalam panda meningkat disebabkan oleh skim capaian cache yang tidak cekap. Terdapat juga beberapa overhed daripada bekerja dengan struktur dan tatasusunan data C++ serta penimbal memorinya.

Untuk 1 MB seperti di atas, pada komputer riba saya (Quad-core Xeon E3-1505M) ternyata:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Ternyata daya pemprosesan berkesan ialah 7.75 Gb / s untuk memulihkan 1 GB DataFrame daripada 1024 1 MB ketulan. Apa yang berlaku jika kita menggunakan ketulan yang lebih besar atau lebih kecil? Berikut adalah hasil yang anda perolehi:

Menstrim data lajur dengan Apache Arrow

Prestasi menurun dengan ketara daripada 256K kepada 64K bahagian. Saya terkejut bahawa ketulan 1MB diproses lebih cepat daripada ketulan 16MB. Perlu membuat kajian yang lebih teliti dan memahami sama ada ini adalah taburan normal atau sesuatu yang lain terlibat.

Dalam pelaksanaan format semasa, data tidak dimampatkan pada dasarnya, jadi saiz dalam memori dan "pada wayar" adalah lebih kurang sama. Pemampatan mungkin menjadi pilihan pada masa hadapan.

Jumlah

Menstrim data lajur boleh menjadi cara yang cekap untuk memindahkan set data besar ke alat analitis lajur seperti panda dalam ketulan kecil. Perkhidmatan data yang menggunakan storan berorientasikan baris boleh memindahkan dan memindahkan sebahagian kecil data yang lebih mudah untuk cache L2 dan L3 pemproses anda.

Kod penuh

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Sumber: www.habr.com

Tambah komen