์ด ๊ธฐ์ฌ์ ๋ฒ์ญ์ ์ฝ์ค์ ํ์๋ค์ ์ํด ํน๋ณํ ์ค๋น๋์์ต๋๋ค.
์ง๋ ๋ช ์ฃผ ๋์ ์ฐ๋ฆฌ๋
์ด ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ
Arrow ์ฌ์ฉ์๋ค๋ก๋ถํฐ ๋ฐ๋ ์ผ๋ฐ์ ์ธ ์ง๋ฌธ์ ํฐ ํ ์ด๋ธ ํ์ ๋ฐ์ดํฐ ์ธํธ๋ฅผ ํ ๋๋ ๋ ์ฝ๋ ์งํฅ ํ์์์ ์ด ํ์์ผ๋ก ๋ง์ด๊ทธ๋ ์ด์ ํ๋ ๋ฐ ๋๋ ๋์ ๋น์ฉ์ ๋๋ค. ๋ฉํฐ ๊ธฐ๊ฐ๋ฐ์ดํธ ๋ฐ์ดํฐ ์ธํธ์ ๊ฒฝ์ฐ ๋ฉ๋ชจ๋ฆฌ ๋๋ ๋์คํฌ์์ ์ ์นํ๋ ๊ฒ์ ์๋์ ์ผ ์ ์์ต๋๋ค.
์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ์ ๊ฒฝ์ฐ ์์ค ๋ฐ์ดํฐ๊ฐ ํ์ด๋ ์ด์ด๋ ํ ๊ฐ์ง ์ต์ ์ ๊ฐ๊ฐ ๋ด๋ถ์ ์ด ๋ ์ด์์์ ํฌํจํ๋ ์์ ํ ๋ฐฐ์น๋ฅผ ๋ณด๋ด๋ ๊ฒ์ ๋๋ค.
Apache Arrow์์๋ ํ ์ด๋ธ ์ฒญํฌ๋ฅผ ๋ํ๋ด๋ ๋ฉ๋ชจ๋ฆฌ ๋ด ์ปฌ๋ผ ๋ฐฐ์ด ๋ชจ์์ ๋ ์ฝ๋ ๋ฐฐ์น๋ผ๊ณ ํฉ๋๋ค. ๋ ผ๋ฆฌ์ ํ ์ด๋ธ์ ๋จ์ผ ๋ฐ์ดํฐ ๊ตฌ์กฐ๋ฅผ ๋ํ๋ด๊ธฐ ์ํด ์ฌ๋ฌ ๋ ์ฝ๋ ์งํฉ์ ์์งํ ์ ์์ต๋๋ค.
๊ธฐ์กด์ "๋ฌด์์ ์ก์ธ์ค" ํ์ผ ํ์์์๋ ํ์ผ ๋์ ํ ์ด๋ธ ์คํค๋ง์ ๋ธ๋ก ๋ ์ด์์์ด ํฌํจ๋ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์์ฑํ๋ฏ๋ก ๋งค์ฐ ์ ๋ ดํ๊ฒ ๋ฐ์ดํฐ ์งํฉ์์ ๋ชจ๋ ๋ ์ฝ๋ ๋๋ ์ด์ ์ ํํ ์ ์์ต๋๋ค. ์คํธ๋ฆฌ๋ฐ ํ์์์๋ ์ผ๋ จ์ ๋ฉ์์ง, ์ฆ ์คํค๋ง์ ํ๋ ์ด์์ ๋ ์ฝ๋ ๋ฐฐ์น๋ฅผ ๋ณด๋ ๋๋ค.
๋ค์ํ ํ์์ ๋ค์ ๊ทธ๋ฆผ๊ณผ ๊ฐ์ต๋๋ค.
PyArrow์ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ: ์ ํ๋ฆฌ์ผ์ด์
์๋ ๋ฐฉ์์ ๋ณด์ฌ์ฃผ๊ธฐ ์ํด ๋จ์ผ ์คํธ๋ฆผ ์ฒญํฌ๋ฅผ ๋ํ๋ด๋ ์์ ๋ฐ์ดํฐ ์ธํธ๋ฅผ ์์ฑํ๊ฒ ์ต๋๋ค.
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
์ด์ ์ด 1๊ฐ์ ์ฒญํฌ์ ๋ํด ๊ฐ๊ฐ 1MB ์ฒญํฌ๋ก ๊ตฌ์ฑ๋ 1024GB์ ๋ฐ์ดํฐ๋ฅผ ์ฐ๊ณ ์ถ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค. ๋จผ์ 1๊ฐ์ ์ด์ด ์๋ ์ฒซ ๋ฒ์งธ 16MB ๋ฐ์ดํฐ ํ๋ ์์ ์์ฑํด ๋ณด๊ฒ ์ต๋๋ค.
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
๊ทธ๋ฐ ๋ค์ ๋๋ ๊ทธ๊ฒ๋ค์ pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
์ด์ RAM์ ์ฐ๊ณ ์์ฑํ ์ถ๋ ฅ ์คํธ๋ฆผ์ ์์ฑํฉ๋๋ค. StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
๊ทธ๋ฐ ๋ค์ 1024๊ฐ์ ์ฒญํฌ๋ฅผ ์์ฑํ๊ณ ๊ฒฐ๊ตญ 1GB ๋ฐ์ดํฐ ์ธํธ๋ฅผ ๊ตฌ์ฑํฉ๋๋ค.
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
RAM์ ์์ฑํ๊ธฐ ๋๋ฌธ์ ํ๋์ ๋ฒํผ์์ ์ ์ฒด ์คํธ๋ฆผ์ ๊ฐ์ ธ์ฌ ์ ์์ต๋๋ค.
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
์ด ๋ฐ์ดํฐ๋ ๋ฉ๋ชจ๋ฆฌ์ ์๊ธฐ ๋๋ฌธ์ Zero-copy ์์
์ ํตํด Arrow ๋ ์ฝ๋์ ์ฝ๊ธฐ ๋ฐฐ์น๋ฅผ ์ป์ต๋๋ค. StreamReader๋ฅผ ์ด๊ณ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ต๋๋ค. pyarrow.Table
๊ทธ๋ฐ ๋ค์ DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
๋ฌผ๋ก ์ด ๋ชจ๋ ๊ฒ์ด ์ข์ง๋ง ์ง๋ฌธ์ด ์์ ์ ์์ต๋๋ค. ์ผ๋ง๋ ๋นจ๋ฆฌ ๋ฐ์ํฉ๋๊น? ์ฒญํฌ ํฌ๊ธฐ๋ pandas DataFrame ๊ฒ์ ์ฑ๋ฅ์ ์ด๋ค ์ํฅ์ ์ค๋๊น?
์คํธ๋ฆฌ๋ฐ ์ฑ๋ฅ
์คํธ๋ฆฌ๋ฐ ์ฒญํฌ ํฌ๊ธฐ๊ฐ ๊ฐ์ํจ์ ๋ฐ๋ผ ๋นํจ์จ์ ์ธ ์บ์ ์ก์ธ์ค ์ฒด๊ณ๋ก ์ธํด pandas์์ ์ฐ์์ ์ธ ์ด ํ์ DataFrame์ ์ฌ๊ตฌ์ฑํ๋ ๋น์ฉ์ด ์ฆ๊ฐํฉ๋๋ค. ๋ํ C++ ๋ฐ์ดํฐ ๊ตฌ์กฐ์ ๋ฐฐ์ด ๋ฐ ํด๋น ๋ฉ๋ชจ๋ฆฌ ๋ฒํผ๋ก ์์ ํ๋ ๋ฐ ์ฝ๊ฐ์ ์ค๋ฒํค๋๊ฐ ์์ต๋๋ค.
์์ ๊ฐ์ด 1MB์ ๊ฒฝ์ฐ ๋ด ๋ ธํธ๋ถ(์ฟผ๋ ์ฝ์ด Xeon E3-1505M)์์ ๋ค์๊ณผ ๊ฐ์ด ๋ํ๋ฉ๋๋ค.
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
7.75๊ฐ์ 1MB ์ฒญํฌ์์ 1024GB DataFrame์ ๋ณต์ํ๊ธฐ ์ํ ์ ํจ ์ฒ๋ฆฌ๋์ 1Gb/s์ธ ๊ฒ์ผ๋ก ๋ํ๋ฌ์ต๋๋ค. ํฌ๊ฑฐ๋ ์์ ์ฒญํฌ๋ฅผ ์ฌ์ฉํ๋ฉด ์ด๋ป๊ฒ ๋ ๊น์? ์ป์ ๊ฒฐ๊ณผ๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
์ฑ๋ฅ์ 256K์์ 64K ์ฒญํฌ๋ก ํฌ๊ฒ ๋จ์ด์ง๋๋ค. 1MB ์ฒญํฌ๊ฐ 16MB ์ฒญํฌ๋ณด๋ค ๋น ๋ฅด๊ฒ ์ฒ๋ฆฌ๋์ด ๋๋์ต๋๋ค. ๋ณด๋ค ์ฒ ์ ํ ์ฐ๊ตฌ๋ฅผ ์ํํ๊ณ ์ด๊ฒ์ด ์ ๊ท ๋ถํฌ์ธ์ง ๋๋ ๋ค๋ฅธ ๊ฒ์ด ๊ด๋ จ๋์ด ์๋์ง ์ดํดํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
ํ์์ ํ์ฌ ๊ตฌํ์์ ๋ฐ์ดํฐ๋ ์์น์ ์ผ๋ก ์์ถ๋์ง ์์ผ๋ฏ๋ก ๋ฉ๋ชจ๋ฆฌ ํฌ๊ธฐ์ "์จ๋ ์์ด์ด"๋ ๊ฑฐ์ ๋์ผํฉ๋๋ค. ์์ถ์ ํฅํ ์ต์ ์ด ๋ ์ ์์ต๋๋ค.
ํฉ๊ณ
์คํธ๋ฆฌ๋ฐ ์ด ๋ฐ์ดํฐ๋ ํฐ ๋ฐ์ดํฐ ์ธํธ๋ฅผ ์์ ์ฒญํฌ๋ก ํ๋ค์ ๊ฐ์ ์ด ๋ถ์ ๋๊ตฌ๋ก ์ ์กํ๋ ํจ์จ์ ์ธ ๋ฐฉ๋ฒ์ด ๋ ์ ์์ต๋๋ค. ํ ๊ธฐ๋ฐ ์คํ ๋ฆฌ์ง๋ฅผ ์ฌ์ฉํ๋ ๋ฐ์ดํฐ ์๋น์ค๋ ํ๋ก์ธ์์ L2 ๋ฐ L3 ์บ์์ ๋ ํธ๋ฆฌํ ์์ ๋ฐ์ดํฐ ์ฒญํฌ๋ฅผ ์ ์กํ๊ณ ๋ฐ๊ฟ ์ ์์ต๋๋ค.
์ ์ฒด ์ฝ๋
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
์ถ์ฒ : habr.com