Streaming columnar data with Apache Arrow

This translation of the article was prepared especially for students of the Data Engineer course.


A few weeks ago, Nong Li and I added a binary streaming format to Apache Arrow, complementing the existing random access / IPC file format. We have Java and C++ implementations plus Python bindings. In this article, I will explain how the format works and show how you can achieve very high data throughput into a pandas DataFrame.

Streaming columnar data

One question I receive from Arrow users concerns the high cost of migrating large tabular datasets from a row-oriented or record-oriented format to a columnar format. For multi-gigabyte datasets, transposing in memory or on disk can be prohibitively expensive.

For streaming data, whether the source is row-oriented or columnar, one option is to send small batches of rows, each of which internally uses a columnar layout.

In Apache Arrow, a collection of in-memory columnar arrays representing a chunk of a table is called a record batch. Multiple record batches can be collected to represent a single logical table data structure.
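In pyarrow, that composition looks roughly like the following sketch (the array values and column names here are invented for illustration, and Table.from_batches assumes a reasonably recent pyarrow release):

import pyarrow as pa

# one chunk of the table: a record batch with a columnar memory layout
batch = pa.RecordBatch.from_arrays(
    [pa.array([1.0, 2.0, 3.0]), pa.array([4.0, 5.0, 6.0])],
    names=['c0', 'c1'])

# several batches with the same schema form one logical table
table = pa.Table.from_batches([batch, batch])
print(table.num_rows)  # 6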

In the existing "random access" file format, we write metadata containing the table schema and block layout at the end of the file, which lets you select any record batch or any column from the dataset very cheaply. In the streaming format, we send a series of messages: a schema, and then one or more record batches.
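That ordering is easy to see if you consume a stream one message at a time. A minimal sketch, assuming a recent pyarrow release where the streaming API lives under pa.ipc (the walkthrough below uses the older class names from the time this article was written):

import pyarrow as pa

# build a small stream in memory: a schema message followed by two batches
batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=['x'])
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
    writer.write_batch(batch)

# the reader hands back the schema first, then each record batch in order
reader = pa.ipc.open_stream(sink.getvalue())
print(reader.schema)
for chunk in reader:
    print(chunk.num_rows)  # 3, then 3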

The two formats look roughly like the following diagram:

(Diagram: the random access file format versus the streaming format.)

Streaming data in PyArrow: usage

To show how this works, I will generate an example dataset representing a single stream chunk:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

Now suppose we want to write 1 GB of data composed of chunks of 1 MB each, for a total of 1024 chunks. First, let's create the first 1 MB data frame with 16 columns:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Then I convert it to a pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Now I will create an output stream backed by RAM and create a StreamWriter around it:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Then we write the 1024 chunks that will ultimately make up the 1 GB dataset:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Since we wrote into RAM, we can get the entire stream as a single buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Because this data is in memory, reading back record batches from the Arrow stream is a zero-copy operation. I open a StreamReader, read the data into a pyarrow.Table, and then convert it to a pandas DataFrame:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904
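As an aside, in newer pyarrow releases the classes used above have been renamed (InMemoryOutputStream became BufferOutputStream, and the stream reader and writer are created through pa.ipc). A sketch of the same round trip, assuming pyarrow 1.0 or newer:

import pandas as pd
import pyarrow as pa

batch = pa.RecordBatch.from_pandas(pd.DataFrame({'c0': [1.0, 2.0]}))

# write the stream into a RAM-backed buffer
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)

# read it back and convert to pandas
df = pa.ipc.open_stream(sink.getvalue()).read_all().to_pandas()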

All of this is great, of course, but you may have questions. How fast is it? How does the chunk size affect the performance of reassembling the pandas DataFrame?

Streaming performance

As the streaming chunk size shrinks, the cost of reconstructing a contiguous columnar DataFrame in pandas rises because of cache-inefficient memory access patterns. There is also some overhead from manipulating the C++ container structures around the arrays and their memory buffers.

For the 1 MB chunks above, on my laptop (quad-core Xeon E3-1505M) this comes out to:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

It turns out the effective throughput is 7.75 GB/s for reconstructing a 1 GB DataFrame from 1024 chunks of 1 MB each (1 GB / 0.129 s ≈ 7.75 GB/s). What happens if we use larger or smaller chunks? Here are the results:

(Chart: DataFrame reconstruction throughput as a function of stream chunk size.)

Performance drops off noticeably from 256 KB down to 64 KB chunks. I was surprised that 1 MB chunks turned out faster than 16 MB ones; it would be worth doing a more thorough study to understand whether this is a normal distribution or whether something else is involved.

In the current implementation of the format, the data is not compressed in any way, so the in-memory size and the "on the wire" size are approximately the same. Compression may become an option in the future.
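For readers on current pyarrow: stream-level compression did eventually land as an option. A minimal sketch, assuming pyarrow 2.0 or newer; the choice of zstd here is illustrative:

import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array([1.0] * 1000)], names=['c0'])
sink = pa.BufferOutputStream()

# ask the writer to compress the body buffers of each record batch
options = pa.ipc.IpcWriteOptions(compression='zstd')
with pa.ipc.new_stream(sink, batch.schema, options=options) as writer:
    writer.write_batch(batch)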

Summary

Streaming columnar data can be an efficient way to feed large datasets into columnar analytics tools such as pandas in small chunks. Data services backed by row-oriented storage can transpose and send the data in small batches that fit more comfortably into your processor's L2 and L3 caches.

Full code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    # average wall-clock time of niter calls to f
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.hab.com
