Streaming Columnar Data with Apache Arrow



Over the past few weeks, Nong Li and I added to Apache Arrow a binary streaming format that complements the existing random access IPC file format. We have Java and C++ implementations, plus Python bindings. In this post, I explain how the format works and show how you can achieve very high data throughput into pandas DataFrames.

Streaming Columnar Data

A common question I get from Arrow users concerns the high cost of migrating large tabular datasets from a row- or record-oriented format to a columnar format. For multi-gigabyte datasets, transposing in memory or on disk can be prohibitively expensive.

To stream data, whether the source is row- or column-oriented, one option is to send small batches of rows, each with a columnar layout internally.
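To make the idea concrete, here is a minimal sketch of such a transposition step, built on Arrow record batches (introduced just below). The helper name row_chunks_to_batches and the chunk size of 1000 rows are hypothetical choices for illustration, not part of Arrow's API:

import itertools
import pyarrow as pa

def row_chunks_to_batches(rows, chunk_rows=1000):
    # Consume an iterator of row dicts in small chunks, transposing each
    # chunk into a columnar Arrow record batch
    it = iter(rows)
    while True:
        chunk = list(itertools.islice(it, chunk_rows))
        if not chunk:
            break
        names = list(chunk[0])
        yield pa.RecordBatch.from_arrays(
            [pa.array([row[name] for row in chunk]) for name in names],
            names=names)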

In Apache Arrow, the in-memory columnar array collection representing a chunk of a table is called a record batch. Multiple record batches can be collected to represent a single logical table data structure.
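As a small concrete example, here is how several record batches combine into one logical table; a sketch using pa.RecordBatch.from_arrays and pa.Table.from_batches, which exist in current pyarrow releases:

import pyarrow as pa

# Two record batches with the same schema, each holding a chunk of the table
batch1 = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(['a', 'b', 'c'])], names=['x', 'y'])
batch2 = pa.RecordBatch.from_arrays(
    [pa.array([4, 5]), pa.array(['d', 'e'])], names=['x', 'y'])

# Multiple record batches collected into a single logical table
table = pa.Table.from_batches([batch1, batch2])
print(table.num_rows)  # 5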

In the existing "random access" file format, we write metadata containing the table schema and block locations at the end of the file, which lets you select any record batch or any column from the dataset very cheaply. In the streaming format, we send a series of messages: the schema, then one or more record batches.
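For comparison, here is a minimal sketch of the random access file format in use; note it uses the pa.ipc.new_file / pa.ipc.open_file spellings from modern pyarrow, which postdate the class names used in this post:

import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=['x'])

# File format: schema and block locations are written to a footer at the end
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, batch.schema) as writer:
    for _ in range(4):
        writer.write_batch(batch)

# The footer lets us jump straight to any individual record batch
reader = pa.ipc.open_file(sink.getvalue())
third_batch = reader.get_batch(2)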

The two formats look roughly like this:

[Figure: layout of the random access file format compared with the streaming format]

Streaming Data in PyArrow: Usage

To show how this works, I'll generate an example dataset representing a single stream chunk:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # Divide the target byte size evenly across ncols float64 columns
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

Now, suppose we want to write 1 GB of data composed of chunks of 1 MB each, so 1024 chunks in total. To start, let's create the first 1 MB data frame with 16 columns:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Then I convert it to a pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Now I create an output stream that writes to RAM and create a StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
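(If you are following along with a recent pyarrow release, these classes have since been renamed; the equivalent calls today are roughly:)

# Modern pyarrow spelling of the two lines above
sink = pa.BufferOutputStream()
stream_writer = pa.ipc.new_stream(sink, batch.schema)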

Then we write the 1024 chunks that will eventually make up the 1 GB dataset:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Since we wrote to RAM, we can get the entire stream as a single buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Since this data is in memory, reading back batches of Arrow records is a zero-copy operation. I open a StreamReader, read the data into a pyarrow.Table, and then convert it to a pandas DataFrame:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904
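(In modern pyarrow the reading side has likewise been renamed; an equivalent sketch:)

# Modern pyarrow spelling of the StreamReader round trip above
table = pa.ipc.open_stream(source).read_all()
df = table.to_pandas()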

This is all very nice, but you may have some questions. How fast is it? How does the chunk size affect the performance of reconstructing the pandas DataFrame?

Streaming Performance

As the chunk size shrinks, the cost of reconstructing a contiguous columnar pandas DataFrame rises because of less cache-friendly memory access patterns. There is also some overhead from working with the C++ record batches and their memory buffers.
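You can see one source of that cost directly: each column of the reassembled Table is a ChunkedArray with one chunk per record batch, and to_pandas() must concatenate all those pieces into a single contiguous array per column. A sketch, using the modern pa.ipc.open_stream spelling:

# Each column arrives as many small chunks that to_pandas() must stitch together
table = pa.ipc.open_stream(source).read_all()
print(table.column(0).num_chunks)  # 1024 when a 1 GB stream is cut into 1 MB chunks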

For 1 MB chunks, as above, on my laptop (quad-core Xeon E3-1505M) this clocks in at:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

This works out to an effective throughput of 7.75 GB/s when reconstructing a 1 GB DataFrame from 1024 chunks of 1 MB each (1 GB / 0.129 s ≈ 7.75 GB/s). What happens if we use larger or smaller chunks? Here are the results:

[Figure: DataFrame reconstruction throughput as a function of chunk size]

Performance drops significantly going from 256 KB to 64 KB chunks. I was surprised to see that 1 MB chunks were processed faster than 16 MB ones; it would be worth a more thorough investigation to understand whether that is within normal variance or whether something else is going on.

In the current implementation of the format, the data is not compressed at all, so the in-memory and on-the-wire sizes are roughly the same. In the future, compression could be added to the format as an option.
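(An aside from today's perspective: that option did eventually land; recent pyarrow releases support optional buffer compression in the IPC format, for example:)

# Compress IPC buffers with ZSTD (assuming a recent pyarrow release)
options = pa.ipc.IpcWriteOptions(compression='zstd')
writer = pa.ipc.new_stream(sink, batch.schema, options=options)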

Conclusion

Streaming columnar data can be an efficient way to feed large datasets into columnar analytics tools like pandas in small chunks. Data services that use row-oriented storage can transpose and stream small chunks of data that fit more comfortably into your processor's L2 and L3 caches.

Full code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # Divide the target byte size evenly across ncols float64 columns;
    # int() is required since np.random.randn needs an integer count
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    # Average wall-clock seconds per call over niter runs
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com
