Streaming columnar data at gigabyte-per-second speeds

In this article, we look at a few aspects of streaming performance.
In particular, one question Arrow users often ask is how expensive it is to move large tabular datasets from a row- or record-oriented format into a columnar one: materializing a full copy of a table in memory can be prohibitively expensive.
For good read and write performance over a file or a socket, one useful strategy is to send the data as a sequence of small chunks rather than as one large block.
In Apache Arrow, an in-memory collection of columnar arrays is called a record batch; several record batches can be combined to represent a single logical table data structure.
In the existing "random access" file format, we write metadata containing the table schema and the block locations at the end of the file, which makes it possible to select any record batch or any individual column from the dataset very cheaply. In the streaming format, we instead send a sequence of messages: the schema, followed by one or more record batches.
The two formats look roughly like the diagram below.
Streaming data performance in PyArrow: methodology
To get a feel for in-memory streaming performance, let's generate a sample dataset:
import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })
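As a sanity check of the sizing arithmetic, 1 MB split across 16 float64 columns gives 1048576 / 16 / 8 = 8192 rows per column; a self-contained run of `generate_data`:

```python
import numpy as np
import pandas as pd

def generate_data(total_size, ncols):
    # total_size bytes of float64 values, split evenly across ncols columns
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

df = generate_data(1 << 20, 16)  # 1 MB across 16 columns
print(df.shape)  # → (8192, 16)
```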
Now suppose we want to fill 1 GB with chunks of 1 MB each, so 1024 chunks in total. To start, let's create a 1 MB DataFrame with 16 columns:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Then I convert it to a pyarrow.RecordBatch:
batch = pa.RecordBatch.from_pandas(df)
Now I create an output stream that writes to RAM and wrap it in a StreamWriter:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Then I write the 1024 chunks that will eventually make up the 1 GB data file:
for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)
Since we wrote the whole stream into RAM, we can obtain it as a single buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Since this data was written in memory, reading back an Arrow stream is a zero-copy operation. I open the stream with a StreamReader, read the data as a pyarrow.Table, and then convert it to a pandas DataFrame:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
All of this is nice, but you may have questions. How fast is it? How long does it take to reconstruct the 1 GB DataFrame from the stream chunks?
Read speed

Note that converting back to a DataFrame produces a copy, because pandas requires contiguous blocks of memory. However, this conversion is implemented in C++ and can also run in parallel.
With the 1 MB chunks used above, on my quad-core laptop (Xeon E3-1505M) I get:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
The effective throughput here is 7.75 GB/s for reconstructing a 1 GB DataFrame from 1024 chunks of 1 MB. What happens with larger and smaller chunks? Here are the results.
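The throughput figure follows directly from the measured time; as a quick check of the arithmetic:

```python
# 1 GB reconstructed in ~129 ms of wall time.
size_gb = 1.0
elapsed_s = 0.129
throughput = size_gb / elapsed_s
print('%.2f GB/s' % throughput)  # → 7.75 GB/s
```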
Performance degrades between 256K and 64K chunks. I was surprised to see that 1 MB chunks turned out faster than 16 MB ones; it would take a more thorough investigation to understand whether this is caused by cache effects or by something else in the execution. In the current implementation the data is essentially not copied: it moves in small batches both in memory and "on the stream", which makes deserialization about as cheap as it can be.
Summary
Streaming data processing can be used to transfer small pieces of data and assemble them into a dataset; in some applications the data can be streamed within a single process as well as between processes. For maximum performance, you can make the record batches small enough that each one fits in your processor's L2 and L3 caches.
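As a rough sizing illustration (the cache size below is a hypothetical per-core figure, not a measured one): with 16 float64 columns, a 256 KiB L2 cache holds about 2048 rows per record batch:

```python
# Hypothetical per-core cache size; actual values vary by CPU.
L2_CACHE = 256 * 1024        # 256 KiB
NCOLS = 16
ITEMSIZE = 8                 # bytes per float64 value

rows_per_l2_batch = L2_CACHE // (NCOLS * ITEMSIZE)
print(rows_per_l2_batch)  # → 2048
```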
The complete code:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    # Average wall-clock time of niter calls to f
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE,
              16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source),
                         NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)
Source: www.habr.com