Streaming Columnar Data with Apache Arrow

This article was translated specifically for students of the "Data Engineer" course.


A few weeks ago, Nong Li added a binary streaming format to Apache Arrow, complementing the existing random access / IPC file format. We have implementations in Java and C++, along with Python bindings. In this article, I'll explain how the format works and show how to achieve very high data throughput into a pandas DataFrame.

Streaming Column Data

A question I often get from Arrow users is about the high cost of moving large tabular datasets from a row- or record-oriented format to a column-oriented one. For multi-gigabyte datasets, transposing in memory or on disk can be a major undertaking.

For streaming data, whether the source data has a row or a column layout, one option is to send small batches of rows, each with a columnar layout internally.

In Apache Arrow, the in-memory collection of columnar arrays representing a chunk of a table is called a record batch. To represent a single logical table data structure, several record batches can be collected together.
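As a minimal sketch of that relationship, here is how several record batches with the same schema combine into one logical Table. This uses the pyarrow API available in recent releases; the snippets later in this article use the older 0.x names that existed when the post was written:

import pyarrow as pa

# Two record batches with the same schema; each is a chunk of the logical table.
batch1 = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(['a', 'b', 'c'])],
    names=['ints', 'strs'])
batch2 = pa.RecordBatch.from_arrays(
    [pa.array([4, 5]), pa.array(['d', 'e'])],
    names=['ints', 'strs'])

# Collect the batches into a single logical table; no data is copied,
# the table simply references the batches' column chunks.
table = pa.Table.from_batches([batch1, batch2])
print(table.num_rows)  # 5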

Mune iripo "random access" faira fomati, tinorekodha metadata ine tafura schema uye nzvimbo dzekuvharira pakupera kwefaira, zvichikubvumidza kuti usarudze zvakachipa chero batch rekodhi kana chero column kubva pane data set. Mukutepfenyura fomati, tinotumira akatevedzana mameseji: rondedzero, uyezve rimwe kana akawanda mabheti emarekodhi.

The two formats look roughly like this:

[Figure: random access file format vs. streaming format]
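In code, the difference looks roughly like the following. This is a minimal sketch using the pyarrow.ipc module from recent releases; the rest of this article uses the older names (InMemoryOutputStream, StreamWriter, StreamReader) that were current when the post was written:

import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array(range(1000))], names=['x'])

# Random access file format: schema + record batches + a footer with the
# block locations, so any batch can be read directly by index.
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, batch.schema) as writer:
    for _ in range(10):
        writer.write_batch(batch)
buf = sink.getvalue()
reader = pa.ipc.open_file(buf)
print(reader.num_record_batches)      # 10
print(reader.get_batch(7).num_rows)   # jump straight to batch 7

# Streaming format: a schema message followed by record batch messages,
# consumed sequentially from the front.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    for _ in range(10):
        writer.write_batch(batch)
stream = pa.ipc.open_stream(sink.getvalue())
for b in stream:
    pass  # batches arrive in order; there is no index to seek into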

Streaming Data in PyArrow: Usage

To show how this works, I'll generate a sample dataset representing a single stream chunk:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # number of float64 rows needed so the frame occupies roughly total_size bytes
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

Now suppose we want to write 1 GB of data made up of 1 MB chunks, i.e. 1024 chunks in total. First, let's create the first 1 MB data frame with 16 columns:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)
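As a quick sanity check on the arithmetic (my own illustration, not part of the original post): 1 MB split across 16 float64 columns gives 1 MB / (16 x 8 bytes) = 8192 rows per chunk:

# each 1 MB chunk holds 1 MB / (16 columns * 8 bytes per float64) = 8192 rows
assert len(df) == MEGABYTE // (NCOLS * 8)               # 8192 rows
assert df.memory_usage(index=False).sum() == MEGABYTE   # ~1 MB of column data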

Then I convert it to a pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Now I'll create an output stream that writes to RAM and create a StreamWriter on top of it:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Then we write the 1024 chunks that will eventually make up the 1 GB dataset:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Since we wrote to RAM, we can get the entire stream as a single buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Since this data is in memory, reading Arrow record batches is a zero-copy operation. I open a StreamReader, read the data into a pyarrow.Table, and then convert it to a pandas DataFrame:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

This is all very nice, but you may have some questions. How fast is it? How does the chunk size affect the performance of reconstructing the pandas DataFrame?

Streaming Performance

As the streaming chunk size shrinks, the cost of reconstructing a contiguous columnar DataFrame in pandas grows because of less cache-efficient access patterns. There is also some overhead from manipulating the C++ data structures and arrays and their memory buffers.

For 1 MB chunks, as above, on my laptop (quad-core Xeon E3-1505M) this gives:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

So the effective throughput is 7.75 GB/s when reconstructing a 1 GB DataFrame from 1024 1 MB chunks (1 GB in roughly 129 ms works out to about 7.75 GB/s). What happens if we use larger or smaller chunks? Here are the results:

[Figure: DataFrame reconstruction throughput as a function of chunk size]

Performance drops significantly going from 256 KB down to 64 KB chunks. I was surprised that 1 MB chunks were processed faster than 16 MB ones. It would be worth doing a more thorough study to understand whether this is a normal distribution of results or whether something else is going on.

In the current implementation of the format, the data is not compressed at all, so the in-memory size and the "on the wire" size are roughly the same (you can see this above: the stream buffer is 1,074,750,744 bytes against 1,073,741,904 bytes for the resulting DataFrame). Compression may be added as an option in the future.

Conclusion

Streaming columnar data can be an efficient way to feed large datasets into columnar analytics tools such as pandas in small chunks. Data services backed by row-oriented storage can transpose and stream small chunks of data that fit more comfortably into your processor's L2 and L3 caches.
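To put rough numbers on that claim (a back-of-the-envelope estimate of my own, assuming typical per-core L2 cache sizes of 256 KB to 1 MB, not a measurement from the post): with the 16-column float64 frames used here, a 256 KB chunk holds 2048 rows, i.e. about 16 KB per column buffer, which fits comfortably in L2:

# Back-of-the-envelope cache-fit estimate (assumed typical cache sizes).
chunk_bytes = 256 * 1024
ncols, itemsize = 16, 8
rows_per_chunk = chunk_bytes // (ncols * itemsize)    # 2048 rows per chunk
bytes_per_column = rows_per_chunk * itemsize          # 16 KB per column buffer
print(rows_per_chunk, bytes_per_column)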

Full code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # number of float64 rows needed so the frame occupies roughly total_size bytes
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com
