A translation of this article was prepared especially for students of the course.
Apache Arrow recently gained a streaming binary format, complementing the existing random-access file format.

Streaming columnar data
A common question about using Arrow concerns the high cost of transposing large tabular datasets from a record-oriented (row-oriented) format to a column-oriented one. For multi-gigabyte datasets, transposing in memory or on disk can be prohibitively expensive.
When streaming data, regardless of whether the source data has a row-oriented or columnar layout, one option is to send small batches of rows, each of which internally uses a columnar layout.
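To make the idea concrete, here is a toy sketch in plain Python (not Arrow itself; the helper name is hypothetical) that regroups row-oriented records into small, internally columnar batches:

```python
# Toy sketch (not Arrow): split row-oriented records into small,
# internally column-oriented batches.
def to_columnar_batches(rows, batch_size):
    """Yield dicts of column -> list of values, batch_size rows at a time."""
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        yield {key: [row[key] for row in chunk] for key in chunk[0]}

rows = [{'a': i, 'b': float(i)} for i in range(10)]
batches = list(to_columnar_batches(rows, batch_size=4))
# 10 rows in batches of 4 -> batch sizes 4, 4, 2
```

Each batch is small enough to produce and consume incrementally, yet values of one column sit next to each other in memory.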
In Apache Arrow, a collection of in-memory columnar arrays representing a chunk of a table is called a record batch. A single logical table data structure can be represented by collecting several record batches.
In the existing "random access" file format, we write metadata at the end of the file containing the table schema and the locations of the blocks, which lets you select any record batch or any column from the dataset very cheaply. In the streaming format, we send a series of messages: a schema, followed by one or more record batches.
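The message sequence can be modeled with a toy sketch (plain Python, not the actual Arrow wire protocol): the writer emits one schema message followed by record-batch messages, and the reader consumes them in order:

```python
# Toy model of the streaming format's message sequence (not the real
# Arrow wire protocol): one schema message, then N record-batch messages.
def write_stream(schema, batches):
    yield ('schema', schema)
    for batch in batches:
        yield ('batch', batch)

def read_stream(messages):
    kind, schema = next(messages)  # the first message must be the schema
    assert kind == 'schema'
    batches = [payload for kind, payload in messages if kind == 'batch']
    return schema, batches

stream = write_stream(['c0', 'c1'], [{'c0': [1, 2], 'c1': [3, 4]}] * 3)
schema, batches = read_stream(stream)
print(schema, len(batches))  # ['c0', 'c1'] 3
```

Because the schema arrives first and batches follow one by one, a reader can start processing before the stream ends and without knowing its total length, unlike the file format with its metadata footer.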
Schematically, the two formats differ as a file with a metadata footer versus an open-ended sequence of messages.
Streaming data with PyArrow: an application
To show how this works, I will generate a sample dataset representing a single stream chunk:
import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })
Now suppose we want to write 1 gigabyte of data composed of 1-megabyte chunks, 1024 chunks in total. To start, let's create the first 1 MB data frame with 16 columns:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
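With these constants, the row count of each chunk follows from simple arithmetic: 1 MB of float64 values spread across 16 columns gives 8192 rows. A quick check:

```python
# Each chunk holds 1 MB of float64 values spread across 16 columns,
# so the row count per chunk is fixed by simple arithmetic.
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
NCOLS = 16
FLOAT64_SIZE = 8  # bytes per float64 value

nrows = MEGABYTE // NCOLS // FLOAT64_SIZE
print(nrows)  # 8192 rows per 1 MB chunk
```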
Then I convert it to a pyarrow.RecordBatch:
batch = pa.RecordBatch.from_pandas(df)
Now I create an output stream that writes to RAM, and create a StreamWriter from it:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Then I write 1024 chunks, which together will make up the 1 GB dataset:
for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)
Since everything was written to RAM, we can get the whole stream as a single buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Since this data is in memory, reading back Arrow record batches is a zero-copy operation. I open a StreamReader, read the data into a pyarrow.Table, and then convert it to a pandas DataFrame:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
All of this is nice, of course, but you may have questions. How fast is it? How does the stream chunk size affect the overall performance of reconstructing the pandas DataFrame?

Streaming performance
As the streaming chunks get smaller, the cost of reconstructing a contiguous columnar pandas DataFrame grows because of cache-inefficient memory access patterns. There is also some overhead from manipulating the C++ container data structures around the arrays and their memory buffers.
For 1 MB chunks, on my laptop (quad-core Xeon E3-1505M) this gives:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
That is an effective throughput of 7.75 GB/s for reconstructing a 1 GB DataFrame from 1024 chunks of 1 MB each. What happens if we use larger or smaller chunks? Here are the results:
Performance drops significantly going from 256K down to 64K chunks. I was surprised to see that 1 MB chunks were processed faster than 16 MB ones; it would be worth a more thorough investigation to understand whether this is normally distributed variation or whether something else is going on.
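The 7.75 GB/s figure quoted above follows directly from the 129 ms timing; a quick back-of-the-envelope check:

```python
# Sanity check of the quoted throughput: 1 GB reconstructed in 129 ms.
data_gb = 1.0
elapsed = 0.129  # seconds, from the %timeit run above

throughput = data_gb / elapsed  # GB per second
print(round(throughput, 2))  # 7.75
```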
In the current implementation of the format, the data is not compressed at all, so the sizes in memory and "on the wire" are roughly the same. Compression may be added to the format as an option in the future.

Conclusion
Streaming columnar data can be an efficient way to feed large datasets, in small chunks, into columnar analytics tools such as pandas. Data services that use row-oriented storage can transpose and stream small chunks of data that are friendlier to the processor's L2 and L3 caches.

Full code
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter
def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)
Source: www.habr.com