Streaming Columnar Data with Apache Arrow

ืชืจื’ื•ื ื”ืžืืžืจ ื”ื•ื›ืŸ ื‘ืžื™ื•ื—ื“ ืขื‘ื•ืจ ืชืœืžื™ื“ื™ ื”ืงื•ืจืก ืžื”ื ื“ืก ื ืชื•ื ื™ื.

ื”ื–ืจืžืช ื ืชื•ื ื™ ืขืžื•ื“ื•ืช ืขื Apache Arrow

Over the past few weeks, Nong Li and I added a binary streaming format to Apache Arrow, complementing the existing random access / IPC file format. We have Java and C++ implementations and Python bindings. In this article, I explain how the format works and show how it can achieve very high data throughput into pandas DataFrames.

Streaming columnar data

ืฉืืœื” ื ืคื•ืฆื” ืฉืื ื™ ืžืงื‘ืœ ืžืžืฉืชืžืฉื™ Arrow ื”ื™ื ื”ืขืœื•ืช ื”ื’ื‘ื•ื”ื” ืฉืœ ื”ืขื‘ืจืช ืงื‘ื•ืฆื•ืช ื’ื“ื•ืœื•ืช ืฉืœ ื ืชื•ื ื™ื ื˜ื‘ืœืื™ื™ื ืžืคื•ืจืžื˜ ืžื•ื›ื•ื•ืŸ ืฉื•ืจื” ืื• ืจืฉื•ืžื” ืœืคื•ืจืžื˜ ืžื•ื ื—ื” ืขืžื•ื“ื•ืช. ืขื‘ื•ืจ ืžืขืจื›ื™ ื ืชื•ื ื™ื ืžืจื•ื‘ื™ ื’'ื™ื’ื”-ื‘ื™ื™ื˜, ื”ืขื‘ืจื” ื‘ื–ื™ื›ืจื•ืŸ ืื• ื‘ื“ื™ืกืง ื™ื›ื•ืœื” ืœื”ื™ื•ืช ืžืฉื™ืžื” ืžื›ืจื™ืขื”.

To stream data, whether the source data is row- or column-oriented, one option is to send small batches of rows, each of which carries a columnar layout internally.

In Apache Arrow, the collection of in-memory columnar arrays representing a chunk of a table is called a record batch. A single logical table data structure can be assembled from multiple record batches.

In the existing "random access" file format, we write metadata containing the table schema and the block locations at the end of the file, which makes it extremely cheap to select any record batch or any column from the data set. In the streaming format, we send a series of messages: the schema, followed by one or more record batches.

ื”ืคื•ืจืžื˜ื™ื ื”ืฉื•ื ื™ื ื ืจืื™ื ื‘ืขืจืš ื›ืš:

[Figure: layout of the random access file format vs. the streaming format]

Streaming data in PyArrow: an application

ื›ื“ื™ ืœื”ืจืื•ืช ืœืš ืื™ืš ื–ื” ืขื•ื‘ื“, ืื ื™ ืืฆื•ืจ ืžืขืจืš ื ืชื•ื ื™ื ืœื“ื•ื’ืžื” ื”ืžื™ื™ืฆื’ ื ืชื— ื–ืจื ื™ื—ื™ื“:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # Build a DataFrame of float64 columns totalling ~total_size bytes
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

Now, suppose we want to write 1 GB of data composed of chunks of 1 MB each, 1024 chunks in total. To start, let's create the first 1 MB DataFrame with 16 columns:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Then I convert it to a pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

ืขื›ืฉื™ื• ืื ื™ ืืฆื•ืจ ื–ืจื ืคืœื˜ ืฉื™ื›ืชื•ื‘ ืœ-RAM ื•ื™ื™ืฆื•ืจ StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

ืœืื—ืจ ืžื›ืŸ ื ื›ืชื•ื‘ 1024 ื ืชื—ื™ื, ืฉื™ืกืชื›ืžื• ื‘ืกื•ืคื• ืฉืœ ื“ื‘ืจ ื‘ืžืขืจืš ื ืชื•ื ื™ื ืฉืœ 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Since we wrote to RAM, we can get the whole stream as a single buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Since this data is in memory, reading back Arrow record batches is a zero-copy operation. I open a StreamReader, read the data as a pyarrow.Table, and then convert it to a pandas DataFrame:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904
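
The stream is only marginally larger than the reconstructed DataFrame; the difference is the schema and per-batch framing metadata. Checking the overhead implied by the two numbers above:

```python
stream_size = 1_074_750_744  # source.size: the whole in-memory stream
data_size = 1_073_741_904    # df.memory_usage().sum(): the raw column data

overhead = stream_size - data_size
print(overhead)                              # 1008840 bytes of framing
print(round(100 * overhead / data_size, 3))  # about 0.094 percent
```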

All of this is nice, of course, but you may have some questions. How fast is it? How does the chunk size affect the performance of reconstructing the pandas DataFrame?

Streaming performance

As the streaming chunk size shrinks, the cost of reconstructing a contiguous columnar DataFrame in pandas rises because of less cache-efficient access patterns. There is also some overhead from working with the C++ data structures and arrays and their memory buffers.

For 1 MB chunks, as above, on my laptop (quad-core Xeon E3-1505M) it works out to:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
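
A quick sanity check of what that timing implies, given that each iteration reconstructs the full 1 GB data set:

```python
elapsed = 0.129               # seconds per 1 GB reconstruction, from %timeit above
gb_per_second = 1.0 / elapsed # the data set is exactly 1 GB
print(round(gb_per_second, 2))  # 7.75, the effective throughput
```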

It turns out the effective throughput is 7.75 GB/s for reconstructing a 1 GB DataFrame from 1024 chunks of 1 MB. What happens if we use larger or smaller chunks? Here are the results:

[Figure: DataFrame reconstruction throughput as a function of chunk size]

ื”ื‘ื™ืฆื•ืขื™ื ื™ื•ืจื“ื™ื ืžืฉืžืขื•ืชื™ืช ืž-256K ืœ-64K ื ืชื—ื™ื. ื”ื•ืคืชืขืชื™ ืฉ-1 MB chunks ืขื•ื‘ื“ื• ืžื”ืจ ื™ื•ืชืจ ืžืืฉืจ 16 MB chunks. ื›ื“ืื™ ืœืขืจื•ืš ืžื—ืงืจ ืžืขืžื™ืง ื™ื•ืชืจ ื•ืœื”ื‘ื™ืŸ ื”ืื ืžื“ื•ื‘ืจ ื‘ื”ืชืคืœื’ื•ืช ื ื•ืจืžืœื™ืช ืื• ืฉืžื ื™ืฉ ืžืฉื”ื• ืื—ืจ ื‘ืžืฉื—ืง.

ื‘ื™ื™ืฉื•ื ื”ื ื•ื›ื—ื™ ืฉืœ ื”ืคื•ืจืžื˜, ื”ื ืชื•ื ื™ื ืื™ื ื ื“ื—ื•ืกื™ื ื‘ืื•ืคืŸ ืขืงืจื•ื ื™, ื›ืš ืฉื”ื’ื•ื“ืœ ื‘ื–ื™ื›ืจื•ืŸ ื•ื‘"ื—ื•ื˜ื™ื" ื–ื”ื” ื‘ืขืจืš. ื‘ืขืชื™ื“, ื“ื—ื™ืกื” ืขืฉื•ื™ื” ืœื”ืคื•ืš ืœืืคืฉืจื•ืช ื ื•ืกืคืช.

Summary

Streaming columnar data can be an efficient way to feed large data sets into columnar analytics tools like pandas in small chunks. Data services backed by row-oriented storage can transfer and transpose small chunks of data that are friendlier to your CPU's L2 and L3 caches.

Full code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com
