Apache Arrow๋กœ ์—ด ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ

์ด ๊ธฐ์‚ฌ์˜ ๋ฒˆ์—ญ์€ ์ฝ”์Šค์˜ ํ•™์ƒ๋“ค์„ ์œ„ํ•ด ํŠน๋ณ„ํžˆ ์ค€๋น„๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ ์—”์ง€๋‹ˆ์–ด.

Apache Arrow๋กœ ์—ด ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ

์ง€๋‚œ ๋ช‡ ์ฃผ ๋™์•ˆ ์šฐ๋ฆฌ๋Š” ๋† ๋ฆฌ ์— ์ถ”๊ฐ€ ์•„ํŒŒ์น˜ ์• ๋กœ์šฐ ์ด๋ฏธ ์กด์žฌํ•˜๋Š” ๋žœ๋ค ์•ก์„ธ์Šค/IPC ํŒŒ์ผ ํ˜•์‹์„ ๋ณด์™„ํ•˜๋Š” ์ด์ง„ ์ŠคํŠธ๋ฆผ ํ˜•์‹. Java ๋ฐ C++ ๊ตฌํ˜„๊ณผ Python ๋ฐ”์ธ๋”ฉ์ด ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ๊ธฐ์‚ฌ์—์„œ๋Š” ํ˜•์‹์ด ์ž‘๋™ํ•˜๋Š” ๋ฐฉ์‹์„ ์„ค๋ช…ํ•˜๊ณ  pandas DataFrame์— ๋Œ€ํ•ด ๋งค์šฐ ๋†’์€ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋Ÿ‰์„ ๋‹ฌ์„ฑํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

์—ด ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ

Arrow ์‚ฌ์šฉ์ž๋“ค๋กœ๋ถ€ํ„ฐ ๋ฐ›๋Š” ์ผ๋ฐ˜์ ์ธ ์งˆ๋ฌธ์€ ํฐ ํ…Œ์ด๋ธ” ํ˜•์‹ ๋ฐ์ดํ„ฐ ์„ธํŠธ๋ฅผ ํ–‰ ๋˜๋Š” ๋ ˆ์ฝ”๋“œ ์ง€ํ–ฅ ํ˜•์‹์—์„œ ์—ด ํ˜•์‹์œผ๋กœ ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ํ•˜๋Š” ๋ฐ ๋“œ๋Š” ๋†’์€ ๋น„์šฉ์ž…๋‹ˆ๋‹ค. ๋ฉ€ํ‹ฐ ๊ธฐ๊ฐ€๋ฐ”์ดํŠธ ๋ฐ์ดํ„ฐ ์„ธํŠธ์˜ ๊ฒฝ์šฐ ๋ฉ”๋ชจ๋ฆฌ ๋˜๋Š” ๋””์Šคํฌ์—์„œ ์ „์น˜ํ•˜๋Š” ๊ฒƒ์€ ์••๋„์ ์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ์˜ ๊ฒฝ์šฐ ์†Œ์Šค ๋ฐ์ดํ„ฐ๊ฐ€ ํ–‰์ด๋“  ์—ด์ด๋“  ํ•œ ๊ฐ€์ง€ ์˜ต์…˜์€ ๊ฐ๊ฐ ๋‚ด๋ถ€์— ์—ด ๋ ˆ์ด์•„์›ƒ์„ ํฌํ•จํ•˜๋Š” ์ž‘์€ ํ–‰ ๋ฐฐ์น˜๋ฅผ ๋ณด๋‚ด๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

Apache Arrow์—์„œ๋Š” ํ…Œ์ด๋ธ” ์ฒญํฌ๋ฅผ ๋‚˜ํƒ€๋‚ด๋Š” ๋ฉ”๋ชจ๋ฆฌ ๋‚ด ์ปฌ๋Ÿผ ๋ฐฐ์—ด ๋ชจ์Œ์„ ๋ ˆ์ฝ”๋“œ ๋ฐฐ์น˜๋ผ๊ณ  ํ•ฉ๋‹ˆ๋‹ค. ๋…ผ๋ฆฌ์  ํ…Œ์ด๋ธ”์˜ ๋‹จ์ผ ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ๋ฅผ ๋‚˜ํƒ€๋‚ด๊ธฐ ์œ„ํ•ด ์—ฌ๋Ÿฌ ๋ ˆ์ฝ”๋“œ ์ง‘ํ•ฉ์„ ์ˆ˜์ง‘ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๊ธฐ์กด์˜ "๋ฌด์ž‘์œ„ ์•ก์„ธ์Šค" ํŒŒ์ผ ํ˜•์‹์—์„œ๋Š” ํŒŒ์ผ ๋์— ํ…Œ์ด๋ธ” ์Šคํ‚ค๋งˆ์™€ ๋ธ”๋ก ๋ ˆ์ด์•„์›ƒ์ด ํฌํ•จ๋œ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ๋ฅผ ์ž‘์„ฑํ•˜๋ฏ€๋กœ ๋งค์šฐ ์ €๋ ดํ•˜๊ฒŒ ๋ฐ์ดํ„ฐ ์ง‘ํ•ฉ์—์„œ ๋ชจ๋“  ๋ ˆ์ฝ”๋“œ ๋˜๋Š” ์—ด์„ ์„ ํƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ŠคํŠธ๋ฆฌ๋ฐ ํ˜•์‹์—์„œ๋Š” ์ผ๋ จ์˜ ๋ฉ”์‹œ์ง€, ์ฆ‰ ์Šคํ‚ค๋งˆ์™€ ํ•˜๋‚˜ ์ด์ƒ์˜ ๋ ˆ์ฝ”๋“œ ๋ฐฐ์น˜๋ฅผ ๋ณด๋ƒ…๋‹ˆ๋‹ค.

๋‹ค์–‘ํ•œ ํ˜•์‹์€ ๋‹ค์Œ ๊ทธ๋ฆผ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

Apache Arrow๋กœ ์—ด ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ

PyArrow์˜ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ: ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜

์ž‘๋™ ๋ฐฉ์‹์„ ๋ณด์—ฌ์ฃผ๊ธฐ ์œ„ํ•ด ๋‹จ์ผ ์ŠคํŠธ๋ฆผ ์ฒญํฌ๋ฅผ ๋‚˜ํƒ€๋‚ด๋Š” ์˜ˆ์ œ ๋ฐ์ดํ„ฐ ์„ธํŠธ๋ฅผ ์ƒ์„ฑํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

์ด์ œ ์ด 1๊ฐœ์˜ ์ฒญํฌ์— ๋Œ€ํ•ด ๊ฐ๊ฐ 1MB ์ฒญํฌ๋กœ ๊ตฌ์„ฑ๋œ 1024GB์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์“ฐ๊ณ  ์‹ถ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค. ๋จผ์ € 1๊ฐœ์˜ ์—ด์ด ์žˆ๋Š” ์ฒซ ๋ฒˆ์งธ 16MB ๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์„ ์ƒ์„ฑํ•ด ๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

๊ทธ๋Ÿฐ ๋‹ค์Œ ๋‚˜๋Š” ๊ทธ๊ฒƒ๋“ค์„ pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

์ด์ œ RAM์— ์“ฐ๊ณ  ์ƒ์„ฑํ•  ์ถœ๋ ฅ ์ŠคํŠธ๋ฆผ์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค. StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

๊ทธ๋Ÿฐ ๋‹ค์Œ 1024๊ฐœ์˜ ์ฒญํฌ๋ฅผ ์ž‘์„ฑํ•˜๊ณ  ๊ฒฐ๊ตญ 1GB ๋ฐ์ดํ„ฐ ์„ธํŠธ๋ฅผ ๊ตฌ์„ฑํ•ฉ๋‹ˆ๋‹ค.

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

RAM์— ์ž‘์„ฑํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ํ•˜๋‚˜์˜ ๋ฒ„ํผ์—์„œ ์ „์ฒด ์ŠคํŠธ๋ฆผ์„ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

์ด ๋ฐ์ดํ„ฐ๋Š” ๋ฉ”๋ชจ๋ฆฌ์— ์žˆ๊ธฐ ๋•Œ๋ฌธ์— Zero-copy ์ž‘์—…์„ ํ†ตํ•ด Arrow ๋ ˆ์ฝ”๋“œ์˜ ์ฝ๊ธฐ ๋ฐฐ์น˜๋ฅผ ์–ป์Šต๋‹ˆ๋‹ค. StreamReader๋ฅผ ์—ด๊ณ  ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์Šต๋‹ˆ๋‹ค. pyarrow.Table๊ทธ๋Ÿฐ ๋‹ค์Œ DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

๋ฌผ๋ก  ์ด ๋ชจ๋“  ๊ฒƒ์ด ์ข‹์ง€๋งŒ ์งˆ๋ฌธ์ด ์žˆ์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์–ผ๋งˆ๋‚˜ ๋นจ๋ฆฌ ๋ฐœ์ƒํ•ฉ๋‹ˆ๊นŒ? ์ฒญํฌ ํฌ๊ธฐ๋Š” pandas DataFrame ๊ฒ€์ƒ‰ ์„ฑ๋Šฅ์— ์–ด๋–ค ์˜ํ–ฅ์„ ์ค๋‹ˆ๊นŒ?

์ŠคํŠธ๋ฆฌ๋ฐ ์„ฑ๋Šฅ

์ŠคํŠธ๋ฆฌ๋ฐ ์ฒญํฌ ํฌ๊ธฐ๊ฐ€ ๊ฐ์†Œํ•จ์— ๋”ฐ๋ผ ๋น„ํšจ์œจ์ ์ธ ์บ์‹œ ์•ก์„ธ์Šค ์ฒด๊ณ„๋กœ ์ธํ•ด pandas์—์„œ ์—ฐ์†์ ์ธ ์—ด ํ˜•์‹ DataFrame์„ ์žฌ๊ตฌ์„ฑํ•˜๋Š” ๋น„์šฉ์ด ์ฆ๊ฐ€ํ•ฉ๋‹ˆ๋‹ค. ๋˜ํ•œ C++ ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ์™€ ๋ฐฐ์—ด ๋ฐ ํ•ด๋‹น ๋ฉ”๋ชจ๋ฆฌ ๋ฒ„ํผ๋กœ ์ž‘์—…ํ•˜๋Š” ๋ฐ ์•ฝ๊ฐ„์˜ ์˜ค๋ฒ„ํ—ค๋“œ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

์œ„์™€ ๊ฐ™์ด 1MB์˜ ๊ฒฝ์šฐ ๋‚ด ๋…ธํŠธ๋ถ(์ฟผ๋“œ ์ฝ”์–ด Xeon E3-1505M)์—์„œ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๋‚˜ํƒ€๋‚ฉ๋‹ˆ๋‹ค.

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

7.75๊ฐœ์˜ 1MB ์ฒญํฌ์—์„œ 1024GB DataFrame์„ ๋ณต์›ํ•˜๊ธฐ ์œ„ํ•œ ์œ ํšจ ์ฒ˜๋ฆฌ๋Ÿ‰์€ 1Gb/s์ธ ๊ฒƒ์œผ๋กœ ๋‚˜ํƒ€๋‚ฌ์Šต๋‹ˆ๋‹ค. ํฌ๊ฑฐ๋‚˜ ์ž‘์€ ์ฒญํฌ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์–ด๋–ป๊ฒŒ ๋ ๊นŒ์š”? ์–ป์€ ๊ฒฐ๊ณผ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

Apache Arrow๋กœ ์—ด ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ

์„ฑ๋Šฅ์€ 256K์—์„œ 64K ์ฒญํฌ๋กœ ํฌ๊ฒŒ ๋–จ์–ด์ง‘๋‹ˆ๋‹ค. 1MB ์ฒญํฌ๊ฐ€ 16MB ์ฒญํฌ๋ณด๋‹ค ๋น ๋ฅด๊ฒŒ ์ฒ˜๋ฆฌ๋˜์–ด ๋†€๋ž์Šต๋‹ˆ๋‹ค. ๋ณด๋‹ค ์ฒ ์ €ํ•œ ์—ฐ๊ตฌ๋ฅผ ์ˆ˜ํ–‰ํ•˜๊ณ  ์ด๊ฒƒ์ด ์ •๊ทœ ๋ถ„ํฌ์ธ์ง€ ๋˜๋Š” ๋‹ค๋ฅธ ๊ฒƒ์ด ๊ด€๋ จ๋˜์–ด ์žˆ๋Š”์ง€ ์ดํ•ดํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Šต๋‹ˆ๋‹ค.

ํ˜•์‹์˜ ํ˜„์žฌ ๊ตฌํ˜„์—์„œ ๋ฐ์ดํ„ฐ๋Š” ์›์น™์ ์œผ๋กœ ์••์ถ•๋˜์ง€ ์•Š์œผ๋ฏ€๋กœ ๋ฉ”๋ชจ๋ฆฌ ํฌ๊ธฐ์™€ "์˜จ๋” ์™€์ด์–ด"๋Š” ๊ฑฐ์˜ ๋™์ผํ•ฉ๋‹ˆ๋‹ค. ์••์ถ•์€ ํ–ฅํ›„ ์˜ต์…˜์ด ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ํ•ฉ๊ณ„

์ŠคํŠธ๋ฆฌ๋ฐ ์—ด ๋ฐ์ดํ„ฐ๋Š” ํฐ ๋ฐ์ดํ„ฐ ์„ธํŠธ๋ฅผ ์ž‘์€ ์ฒญํฌ๋กœ ํŒ๋‹ค์™€ ๊ฐ™์€ ์—ด ๋ถ„์„ ๋„๊ตฌ๋กœ ์ „์†กํ•˜๋Š” ํšจ์œจ์ ์ธ ๋ฐฉ๋ฒ•์ด ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํ–‰ ๊ธฐ๋ฐ˜ ์Šคํ† ๋ฆฌ์ง€๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๋ฐ์ดํ„ฐ ์„œ๋น„์Šค๋Š” ํ”„๋กœ์„ธ์„œ์˜ L2 ๋ฐ L3 ์บ์‹œ์— ๋” ํŽธ๋ฆฌํ•œ ์ž‘์€ ๋ฐ์ดํ„ฐ ์ฒญํฌ๋ฅผ ์ „์†กํ•˜๊ณ  ๋ฐ”๊ฟ€ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ „์ฒด ์ฝ”๋“œ

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # int() so the row count can be used as an array length in Python 3
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    # average over the niter parameter, not the global NITER
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

์ถœ์ฒ˜ : habr.com

์ฝ”๋ฉ˜ํŠธ๋ฅผ ์ถ”๊ฐ€