Ngalirkeun data kolom nganggo Apache Arrow

Tarjamahan artikel ieu disiapkeun husus pikeun siswa tangtu "Insinyur Data".

Ngalirkeun data kolom nganggo Apache Arrow

Dina sababaraha minggu katukang urang gaduh Nong Li ditambahkeun kana Panah Apache format streaming binér, ngalengkepan aksés acak / format file IPC aya. Kami gaduh Java sareng C ++ palaksanaan sareng beungkeutan Python. Dina tulisan ieu, kuring bakal ngajelaskeun kumaha formatna jalan sareng nunjukkeun kumaha anjeun tiasa ngahontal throughput data anu luhur pisan pikeun DataFrame panda.

Streaming Data Kolom

Patarosan umum anu kuring tampi ti pangguna Panah nyaéta biaya anu luhur pikeun migrasi set data tabular anu ageung tina baris- atanapi format berorientasi catetan ka format berorientasi kolom. Pikeun datasets multi-gigabyte, transposing dina mémori atawa dina disk tiasa tugas overwhelming.

Pikeun ngalirkeun data, naha data sumber baris atawa kolom, hiji pilihan pikeun ngirim bets leutik baris, unggal ngandung hiji perenah columnar jero.

Dina Apache Arrow, kumpulan susunan kolom dina mémori ngalambangkeun sakumpulan méja disebut kumpulan rékaman. Pikeun ngagambarkeun struktur data tunggal tabel logis, sababaraha bets rékaman bisa dikumpulkeun.

Dina format file "aksés acak" anu tos aya, kami ngarékam metadata anu ngandung skéma méja sareng perenah blok di tungtung file, ngamungkinkeun anjeun langkung murah milih bets rékaman atanapi kolom naon waé tina set data. Dina format streaming, urang ngirim runtuyan pesen: hiji outline, lajeng hiji atawa leuwih bets rékaman.

Format anu béda katingali sapertos kieu:

Ngalirkeun data kolom nganggo Apache Arrow

Streaming Data dina PyArrow: Aplikasi

Pikeun nunjukkeun anjeun kumaha ieu jalan, kuring bakal nyiptakeun conto set data anu ngagambarkeun sakumpulan aliran tunggal:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Ayeuna, anggap urang hoyong nyerat 1 GB data, diwangun ku sakumpulan 1 MB masing-masing, jumlahna aya 1024 sakumpulan. Pikeun ngamimitian, hayu urang nyieun pigura data 1 MB munggaran kalayan 16 kolom:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Lajeng abdi ngarobah aranjeunna ka pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Ayeuna kuring bakal nyiptakeun aliran kaluaran anu bakal nyerat kana RAM sareng nyiptakeun StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Teras urang bakal nyerat 1024 sakumpulan, anu pamustunganana jumlahna 1GB set data:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Kusabab urang nyerat ka RAM, urang tiasa nampi sadaya aliran dina hiji panyangga:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Kusabab data ieu aya dina mémori, maca bets rékaman Panah nyaéta operasi enol-salinan. Kuring muka StreamReader, maca data dina pyarrow.Table, lajeng ngarobahna kana DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Sadaya ieu, tangtosna, saé, tapi anjeun tiasa gaduh patarosan. Kumaha gancangna ieu kajadian? Kumaha ukuran chunk mangaruhan kinerja dimeunangkeun DataFrame pandas?

Performance Streaming

Nalika ukuran chunk streaming turun, biaya ngarekonstruksikeun DataFrame kolumnar anu padeukeut dina pandas naek kusabab pola aksés cache anu teu efisien. Aya ogé sababaraha overhead ti gawé bareng C ++ struktur data jeung arrays sarta panyangga memori maranéhna.

Pikeun 1 MB, sapertos di luhur, dina laptop kuring (Quad-core Xeon E3-1505M) tétéla:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Tétéla yén throughput éféktif mangrupa 7.75 GB / s pikeun mulangkeun 1GB DataFrame tina 1024 1MB sakumpulan. Naon anu bakal kajadian upami urang nganggo potongan anu langkung ageung atanapi langkung alit? Ieu hasilna:

Ngalirkeun data kolom nganggo Apache Arrow

Kinerja turun sacara signifikan tina 256K ka 64K sakumpulan. Kuring reuwas yén 1 MB sakumpulan diolah leuwih gancang ti 16 MB sakumpulan. Eta sia ngalakonan ulikan leuwih teleb tur ngartos naha ieu téh sebaran normal atawa naha aya hal sejenna dina maén.

Dina palaksanaan format ayeuna, data henteu dikomprés prinsipna, jadi ukuran dina mémori jeung "dina kawat" kira sarua. Di hareup, komprési tiasa janten pilihan tambahan.

hasil

Streaming data columnar tiasa cara éféktif pikeun nyoco set data badag kana parabot analytics columnar kawas pandas dina sakumpulan leutik. Ladenan data anu ngagunakeun panyimpenan berorientasi baris tiasa nransper sareng ngalihkeun sakumpulan leutik data anu langkung merenah pikeun cache L2 sareng L3 prosesor anjeun.

Kode lengkep

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

sumber: www.habr.com

Tambahkeun komentar