Streaming data kolom nganggo Apache Arrow

Terjemahan artikel disiapake khusus kanggo siswa kursus kasebut Data Engineer.

Streaming data kolom nganggo Apache Arrow

Sajrone sawetara minggu kepungkur, kita Nong Li ditambahake menyang Apache Arrow format stream binar, nambah akses acak sing wis ana / format file IPC. Kita duwe Java lan C ++ implementasine lan Python bindings. Ing artikel iki, aku bakal nerangake carane format bisa dianggo lan nuduhake carane sampeyan bisa entuk throughput data dhuwur banget kanggo panda DataFrame.

Streaming Data Kolom

Pitakonan umum sing daktampa saka pangguna Arrow yaiku biaya dhuwur kanggo migrasi set data tabular sing gedhe saka format berorientasi baris utawa rekaman menyang format berorientasi kolom. Kanggo dataset multi-gigabyte, transposing ing memori utawa ing disk bisa dadi tugas sing akeh banget.

Kanggo stream data, manawa data sumber baris utawa kolom, siji pilihan kanggo ngirim kumpulan cilik saka larik, saben ngemot tata letak columnar nang.

Ing Apache Arrow, kumpulan array kolom ing memori sing makili potongan tabel diarani kumpulan rekaman. Kanggo makili struktur data siji saka tabel logis, sawetara kumpulan cathetan bisa diklumpukake.

Ing format file "akses acak" sing wis ana, kita ngrekam metadata sing ngemot skema tabel lan lokasi blok ing mburi file, ngidini sampeyan milih kanthi murah wae kumpulan rekaman utawa kolom saka kumpulan data. Ing format streaming, kita ngirim seri pesen: outline, banjur siji utawa luwih kumpulan rekaman.

Format sing beda katon kaya gambar iki:

Streaming data kolom nganggo Apache Arrow

Streaming data ing PyArrow: aplikasi

Kanggo nuduhake sampeyan cara kerjane, aku bakal nggawe conto dataset sing makili potongan stream siji:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Saiki, ayo ngomong yen kita pengin nulis 1 GB data, sing dumadi saka potongan 1 MB saben, kanthi total 1024 potongan. Kanggo miwiti, ayo nggawe pigura data 1 MB pisanan kanthi 16 kolom:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Banjur aku ngowahi menyang pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Saiki aku bakal nggawe stream output sing bakal nulis menyang RAM lan nggawe StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Banjur kita bakal nulis potongan 1024, sing pungkasane bakal dadi set data 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Awit kita nulis ing RAM, kita bisa entuk kabeh stream ing siji buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Wiwit data iki ana ing memori, maca kumpulan rekaman panah minangka operasi nul salinan. Aku mbukak StreamReader, maca data menyang pyarrow.Table, banjur ngowahi dadi DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Kabeh iki, mesthi, apik, nanging sampeyan bisa uga duwe pitakonan. Sepira cepet kedadeyan iki? Kepiye ukuran potongan mengaruhi kinerja pengambilan DataFrame panda?

Kinerja streaming

Nalika ukuran potongan streaming suda, biaya mbangun maneh DataFrame kolom sing cedhak ing panda mundhak amarga pola akses cache sing ora efisien. Ana uga sawetara overhead saka nggarap C ++ struktur data lan susunan lan buffer memori sing.

Kanggo 1 MB, kaya ing ndhuwur, ing laptopku (Quad-core Xeon E3-1505M) ternyata:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Pranyata metu sing throughput efektif 7.75 GB / s kanggo mulihake 1GB DataFrame saka 1024 1MB potongan. Apa sing kedadeyan yen kita nggunakake potongan sing luwih gedhe utawa luwih cilik? Iki minangka asil:

Streaming data kolom nganggo Apache Arrow

Kinerja mudhun banget saka 256K dadi potongan 64K. Aku kaget yen potongan 1MB diproses luwih cepet tinimbang potongan 16MB. Iku worth nganakake sinau luwih pepek lan ngerti apa iki distribusi normal utawa apa ana liyane ing muter.

Ing implementasine format saiki, data ora dikompres ing prinsip, supaya ukuran ing memori lan "ing kabel" kira-kira padha. Ing mangsa ngarep, kompresi bisa dadi pilihan tambahan.

Asile

Streaming data columnar bisa dadi cara sing efektif kanggo feed set data gedhe menyang alat analytics columnar kaya panda ing potongan cilik. Layanan data sing nggunakake panyimpenan berorientasi baris bisa nransfer lan transpose potongan cilik data sing luwih trep kanggo cache L2 lan L3 prosesor sampeyan.

Kode lengkap

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com

Add a comment