เบเบฒเบเปเบเบเบฒเบชเบฒเบเบญเบเบเบปเบเบเบงเบฒเบกเปเบเปเบเบทเบเบเบฐเบเบฝเบกเปเบเบเบชเบฐเปเบเบฒเบฐเบชเปเบฒเบฅเบฑเบเบเบฑเบเบชเบถเบเบชเบฒเบเบญเบเบซเบผเบฑเบเบชเบนเบ
เปเบเปเบฅเบเบฐเบชเบญเบเบชเบฒเบกเบญเบฒเบเบดเบเบเปเบฒเบเบกเบฒ, เบเบงเบเปเบฎเบปเบฒ
เบเบฒเบเบเปเบฒเบเบเบญเบเบเปเปเบกเบนเบเบเบฑเบ
เบเปเบฒเบเบฒเบกเบเบปเปเบงเปเบเบเบตเปเบเปเบญเบเปเบเปเบฎเบฑเบเบเบฒเบเบเบนเปเปเบเป Arrow เปเบกเปเบเบเปเบฒเปเบเปเบเปเบฒเบเบชเบนเบเปเบเบเบฒเบเปเบเบทเปเบญเบเบเปเบฒเบเบเบธเบเบเปเปเบกเบนเบเบเบฒเบเบฐเบฅเบฒเบเบเบฐเบซเบเบฒเบเปเบซเบเปเบเบฒเบเปเบเบงเบซเบผเบทเบฎเบนเบเปเบเบเบเบฑเบเบเบถเบเปเบเบฑเบเบฎเบนเบเปเบเบเบเปเบฅเปเบฒ. เบชเบณเบฅเบฑเบเบเบธเบเบเปเปเบกเบนเบเบซเบผเบฒเบเบเบดเบเบฒเปเบ, เบเบฒเบเบเปเบฒเบเบเบญเบเปเบเปเปเบงเบเบเบงเบฒเบกเบเบณ เบซเบผเบทเปเบเบเบดเบชเบเปเบญเบฒเบเปเบเบฑเบเปเบฅเบทเปเบญเบเบเบตเปเปเบฑเบเปเปเบงเบ.
เบชเปเบฒเบฅเบฑเบเบเบฒเบเบเปเบฒเบเบเบญเบเบเปเปเบกเบนเบ, เบเปเปเบงเปเบฒเบเบฐเปเบเบฑเบเบเปเปเบกเบนเบเปเบซเบผเปเบเปเบกเปเบเปเบเบงเบซเบผเบทเบเบฑเบ, เบเบฒเบเปเบฅเบทเบญเบเบซเบเบถเปเบเปเบกเปเบเปเบเบทเปเบญเบชเบปเปเบ batch เบเบฐเบซเบเบฒเบเบเปเบญเบเบเบญเบเปเบเบง, เปเบเปเบฅเบฐเบเบฐเบเบญเบเบเปเบงเบเปเบเบเบฎเปเบฒเบเบเบฑเบเบเบฒเบเปเบ.
เปเบ Apache Arrow, เบเปเปเบฅเบฑเบเบเบฑเบเบเบญเบเบเบฑเบเบเบฑเบเปเบเปเปเบงเบเบเบงเบฒเบกเบเบณเบเบตเปเปเบเบฑเบเบเบปเบงเปเบเบเบเบญเบเบเบฒเบเบฐเบฅเบฒเบเปเบกเปเบเปเบญเบตเปเบเบงเปเบฒเบเบธเบเบเบฑเบเบเบถเบ. เปเบเบทเปเบญเปเบเบฑเบเบเบปเบงเปเบเบเบเบญเบเปเบเบเบชเปเบฒเบเบเปเปเบกเบนเบเบเบฝเบงเบเบญเบเบเบฒเบเบฐเบฅเบฒเบเบขเปเบฒเบเบกเบตเปเบซเบเบเบปเบ, เบเปเบฒเบเบชเบฒเบกเบฒเบเปเบเบฑเบเบเปเบฒเบซเบผเบฒเบเบเบธเบเบเบญเบเบเบฑเบเบเบถเบ.
เปเบเบฎเบนเบเปเบเบเปเบเบฅเป "เบเบฒเบเปเบเบปเปเบฒเปเบเบดเบเปเบเบเบชเบธเปเบก" เบเบตเปเบกเบตเบขเบนเปเปเบฅเปเบง, เบเบงเบเปเบฎเบปเบฒเบเบฝเบ metadata เบเบตเปเบกเบตเบเบฒเบเบฐเบฅเบฒเบเบเบฒเบเบฐเบฅเบฒเบเปเบฅเบฐเบฎเบนเบเปเบเบเบเบฑเบเบขเบนเปเปเบเบเบญเบเบเปเบฒเบเบเบญเบเปเบเบฅเป, เปเบเบดเปเบเบเปเบงเบเปเบซเปเบเปเบฒเบเบชเบฒเบกเบฒเบเปเบฅเบทเบญเบเปเบญเบปเบฒเบเบธเบเบเบฑเบเบเบถเบเบซเบผเบทเบเปเบฅเปเบฒเปเบเปเบเบฒเบเบเบธเบเบเปเปเบกเบนเบเบเบตเปเบกเบตเบฅเบฒเบเบฒเบเบทเบเบซเบผเบฒเบ. เปเบโเบฎเบนเบโเปเบเบโเบเบฒเบโเบชเบฐโเบโเบฃเบตเบกโ, เบเบงเบโเปเบฎเบปเบฒโเบชเบปเปเบโเบเบธเบโเบเบญเบโเบเปเปโเบเบงเบฒเบกโ: schemaโ, เปเบฅเบฐโเบซเบผเบฑเบโเบเบฒเบโเบเบฑเปเบโเบซเบเบถเปเบโเบซเบผเบทโเบซเบผเบฒเบโเบเบธเบโเบเบญเบโเบเบฒเบโเบเบฑเบโเบเบถเบโ.
เบฎเบนเบเปเบเบเบเบตเปเปเบเบเบเปเบฒเบเบเบฑเบเปเบเบดเปเบเบเบทเบฎเบนเบเบเบตเป:
เบเบฒเบเบเปเบฒเบเบเบญเบเบเปเปเบกเบนเบเปเบ PyArrow: เปเบญเบฑเบเบเบฅเบดเปเบเบเบฑเบ
เปเบเบทเปเบญเบชเบฐเปเบเบเปเบซเปเบเปเบฒเบเบฎเบนเปเบงเปเบฒเบกเบฑเบเปเบฎเบฑเบเบงเบฝเบเปเบเบงเปเบ, เบเปเบญเบเบเบฐเบชเปเบฒเบเบเบธเบเบเปเปเบกเบนเบเบเบปเบงเบขเปเบฒเบเบเบตเปเปเบเบฑเบเบเบปเบงเปเบเบเบเบญเบเบชเบฒเบเบเปเปเบฒเบเบฝเบง:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
เปเบเบเบฑเบเบเบธเบเบฑเบ, เบชเบปเบกเบกเบธเบเบงเปเบฒเบเบงเบเปเบฎเบปเบฒเบเปเบญเบเบเบฒเบเบเบฝเบเบเปเปเบกเบนเบ 1 GB, เบเบฐเบเบญเบเบเปเบงเบ 1 MB เปเบเปเบฅเบฐ chunks, เบชเปเบฒเบฅเบฑเบเบเบฑเบเบซเบกเบปเบ 1024 chunks. เบเปเบญเบเบญเบทเปเบ, เปเบซเปเบชเปเบฒเบเบเบญเบเบเปเปเบกเบนเบ 1MB เบเปเบฒเบญเบดเบเบเบตเปเบกเบต 16 เบเบฑเบ:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบ, เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเปเบฝเบเปเบซเปเปเบเบปเบฒเปเบเบปเปเบฒ pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
เบเบญเบเบเบตเปเบเปเบญเบเบเบฐเบชเปเบฒเบเบเบฐเปเบชเบเบปเบเบเบฐเบฅเบดเบเบเบตเปเบเบฐเบเบฝเบเปเบชเป RAM เปเบฅเบฐเบชเปเบฒเบ StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบ, เบเบงเบเปเบฎเบปเบฒเบเบฐเบเบฝเบ 1024 chunks, เปเบเบดเปเบเปเบเบเบตเปเบชเบธเบเบเบฐเบเบฐเบเบญเบเปเบเบฑเบเบเบธเบเบเปเปเบกเบนเบ 1GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
เบเบฑเบเบเบฑเปเบเปเบเปเบเบงเบเปเบฎเบปเบฒเบเบฝเบเปเบ RAM, เบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบเปเบเปเบฎเบฑเบเบเปเปเบฒเบเบฑเบเบซเบกเบปเบเปเบ buffer เบเบฝเบง:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
เปเบเบทเปเบญเบเบเบฒเบเบงเปเบฒเบเปเปเบกเบนเบเบเบตเปเบขเบนเปเปเบเบซเบเปเบงเบเบเบงเบฒเบกเบเปเบฒ, เบเบฒเบเบญเปเบฒเบ batches เบเบญเบ Arrow records เปเบกเปเบเปเบเปเบฎเบฑเบเปเบเบเบเบฒเบเบเปเบฒเปเบเบตเบเบเบฒเบเบชเบนเบเบชเปเบฒเปเบเบปเบฒ. เบเปเบญเบเปเบเบตเบ StreamReader, เบญเปเบฒเบเบเปเปเบกเบนเบเปเบเบปเปเบฒเปเบเปเบ pyarrow.Table
เปเบฅเบฐเบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเปเบฝเบเปเบซเปเปเบเบปเบฒเปเบเบปเปเบฒเปเบเบฑเบ DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
เบเบฑเบเบซเบกเบปเบเบเบตเป, เปเบเปเบเบญเบ, เปเบกเปเบเบเบต, เปเบเปเบเปเบฒเบเบญเบฒเบเบเบฐเบกเบตเบเปเบฒเบเบฒเบก. เบกเบฑเบเปเบเบตเบเบเบถเปเบเปเบงเปเบเบปเปเบฒเปเบ? เบเบฐเบซเบเบฒเบเบเบญเบ chunk เบกเบตเบเบปเบเบเบฐเบเบปเบเบเปเปเบเบฒเบเบเบฐเบเบดเบเบฑเบเบเบฒเบเบเบถเบเบเปเปเบกเบนเบ pandas DataFrame เปเบเบงเปเบ?
เบเบฐเบชเบดเบเบเบดเบเบฒเบเบเบฒเบเบเปเบฒเบเบเบญเบ
เปเบกเบทเปเบญเบเบฐเปเบฒเบเบเบญเบเบเปเบญเบเบชเบฐเบเบฃเบตเบกเบกเบดเบเบซเบผเบธเบเบฅเบปเบ, เบเปเบฒเปเบเปเบเปเบฒเบเบเบญเบเบเบฒเบเบชเปเบฒเบเบเปเบฅเบณ DataFrame เบเบดเบเบเปเปเบเบฑเบเปเบ pandas เปเบเบตเปเบกเบเบถเปเบเปเบเบทเปเบญเบเบเบฒเบเบฅเบฐเบเบปเบเบเบฒเบเปเบเบปเปเบฒเปเบเบดเบเปเบเบเบเปเปเบกเบตเบเบฐเบชเบดเบเบเบดเบเบฒเบ. เบเบญเบเบเบฒเบเบเบตเปเบเบฑเบเบกเบตเบเบฒเบเบชเปเบงเบเปเบเบตเบเบเบฒเบเบเบฒเบเปเบฎเบฑเบเบงเบฝเบเบเบฑเบเปเบเบเบชเปเบฒเบเบเปเปเบกเบนเบ C ++ เปเบฅเบฐ arrays เปเบฅเบฐ buffers เบเบงเบฒเบกเบเบปเบเบเปเบฒเบเบญเบเบเบงเบเปเบเบปเบฒ.
เบชเปเบฒเบฅเบฑเบ 1 MB เบเบฑเปเบเบเปเบฒเบเปเบเบดเบ, เปเบเบเบญเบกเบเบดเบงเปเบเบตเปเบเบญเบเบเปเบญเบ (Quad-core Xeon E3-1505M) เบกเบฑเบเบเบฒเบเบปเบเบงเปเบฒ:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
เบกเบฑเบ turns เปเบซเปเปเบซเบฑเบเบงเปเบฒ throughput เบเบฐเบชเบดเบเบเบดเบเบฒเบเปเบกเปเบ 7.75 Gb / s เบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบทเปเบเบเบน 1 GB DataFrame เบเบฒเบ 1024 1 MB chunks. เบเบฐเปเบเบตเบเบซเบเบฑเบเบเบถเปเบเบเปเบฒเบเบงเบเปเบฎเบปเบฒเปเบเปเบเปเบญเบเปเบซเบเป เบซเบผเบทเบเปเบญเบเบเบงเปเบฒ? เบเบตเปเปเบกเปเบเบเบปเบเปเบเปเบฎเบฑเบเบเบตเปเบเปเบฒเบเปเบเปเบฎเบฑเบ:
เบเบฐเบชเบดเบเบเบดเบเบฒเบเบซเบผเบธเบเบฅเบปเบเบขเปเบฒเบเบซเบผเบงเบเบซเบผเบฒเบเบเบฒเบ 256K เบซเบฒ 64K chunks. เบเปเบฒโเบเบฐโเปเบเบปเปเบฒโเบเบปเบโเบเบฐโเบฅเบถเบโเบเบตเป chunks 1MB เบเบทเบโเบเบธเบโเปเบเปเบโเปเบงโเบโเปโเบงเบฒ 16MB chunksโ. เบกเบฑเบเบเบธเปเบกเบเปเบฒเบเบตเปเบเบฐเปเบฎเบฑเบเบเบฒเบเบชเบถเบเบชเบฒเบขเปเบฒเบเบฅเบฐเบญเบฝเบเบเบงเปเบฒเปเบฅเบฐเปเบเบปเปเบฒเปเบเบงเปเบฒเบเบตเปเปเบกเปเบเบเบฒเบเปเบเบเบขเบฒเบเบเบปเบเบเบฐเบเบดเบซเบผเบทเบเบฒเบเบชเบดเปเบเบเบฒเบเบขเปเบฒเบเบเบตเปเบเปเบฝเบงเบเปเบญเบ.
เปเบเบเบฒเบเบเบฐเบเบดเบเบฑเบเปเบเบเบฐเบเบธเบเบฑเบเบเบญเบเบฎเบนเบเปเบเบ, เบเปเปเบกเบนเบเบเปเปเปเบเปเบเบทเบเบเบตเบเบญเบฑเบเปเบเบซเบผเบฑเบเบเบฒเบ, เบเบฑเปเบเบเบฑเปเบเบเบฐเบซเบเบฒเบเปเบเบซเบเปเบงเบเบเบงเบฒเบกเบเปเบฒเปเบฅเบฐ "เบชเบฒเบ" เปเบกเปเบเบเบฐเบกเบฒเบเบเบฝเบงเบเบฑเบ. เบเบฒเบเบเบตเบเบญเบฑเบเบญเบฒเบเบเบฐเบเบฒเบเปเบเบฑเบเบเบฒเบเปเบฅเบทเบญเบเปเบเบญเบฐเบเบฒเบเบปเบ.
เบเบปเบเปเบเปเบฎเบฑเบ
เบเบฒเบเบเปเบฒเบเบเบญเบเบเปเปเบกเบนเบเบเบฑเบเบชเบฒเบกเบฒเบเปเบเบฑเบเบงเบดเบเบตเบเบตเปเบกเบตเบเบฐเบชเบดเบเบเบดเบเบฒเบเปเบเบเบฒเบเปเบญเบเบเบธเบเบเปเปเบกเบนเบเบเบฐเปเบฒเบเปเบซเบเปเปเบเบซเบฒเปเบเบทเปเบญเบเบกเบทเบเบฒเบเบงเบดเปเบเบฒเบฐเบเบฑเบเปเบเบฑเปเบ: pandas เปเบเบเปเบญเบเบเปเบญเบเป. เบเบฒเบเบเปเบฅเบดเบเบฒเบเบเปเปเบกเบนเบเบเบตเปเปเบเปเบเปเบญเบเปเบเบฑเบเบเปเปเบกเบนเบเปเบเบเปเบเบงเบชเบฒเบกเบฒเบเปเบญเบ เปเบฅเบฐเบเปเบฒเบเบเบญเบเบเปเปเบกเบนเบเบเปเบญเบเปเบเบตเปเบชเบฐเบเบงเบเบเบงเปเบฒเบชเบณเบฅเบฑเบเปเบเบ L2 เปเบฅเบฐ L3 เบเบญเบเปเบเปเบเบเปเบเบตเบเบญเบเบเปเบฒเบ.
เบฅเบฐเบซเบฑเบเปเบเบฑเบก
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
เปเบซเบผเปเบเบเปเปเบกเบนเบ: www.habr.com