Streaming columnar data
A common concern I hear from Arrow users is the high cost of transposing large tabular datasets from a row- or record-oriented format into a columnar one. For a multi-gigabyte dataset, transposing in memory or on disk can be prohibitively expensive.
For streaming data, whether the source data is row- or column-oriented, one option is to send small batches of rows, each of which uses a columnar layout internally.
In Apache Arrow, an in-memory collection of columnar arrays representing a chunk of a table is called a record batch. Multiple record batches can be collected to represent a single logical table structure.
In the existing "random access" file format, we write metadata containing the table schema and the block locations at the end of the file, which makes it possible to select any record batch or any column from the dataset very cheaply. In the streaming format, we send a sequence of messages: first the schema, and then one or more record batches.
The two formats look roughly like this:
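As a rough sketch of the difference, here is how the two formats are produced with the pyarrow.ipc module from recent PyArrow releases (note that this is not the older API used in the examples below). Both writers emit the same schema and record batch messages; the file writer additionally records a footer with the block locations when it is closed.

import pyarrow as pa

schema = pa.schema([('x', pa.float64())])
batch = pa.record_batch([pa.array([1.0, 2.0, 3.0])], schema=schema)

# Streaming format: a schema message followed by record batch messages.
stream_sink = pa.BufferOutputStream()
with pa.ipc.new_stream(stream_sink, schema) as writer:
    writer.write_batch(batch)

# Random-access file format: the same messages framed by a magic number,
# plus a footer (schema and block locations) written on close.
file_sink = pa.BufferOutputStream()
with pa.ipc.new_file(file_sink, schema) as writer:
    writer.write_batch(batch)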
Streaming data in PyArrow: application
To show how this works, I will create an example dataset representing a single stream chunk:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })
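To make the arithmetic concrete: a 1 MB chunk with 16 float64 columns holds 1,048,576 / 16 / 8 = 8192 rows. A quick sanity check (my addition, not part of the original article):

# Each of the 16 columns gets 8192 float64 values, 1 MB of data in total.
df = generate_data(1 << 20, 16)
print(df.shape)           # (8192, 16)
print(df.values.nbytes)   # 1048576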
Now, suppose we want to write 1 gigabyte of data composed of chunks of 1 megabyte each, 1024 chunks in total. To begin, let's create the first 1 MB data frame with 16 columns:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Then I convert it to a pyarrow.RecordBatch:
batch = pa.RecordBatch.from_pandas(df)
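At this point the batch holds the same rows in columnar form; a quick inspection (the expected output is my annotation):

print(batch.num_rows)     # 8192
print(batch.num_columns)  # 16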
Now I will create an output stream that writes to RAM and create a StreamWriter:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
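Note that pa.InMemoryOutputStream and pa.StreamWriter are the names from the early PyArrow release this article was written against. In recent PyArrow versions the equivalent steps would look roughly like this (a sketch against the current pyarrow.ipc API):

# Recent PyArrow equivalent of the two lines above.
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch.schema)
# ... write batches with writer.write_batch(batch) ...
writer.close()         # finalizes the stream with an end-of-stream marker
buf = sink.getvalue()  # a pyarrow.Buffer, like sink.get_result() below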
Then I write the 1024 chunks, which will eventually add up to the 1 GB dataset:
for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)
Since we wrote into RAM, we can get the entire stream as a single buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Because this data is in memory, reading back record batches is a zero-copy operation. Note that source.size is slightly more than 1 GB; the extra bytes are the stream's schema and per-batch metadata. I open a StreamReader, read the data back as a pyarrow.Table, and then convert it to a pandas DataFrame:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
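For completeness, the read path in recent PyArrow versions uses pa.ipc.open_stream instead of pa.StreamReader (again a sketch against the current API, using the buf from the earlier sketch):

# Recent PyArrow equivalent of the read path above.
reader = pa.ipc.open_stream(buf)
table = reader.read_all()   # a pyarrow.Table
df = table.to_pandas()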
All of this is very nice, but you may have questions. How fast is it? How does the stream chunk size affect the performance of reconstructing the pandas DataFrame?
Streaming performance
As the streaming chunk size decreases, the cost of reconstructing a contiguous columnar pandas DataFrame grows because of cache-inefficient memory access patterns. There is also some overhead from manipulating the C++ data structures and arrays and their memory buffers.
For 1 MB chunks, as above, on my laptop (quad-core Xeon E3-1505M) this comes out to:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
It turns out that the effective throughput is 7.75 GB/s when reconstructing a 1 GB DataFrame from 1024 chunks of 1 MB (1 GB / 0.129 s ≈ 7.75 GB/s). What happens when we use larger or smaller chunks? Here are the results:
Performance degrades significantly at 256 KB and 64 KB chunks. I was surprised that 1 MB chunks turned out faster than 16 MB chunks; it would be worth a more thorough investigation to understand whether this is normal variance or whether something else is going on.
In the current implementation of the format the data is not compressed at all; serialization moves the memory essentially verbatim. In the future, compression may become an additional option.
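As an aside, compression did eventually land in the IPC format: recent PyArrow versions accept an IpcWriteOptions with a buffer compression codec, roughly like this (a sketch, not part of the original article):

# Sketch: compress record batch buffers with zstd in recent PyArrow.
options = pa.ipc.IpcWriteOptions(compression='zstd')
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch.schema, options=options)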
Summary
Streaming columnar data can be an efficient way to feed large datasets into columnar analytics tools like pandas in small chunks. Data services that use a row-oriented storage model can transpose and stream small chunks of data that are friendlier to your CPU's L2 and L3 caches.
Full code
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
    # int() is needed here: float division would otherwise produce a
    # non-integer row count.
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
    # Average wall-clock time of f over niter runs (divide by the niter
    # parameter, not the NITER global).
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter
def read_as_dataframe(klass, source):
    # Read the whole stream as an Arrow table, then convert to pandas.
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source),
                         NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)
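To compare the timings directly, a small post-processing step (my addition, hypothetical) converts the collected results into throughput figures:

# Hypothetical post-processing: express each timing as GB/s throughput.
summary = pd.DataFrame(results, columns=['chunksize', 'seconds'])
summary['GB/s'] = DATA_SIZE / summary['seconds'] / (KILOBYTE ** 3)
print(summary)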
Source: www.habr.com