แกแขแแขแแแก แแแ แแแแแ แแแแแแแแ แกแแแชแแแแฃแ แแ แแฃแ แกแแก แกแขแฃแแแแขแแแแกแแแแก
แแแแ แ แแแแแแแแ แแแแ แแก แแแแแแแแแแแจแ แฉแแแ
แกแแแขแแก แแแแแชแแแแแแก แกแขแ แแแแแแ
Arrow-แแก แแแแฎแแแ แแแแแแแแแ แแแแ แชแแแแแฃแแ แแแแฎแแ แแ แแก แแแแ แขแแแฃแแฃแ แ แแแแแชแแแแ แแแแ แแแแก แแแแ แแชแแแก แแแฆแแแ แฆแแ แแแฃแแแแ แแฌแแ แแแแ แแ แฉแแแแฌแแ แแ แแ แแแแขแแ แแแฃแแ แคแแ แแแขแแแแ แกแแแขแแก แคแแ แแแขแจแ. แแ แแแแ แแแแแแแแขแแแแ แแแแแชแแแแ แแแแ แแแแกแแแแก, แแแฎแกแแแ แแแแจแ แแ แแแกแแแ แแแแแขแแแ แจแแแซแแแแ แแแแแญแแ แแแแฃแแ แแงแแก.
แกแขแ แแแแแแแก แแแแแชแแแแแแกแแแแก, แแฅแแแแ แแก แฌแงแแ แแก แแแแแชแแแแแ แแฌแแ แแแ แแฃ แกแแแขแ, แแ แแ แแแ แแแแขแแ แกแขแ แแฅแแแแแแก แแชแแ แ แแแ แขแแแแแก แแแแแแแแ, แแแแแแฃแแ แจแแแชแแแก แกแแแขแแก แแแแแแแแแแก แจแแแแแ.
Apache Arrow-แจแ แแแฎแกแแแ แแแแก แกแแแขแฃแ แ แแแกแแแแแแก แแ แแแฃแแก, แ แแแแแแช แฌแแ แแแแแแแแก แชแฎแ แแแแก แแแฌแแแก, แแฌแแแแแ แฉแแแแฌแแ แแก แฏแแฃแคแก. แแแแแแฃแ แ แชแฎแ แแแแก แแ แแแแแ แแแแแชแแแแ แกแขแ แฃแฅแขแฃแ แแก แฌแแ แแแกแแแแแแแ, แจแแแแซแแแแ แจแแแแ แแแแ แฉแแแแฌแแ แแแแก แ แแแแแแแแ แแแแแขแ.
แแ แกแแแฃแแ "แจแแแแฎแแแแแแ แฌแแแแแแก" แคแแแแแก แคแแ แแแขแจแ, แฉแแแ แแฌแแ แ แแแขแแแแแแชแแแแแก, แ แแแแแแแช แจแแแชแแแก แชแฎแ แแแแก แกแฅแแแแก แแ แแแแแแก แแแแแแแแแแก แคแแแแแก แแแแแก, แ แแช แกแแจแฃแแแแแแก แแแซแแแแ แแแ แฉแแแ แฉแแแแฌแแ แแแแก แแแแแกแแแแ แ แฏแแฃแคแ แแ แแแแแกแแแแ แ แกแแแขแ แแแแแชแแแแ แแแแ แแแแแแ แซแแแแแ แแแคแแ. แกแขแ แแแแแแแก แคแแ แแแขแจแ, แฉแแแ แแแแแแแแแ แจแแขแงแแแแแแแแแแก แกแแ แแแก: แกแฅแแแแก แแ แจแแแแแ แฉแแแแฌแแ แแแแก แแ แ แแ แแแข แแแ แขแแแก.
แกแฎแแแแแกแฎแแ แคแแ แแแขแ แฐแแแแก แแ แกแฃแ แแแก:
แแแแแชแแแแแแก แกแขแ แแแแแแ PyArrow-แจแ: แแแแแแแชแแ
แแแแก แกแแฉแแแแแแแแ, แแฃ แ แแแแ แแฃแจแแแแก แแก, แแ แจแแแฅแแแ แแแแแชแแแแ แแแแแแแแก, แ แแแแแแช แฌแแ แแแแแแแแก แแ แแ แแแแแแแก แแแฌแแแก:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
แแฎแแ, แแแแฃแจแแแ, แ แแ แแแแแแ แแแแฌแแ แแ 1 แแ แแแแแชแแแแแ, แ แแแแแแช แจแแแแแแ 1 แแ แชแแแ แแแแ, แกแฃแ 1024 แชแแแ. แแแ แแแแ, แแแแแ แจแแแฅแแแแ แแแ แแแแ 1MB แแแแแชแแแแ แฉแแ แฉแ 16 แกแแแขแแ:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
แจแแแแแ แแ แแแแแแแงแแแ แแแ pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
แแฎแแ แแ แจแแแฅแแแ แแแแแแแแแ แแแแแแก, แ แแแแแแช แฉแแฌแแ แก RAM-แจแ แแ แจแแฅแแแแก StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
แจแแแแแ แฉแแแ แแแแฌแแ แ 1024 แแแฌแแแก, แ แแแแแแช แกแแแแแแแ แจแแแแแแแก 1 GB แแแแแชแแแแ แแแแแก:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
แแแแแแแแ แฉแแแ แแแแฌแแ แแ RAM-แจแ, แจแแแแแซแแแ แแแแแฆแแ แแแแแ แแแแแแ แแ แ แแฃแคแแ แจแ:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
แแแแก แแแแ, แ แแ แแก แแแแแชแแแแแ แแแฎแกแแแ แแแแจแแ, แแกแ แแก แฉแแแแฌแแ แแแแก แแแ แขแแแแแก แฌแแแแแฎแแ แแแแฆแแแ แแฃแแแแแแ แแกแแแก แแแแ แแชแแแ. แแฎแกแแ StreamReader-แก, แแแแแฎแฃแแแ แแแแแชแแแแแก pyarrow.Table
แแ แจแแแแแ แแแแแแงแแแแแ แแกแแแ DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
แแก แงแแแแแคแแ แ, แ แ แแฅแแ แฃแแแ, แแแ แแแ, แแแแ แแ แจแแแซแแแแ แแแแแฉแแแแ แแแแฎแแแแ. แ แแแแแแแ แกแฌแ แแคแแ แฎแแแแ แแก? แ แแแแ แแแฅแแแแแแก แแแญแ แแก แแแแ แแแแแแแแก DataFrame-แแก แแแซแแแแแแ?
แกแขแ แแแแแแแก แจแแกแ แฃแแแแ
แ แแแแ แช แแแแแแแก แแแฌแแแแก แแแแ แแชแแ แแแแ, แแแแแแแแจแ แแแแแแแแ แ แกแแแขแแแแแ DataFrame-แแก แ แแแแแกแขแ แฃแฅแชแแแก แฆแแ แแแฃแแแแ แแแ แแแแ แฅแแจแแก แฌแแแแแแก แแ แแแคแแฅแขแฃแ แ แกแฅแแแแแแก แแแแ. แแกแแแ แแ แแก แแแ แแแแฃแแ แฎแแ แฏแแแ C++ แแแแแชแแแแ แกแขแ แฃแฅแขแฃแ แแแแแ แแ แแแกแแแแแแแ แแ แแแ แแแฎแกแแแ แแแแก แแฃแคแแ แแแแแ แแฃแจแแแแแกแแแ.
1 แแ-แแกแแแแก, แ แแแแ แช แแแแแ, แฉแแแก แแแแขแแแแ (Quad-core Xeon E3-1505M) แแแแแแแก:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
แแแแแแแก, แ แแ แแคแแฅแขแฃแ แ แแแแขแแ แฃแแแ แแแแแแ แแ แแก 7.75 แแ/แฌแ 1 GB DataFrame-แแก แแฆแกแแแแแแแ 1024 1 MB แแแแแแแแ. แ แ แแแฎแแแแ, แแฃ แแแงแแแแแ แฃแคแ แ แแแ แแ แแแขแแ แ แแแฌแแแแแก? แแฅ แแ แแก แจแแแแแแแ, แ แแแแแแกแแช แแแแฆแแแ:
แจแแกแ แฃแแแแ แแแแจแแแแแแแแแ แแแแแแก 256K-แแแ 64K แแแฌแแแแแแ. แแแแแแแแ แแ, แ แแ 1 แแ แแแฌแแแแแแแ แฃแคแ แ แกแฌแ แแคแแ แแฃแจแแแแแแแแ, แแแแ แ 16 แแ. แฆแแ แก แฃแคแ แ แกแแคแฃแซแแแแแแ แจแแกแฌแแแแ แแ แแแแก แแแแแแ, แแ แแก แแฃ แแ แ แแก แแแ แแแแฃแ แ แแแแแฌแแแแแ แแฃ แกแฎแแ แ แแ แแ แแก แฉแแ แแฃแแ.
แคแแ แแแขแแก แแแแแแแแ แ แแแแฎแแ แชแแแแแแแกแแก, แแแแแชแแแแแ แแ แแแชแแแจแ แแ แแ แแก แจแแแฃแแจแฃแแ, แแแแขแแ แแแแ แแแฎแกแแแ แแแแจแ แแ "แแแแแฃแแแ" แแแแฎแแแแแแ แแแแแแ. แจแแแฃแแจแแ แจแแแซแแแแ แแแแแแแแจแ แแแฎแแแก แแแ แแแแขแ.
แกแฃแ
แกแแแขแแก แแแแแชแแแแแแก แกแขแ แแแแแแ แจแแแซแแแแ แแงแแก แแคแแฅแขแฃแ แ แแแ แแแแ แแแแแชแแแแ แแแแ แแแแก แแแแแกแแขแแแแ แกแแแขแแแแก แแแแแแขแแแแก แแแกแขแ แฃแแแแขแแแแ, แ แแแแ แแชแแ แแแแแแแ แแชแแ แ แแแฌแแแแแจแ. แแแแแชแแแแ แกแแ แแแกแแแก, แ แแแแแแแช แแงแแแแแแ แแฌแแ แแแแ แแ แแแแขแแ แแแฃแ แแแฎแกแแแ แแแแก, แจแแฃแซแแแแ แแแแแแขแแแแ แแ แแแแแแขแแแแ แแแแแชแแแแ แแชแแ แ แแแฌแแแ, แ แแช แฃแคแ แ แแแกแแฎแแ แฎแแแแแแ แแฅแแแแ แแ แแชแแกแแ แแก L2 แแ L3 แฅแแจแแกแแแแก.
แกแ แฃแแ แแแแ
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
แฌแงแแ แ: www.habr.com