ΠΠ΅ΡΠ΅Π²ΠΎΠ΄ ΡΡΠ°ΡΡΠΈ ΠΏΠΎΠ΄Π³ΠΎΡΠΎΠ²Π»Π΅Π½ ΡΠΏΠ΅ΡΠΈΠ°Π»ΡΠ½ΠΎ Π΄Π»Ρ ΡΡΡΠ΄Π΅Π½ΡΠΎΠ² ΠΊΡΡΡΠ°
ΠΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΠ΅ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ Π½Π΅Π΄Π΅Π»Ρ ΠΌΡ Ρ
ΠΠΎΡΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅ΡΠ΅Π΄Π°ΡΠ° ΠΊΠΎΠ»ΠΎΠ½ΠΎΡΠ½ΡΡ Π΄Π°Π½Π½ΡΡ
Π Π°ΡΠΏΡΠΎΡΡΡΠ°Π½Π΅Π½Π½ΡΠΉ Π²ΠΎΠΏΡΠΎΡ, ΠΊΠΎΡΠΎΡΡΠΉ Ρ ΠΏΠΎΠ»ΡΡΠ°Ρ ΠΎΡ ΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΠ΅Π»Π΅ΠΉ Arrow, β ΡΡΠΎ Π²ΠΎΠΏΡΠΎΡ ΠΎ Π²ΡΡΠΎΠΊΠΎΠΉ ΡΡΠΎΠΈΠΌΠΎΡΡΠΈ ΠΏΠ΅ΡΠ΅Π½ΠΎΡΠ° Π±ΠΎΠ»ΡΡΠΈΡ Π½Π°Π±ΠΎΡΠΎΠ² ΡΠ°Π±Π»ΠΈΡΠ½ΡΡ Π΄Π°Π½Π½ΡΡ ΠΈΠ· ΡΠΎΡΠΌΠ°ΡΠ°, ΠΎΡΠΈΠ΅Π½ΡΠΈΡΠΎΠ²Π°Π½Π½ΠΎΠ³ΠΎ Π½Π° ΡΡΡΠΎΠΊΠΈ ΠΈΠ»ΠΈ Π·Π°ΠΏΠΈΡΠΈ Π² ΠΊΠΎΠ»ΠΎΠ½ΠΎΡΠ½ΡΠΉ ΡΠΎΡΠΌΠ°Ρ. ΠΠ»Ρ ΠΌΠ½ΠΎΠ³ΠΎΠ³ΠΈΠ³Π°Π±Π°ΠΉΡΠ½ΡΡ Π΄Π°ΡΠ°ΡΠ΅ΡΠΎΠ² ΡΡΠ°Π½ΡΠΏΠΎΠ½ΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π² ΠΏΠ°ΠΌΡΡΠΈ ΠΈΠ»ΠΈ Π½Π° Π΄ΠΈΡΠΊΠ΅ ΠΌΠΎΠΆΠ΅Ρ ΠΎΠΊΠ°Π·Π°ΡΡΡΡ Π½Π΅ΠΏΠΎΡΠΈΠ»ΡΠ½ΠΎΠΉ Π·Π°Π΄Π°ΡΠ΅ΠΉ.
ΠΠ»Ρ ΠΏΠΎΡΠΎΠΊΠΎΠ²ΠΎΠΉ ΠΏΠ΅ΡΠ΅Π΄Π°ΡΠΈ Π΄Π°Π½Π½ΡΡ , Π½Π΅Π·Π°Π²ΠΈΡΠΈΠΌΠΎ ΠΎΡ ΡΠΎΠ³ΠΎ, ΡΠ²Π»ΡΡΡΡΡ Π»ΠΈ ΠΈΡΡ ΠΎΠ΄Π½ΡΠ΅ Π΄Π°Π½Π½ΡΠ΅ ΡΡΡΠΎΠΊΠΎΠ²ΡΠΌΠΈ ΠΈΠ»ΠΈ ΠΊΠΎΠ»ΠΎΠ½ΠΎΡΠ½ΡΠΌΠΈ, ΠΎΠ΄Π½ΠΈΠΌ ΠΈΠ· Π²Π°ΡΠΈΠ°Π½ΡΠΎΠ² ΠΎΡΡΠ°Π΅ΡΡΡ ΠΎΡΠΏΡΠ°Π²ΠΊΠ° Π½Π΅Π±ΠΎΠ»ΡΡΠΈΡ ΠΏΠ°ΠΊΠ΅ΡΠΎΠ² ΡΡΡΠΎΠΊ, ΠΊΠ°ΠΆΠ΄ΡΠΉ ΠΈΠ· ΠΊΠΎΡΠΎΡΡΡ Π²Π½ΡΡΡΠΈ ΡΠΎΠ΄Π΅ΡΠΆΠΈΡ ΠΊΠΎΠΌΠΏΠΎΠ½ΠΎΠ²ΠΊΡ ΠΏΠΎ ΡΡΠΎΠ»Π±ΡΠ°ΠΌ.
Π Apache Arrow ΠΊΠΎΠ»Π»Π΅ΠΊΡΠΈΡ ΠΊΠΎΠ»ΠΎΠ½ΠΎΡΠ½ΡΡ ΠΌΠ°ΡΡΠΈΠ²ΠΎΠ² Π² ΠΏΠ°ΠΌΡΡΠΈ, ΠΏΡΠ΅Π΄ΡΡΠ°Π²Π»ΡΡΡΠ°Ρ ΡΠ°Π½ΠΊ ΡΠ°Π±Π»ΠΈΡΡ, Π½Π°Π·ΡΠ²Π°Π΅ΡΡΡ ΠΏΠ°ΠΊΠ΅ΡΠΎΠΌ Π·Π°ΠΏΠΈΡΠ΅ΠΉ (record batch). Π§ΡΠΎΠ±Ρ ΠΏΡΠ΅Π΄ΡΡΠ°Π²ΠΈΡΡ Π΅Π΄ΠΈΠ½ΡΡ ΡΡΡΡΠΊΡΡΡΡ Π΄Π°Π½Π½ΡΡ Π»ΠΎΠ³ΠΈΡΠ΅ΡΠΊΠΎΠΉ ΡΠ°Π±Π»ΠΈΡΡ ΠΌΠΎΠΆΠ½ΠΎ ΡΠΎΠ±ΡΠ°ΡΡ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΠΏΠ°ΠΊΠ΅ΡΠΎΠ² Π·Π°ΠΏΠΈΡΠ΅ΠΉ.
Π ΡΡΡΠ΅ΡΡΠ²ΡΡΡΠ΅ΠΌ ΡΠΎΡΠΌΠ°ΡΠ΅ ΡΠ°ΠΉΠ»ΠΎΠ² Β«random accessΒ» ΠΌΡ Π·Π°ΠΏΠΈΡΡΠ²Π°Π΅ΠΌ ΠΌΠ΅ΡΠ°Π΄Π°Π½Π½ΡΠ΅, ΡΠΎΠ΄Π΅ΡΠΆΠ°ΡΠΈΠ΅ ΡΡ Π΅ΠΌΡ ΡΠ°Π±Π»ΠΈΡΡ ΠΈ ΡΠ°ΡΠΏΠΎΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ Π±Π»ΠΎΠΊΠΎΠ² Π² ΠΊΠΎΠ½ΡΠ΅ ΡΠ°ΠΉΠ»Π°, ΡΡΠΎ ΠΏΠΎΠ·Π²ΠΎΠ»ΡΠ΅Ρ Π²Π°ΠΌ ΠΊΡΠ°ΠΉΠ½Π΅ Π΄Π΅ΡΠ΅Π²ΠΎ Π²ΡΠ±ΠΈΡΠ°ΡΡ Π»ΡΠ±ΠΎΠΉ ΠΏΠ°ΠΊΠ΅Ρ Π·Π°ΠΏΠΈΡΠ΅ΠΉ ΠΈΠ»ΠΈ Π»ΡΠ±ΠΎΠΉ ΡΡΠΎΠ»Π±Π΅Ρ ΠΈΠ· Π½Π°Π±ΠΎΡΠ° Π΄Π°Π½Π½ΡΡ . Π ΠΏΠΎΡΠΎΠΊΠΎΠ²ΠΎΠΌ ΡΠΎΡΠΌΠ°ΡΠ΅ ΠΌΡ ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅ΠΌ ΡΠ΅ΡΠΈΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ: ΡΡ Π΅ΠΌΡ, Π° ΠΏΠΎΡΠΎΠΌ ΠΎΠ΄ΠΈΠ½ ΠΈΠ»ΠΈ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΠΏΠ°ΠΊΠ΅ΡΠΎΠ² Π·Π°ΠΏΠΈΡΠ΅ΠΉ.
Π Π°Π·Π»ΠΈΡΠ½ΡΠ΅ ΡΠΎΡΠΌΠ°ΡΡ Π²ΡΠ³Π»ΡΠ΄ΡΡ ΠΏΡΠΈΠΌΠ΅ΡΠ½ΠΎ ΡΠ°ΠΊ, ΠΊΠ°ΠΊ ΠΏΡΠ΅Π΄ΡΡΠ°Π²Π»Π΅Π½ΠΎ Π½Π° ΡΡΠΎΠΌ ΡΠΈΡΡΠ½ΠΊΠ΅:
ΠΠΎΡΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅ΡΠ΅Π΄Π°ΡΠ° Π΄Π°Π½Π½ΡΡ Π² PyArrow: ΠΏΡΠΈΠΌΠ΅Π½Π΅Π½ΠΈΠ΅
Π§ΡΠΎΠ±Ρ ΠΏΠΎΠΊΠ°Π·Π°ΡΡ Π²Π°ΠΌ ΠΊΠ°ΠΊ ΡΡΠΎ ΡΠ°Π±ΠΎΡΠ°Π΅Ρ, Ρ ΡΠΎΠ·Π΄Π°ΠΌ ΠΏΡΠΈΠΌΠ΅Ρ Π΄Π°ΡΠ°ΡΠ΅ΡΠ°, ΠΏΡΠ΅Π΄ΡΡΠ°Π²Π»ΡΡΡΠ΅Π³ΠΎ ΠΎΠ΄ΠΈΠ½ ΠΏΠΎΡΠΎΠΊΠΎΠ²ΡΠΉ ΡΠ°Π½ΠΊ:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Π’Π΅ΠΏΠ΅ΡΡ, ΠΏΡΠ΅Π΄ΠΏΠΎΠ»ΠΎΠΆΠΈΠΌ, ΡΡΠΎ ΠΌΡ Ρ ΠΎΡΠΈΠΌ Π·Π°ΠΏΠΈΡΠ°ΡΡ 1 ΠΠ± Π΄Π°Π½Π½ΡΡ , ΡΠΎΡΡΠΎΡΡΠΈΡ ΠΈΠ· ΡΠ°Π½ΠΊΠΎΠ² ΡΠ°Π·ΠΌΠ΅ΡΠΎΠΌ 1 ΠΠ± ΠΊΠ°ΠΆΠ΄ΡΠΉ, ΠΈΡΠΎΠ³ΠΎ 1024 ΡΠ°Π½ΠΊΠ°. ΠΠ»Ρ Π½Π°ΡΠ°Π»Π° Π΄Π°Π²Π°ΠΉΡΠ΅ ΡΠΎΠ·Π΄Π°Π΄ΠΈΠΌ ΠΏΠ΅ΡΠ²ΡΠΉ ΡΡΠ΅ΠΉΠΌ Π΄Π°Π½Π½ΡΡ ΡΠ°Π·ΠΌΠ΅ΡΠΎΠΌ 1 ΠΠ± Ρ 16 ΡΡΠΎΠ»Π±ΡΠ°ΠΌΠΈ:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
ΠΠ°ΡΠ΅ΠΌ Ρ ΠΊΠΎΠ½Π²Π΅ΡΡΠΈΡΡΡ ΠΈΡ
Π² pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Π’Π΅ΠΏΠ΅ΡΡ Ρ ΡΠΎΠ·Π΄Π°ΠΌ ΠΏΠΎΡΠΎΠΊ Π²ΡΠ²ΠΎΠ΄Π°, ΠΊΠΎΡΠΎΡΡΠΉ Π±ΡΠ΄Π΅Ρ ΠΏΠΈΡΠ°ΡΡ Π² ΠΎΠΏΠ΅ΡΠ°ΡΠΈΠ²Π½ΡΡ ΠΏΠ°ΠΌΡΡΡ ΠΈ ΡΠΎΠ·Π΄Π°ΠΌ StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
ΠΠ°ΡΠ΅ΠΌ ΠΌΡ Π·Π°ΠΏΠΈΡΠ΅ΠΌ 1024 ΡΠ°Π½ΠΊΠ°, ΠΊΠΎΡΠΎΡΡΠ΅ Π² ΠΈΡΠΎΠ³Π΅ ΡΠΎΡΡΠ°Π²ΡΡ 1ΠΠ± Π½Π°Π±ΠΎΡΠ° Π΄Π°Π½Π½ΡΡ :
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
ΠΠΎΡΠΊΠΎΠ»ΡΠΊΡ ΠΌΡ ΠΏΠΈΡΠ°Π»ΠΈ Π² ΠΠΠ£, ΡΠΎ Π²Π΅ΡΡ ΠΏΠΎΡΠΎΠΊ ΠΌΡ ΡΠΌΠΎΠΆΠ΅ΠΌ ΠΏΠΎΠ»ΡΡΠΈΡΡ Π² ΠΎΠ΄Π½ΠΎΠΌ Π±ΡΡΠ΅ΡΠ΅:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
ΠΠΎΡΠΊΠΎΠ»ΡΠΊΡ ΡΡΠΈ Π΄Π°Π½Π½ΡΠ΅ Π½Π°Ρ
ΠΎΠ΄ΡΡΡΡ Π² ΠΏΠ°ΠΌΡΡΠΈ, ΡΡΠΈΡΡΠ²Π°Π½ΠΈΠ΅ ΠΏΠ°ΠΊΠ΅ΡΠΎΠ² Π·Π°ΠΏΠΈΡΠ΅ΠΉ Arrow ΠΏΠΎΠ»ΡΡΠ°Π΅ΡΡΡ zero-copy ΠΎΠΏΠ΅ΡΠ°ΡΠΈΠ΅ΠΉ. Π― ΠΎΡΠΊΡΡΠ²Π°Ρ StreamReader, ΡΡΠΈΡΡΠ²Π°Ρ Π΄Π°Π½Π½ΡΠ΅ Π² pyarrow.Table
, Π° Π·Π°ΡΠ΅ΠΌ ΠΊΠΎΠ½Π²Π΅ΡΡΠΈΡΡΡ ΠΈΡ
Π² DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
ΠΡΠ΅ ΡΡΠΎ, ΠΊΠΎΠ½Π΅ΡΠ½ΠΎ, Ρ ΠΎΡΠΎΡΠΎ, Π½ΠΎ Ρ Π²Π°Ρ ΠΌΠΎΠ³ΡΡ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡΡΡ Π²ΠΎΠΏΡΠΎΡΡ. ΠΠ°ΠΊ Π±ΡΡΡΡΠΎ ΡΡΠΎ ΠΏΡΠΎΠΈΡΡ ΠΎΠ΄ΠΈΡ? ΠΠ°ΠΊ ΡΠ°Π·ΠΌΠ΅Ρ ΡΠ°Π½ΠΊΠ° Π²Π»ΠΈΡΠ΅Ρ Π½Π° ΠΏΡΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡΠ΅Π»ΡΠ½ΠΎΡΡΡ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ DataFrame pandas?
ΠΡΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡΠ΅Π»ΡΠ½ΠΎΡΡΡ ΠΏΠΎΡΠΎΠΊΠΎΠ²ΠΎΠΉ ΠΏΠ΅ΡΠ΅Π΄Π°ΡΠΈ Π΄Π°Π½Π½ΡΡ
ΠΠΎ ΠΌΠ΅ΡΠ΅ ΡΠΌΠ΅Π½ΡΡΠ΅Π½ΠΈΡ ΡΠ°Π·ΠΌΠ΅ΡΠ° ΡΠ°Π½ΠΊΠ° ΠΏΠΎΡΠΎΠΊΠΎΠ²ΠΎΠΉ ΠΏΠ΅ΡΠ΅Π΄Π°ΡΠΈ ΡΡΠΎΠΈΠΌΠΎΡΡΡ ΡΠ΅ΠΊΠΎΠ½ΡΡΡΡΠΊΡΠΈΠΈ Π½Π΅ΠΏΡΠ΅ΡΡΠ²Π½ΠΎΠ³ΠΎ ΡΡΠΎΠ»Π±ΡΠ°ΡΠΎΠ³ΠΎ ΠΊΠ°Π΄ΡΠ° DataFrame Π² pandas Π²ΠΎΠ·ΡΠ°ΡΡΠ°Π΅Ρ ΠΈΠ·-Π·Π° Π½Π΅ΡΡΡΠ΅ΠΊΡΠΈΠ²Π½ΡΡ ΡΡ Π΅ΠΌ Π΄ΠΎΡΡΡΠΏΠ° ΠΊ ΠΊΡΡ-ΠΏΠ°ΠΌΡΡΠΈ. Π‘ΡΡΠ΅ΡΡΠ²ΡΡΡ ΡΠ°ΠΊΠΆΠ΅ Π½Π΅ΠΊΠΎΡΠΎΡΡΠ΅ Π½Π°ΠΊΠ»Π°Π΄Π½ΡΠ΅ ΡΠ°ΡΡ ΠΎΠ΄Ρ ΠΎΡ ΡΠ°Π±ΠΎΡΡ ΡΠΎ ΡΡΡΡΠΊΡΡΡΠ°ΠΌΠΈ Π΄Π°Π½Π½ΡΡ C++ ΠΈ ΠΌΠ°ΡΡΠΈΠ²Π°ΠΌΠΈ ΠΈ ΠΈΡ Π±ΡΡΠ΅ΡΠ°ΠΌΠΈ ΠΏΠ°ΠΌΡΡΠΈ.
ΠΠ»Ρ 1 ΠΠ±, ΠΊΠ°ΠΊ ΡΠΊΠ°Π·Π°Π½ΠΎ Π²ΡΡΠ΅, Π½Π° ΠΌΠΎΠ΅ΠΌ Π½ΠΎΡΡΠ±ΡΠΊΠ΅ (Quad-core Xeon E3-1505M) ΠΏΠΎΠ»ΡΡΠ°Π΅ΡΡΡ:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
ΠΠΎΠ»ΡΡΠ°Π΅ΡΡΡ, ΡΡΠΎ ΡΡΡΠ΅ΠΊΡΠΈΠ²Π½Π°Ρ ΠΏΡΠΎΠΏΡΡΠΊΠ½Π°Ρ ΡΠΏΠΎΡΠΎΠ±Π½ΠΎΡΡΡ β 7.75 ΠΠ±/Ρ Π΄Π»Ρ Π²ΠΎΡΡΡΠ°Π½ΠΎΠ²Π»Π΅Π½ΠΈΡ DataFrame ΠΎΠ±ΡΠ΅ΠΌΠΎΠΌ 1ΠΠ± ΠΈΠ· 1024 ΡΠ°Π½ΠΊΠΎΠ² ΠΏΠΎ 1ΠΠ±. Π§ΡΠΎ ΠΏΡΠΎΠΈΡΡ ΠΎΠ΄ΠΈΡ, Π΅ΡΠ»ΠΈ ΠΌΡ Π±ΡΠ΄Π΅ΠΌ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡ ΡΠ°Π½ΠΊΠΈ Π±ΠΎΠ»ΡΡΠ΅Π³ΠΎ ΠΈΠ»ΠΈ ΠΌΠ΅Π½ΡΡΠ΅Π³ΠΎ ΡΠ°Π·ΠΌΠ΅ΡΠ°? ΠΠΎΡ ΡΠ°ΠΊΠΈΠ΅ ΡΠ΅Π·ΡΠ»ΡΡΠ°ΡΡ ΠΏΠΎΠ»ΡΡΠ°ΡΡΡ:
ΠΡΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡΠ΅Π»ΡΠ½ΠΎΡΡΡ ΡΡΡΠ΅ΡΡΠ²Π΅Π½Π½ΠΎ ΡΠ½ΠΈΠΆΠ°Π΅ΡΡΡ Ρ 256K Π΄ΠΎ 64K ΡΠ°Π½ΠΊΠΎΠ². ΠΠ΅Π½Ρ ΡΠ΄ΠΈΠ²ΠΈΠ»ΠΎ, ΡΡΠΎ ΡΠ°Π½ΠΊΠΈ ΡΠ°Π·ΠΌΠ΅ΡΠΎΠΌ 1 ΠΠ± ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°Π»ΠΈΡΡ Π±ΡΡΡΡΠ΅Π΅, ΡΠ΅ΠΌ 16 ΠΠ±. Π‘ΡΠΎΠΈΡ ΠΏΡΠΎΠ²Π΅ΡΡΠΈ Π±ΠΎΠ»Π΅Π΅ ΡΡΠ°ΡΠ΅Π»ΡΠ½ΠΎΠ΅ ΠΈΡΡΠ»Π΅Π΄ΠΎΠ²Π°Π½ΠΈΠ΅ ΠΈ ΠΏΠΎΠ½ΡΡΡ, ΡΠ²Π»ΡΠ΅ΡΡΡ Π»ΠΈ ΡΡΠΎ Π½ΠΎΡΠΌΠ°Π»ΡΠ½ΡΠΌ ΡΠ°ΡΠΏΡΠ΅Π΄Π΅Π»Π΅Π½ΠΈΠ΅ΠΌ ΠΈΠ»ΠΈ ΡΡΡ Π²Π»ΠΈΡΠ΅Ρ ΡΡΠΎ-ΡΠΎ Π΅ΡΠ΅.
Π ΡΠ΅ΠΊΡΡΠ΅ΠΉ ΡΠ΅Π°Π»ΠΈΠ·Π°ΡΠΈΠΈ ΡΠΎΡΠΌΠ°ΡΠ° Π΄Π°Π½Π½ΡΠ΅ Π½Π΅ ΡΠΆΠΈΠΌΠ°ΡΡΡΡ Π² ΠΏΡΠΈΠ½ΡΠΈΠΏΠ΅, ΠΏΠΎΡΡΠΎΠΌΡ ΡΠ°Π·ΠΌΠ΅Ρ Π² ΠΏΠ°ΠΌΡΡΠΈ ΠΈ Β«Π² ΠΏΡΠΎΠ²ΠΎΠ΄Π°Ρ Β» ΠΏΡΠΈΠΌΠ΅ΡΠ½ΠΎ ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²ΡΠΉ. Π Π±ΡΠ΄ΡΡΠ΅ΠΌ ΡΠΆΠ°ΡΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ ΡΡΠ°ΡΡ Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡΠ΅Π»ΡΠ½ΠΎΠΉ ΠΎΠΏΡΠΈΠ΅ΠΉ.
ΠΡΠΎΠ³
ΠΠΎΡΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅ΡΠ΅Π΄Π°ΡΠ° ΠΊΠΎΠ»ΠΎΠ½ΠΎΡΠ½ΡΡ Π΄Π°Π½Π½ΡΡ ΠΌΠΎΠΆΠ΅Ρ ΠΎΠΊΠ°Π·Π°ΡΡΡΡ ΡΡΡΠ΅ΠΊΡΠΈΠ²Π½ΡΠΌ ΡΠΏΠΎΡΠΎΠ±ΠΎΠΌ ΠΏΠ΅ΡΠ΅Π΄Π°ΡΠΈ Π±ΠΎΠ»ΡΡΠΈΡ Π½Π°Π±ΠΎΡΠΎΠ² Π΄Π°Π½Π½ΡΡ Π² ΠΊΠΎΠ»ΠΎΠ½ΠΎΡΠ½ΡΠ΅ Π°Π½Π°Π»ΠΈΡΠΈΡΠ΅ΡΠΊΠΈΠ΅ ΠΈΠ½ΡΡΡΡΠΌΠ΅Π½ΡΡ, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ Π² pandas, Ρ ΠΏΠΎΠΌΠΎΡΡΡ Π½Π΅Π±ΠΎΠ»ΡΡΠΈΡ ΡΠ°Π½ΠΊΠΎΠ². Π‘Π»ΡΠΆΠ±Ρ Π΄Π°Π½Π½ΡΡ , ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΡΡΠΈΠ΅ Ρ ΡΠ°Π½ΠΈΠ»ΠΈΡΠ΅, ΠΎΡΠΈΠ΅Π½ΡΠΈΡΠΎΠ²Π°Π½Π½ΠΎΠ΅ Π½Π° ΡΡΡΠΎΠΊΠΈ, ΠΌΠΎΠ³ΡΡ ΠΏΠ΅ΡΠ΅Π΄Π°Π²Π°ΡΡ ΠΈ ΡΡΠ°Π½ΡΠΏΠΎΠ½ΠΈΡΠΎΠ²Π°ΡΡ Π½Π΅Π±ΠΎΠ»ΡΡΠΈΠ΅ ΡΠ°Π½ΠΊΠΈ Π΄Π°Π½Π½ΡΡ , ΠΊΠΎΡΠΎΡΡΠ΅ Π±ΠΎΠ»Π΅Π΅ ΡΠ΄ΠΎΠ±Π½Ρ Π΄Π»Ρ ΠΊΡΡΠ° L2 ΠΈ L3 Π²Π°ΡΠ΅Π³ΠΎ ΠΏΡΠΎΡΠ΅ΡΡΠΎΡΠ°.
ΠΠΎΠ»Π½ΡΠΉ ΠΊΠΎΠ΄
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
ΠΡΡΠΎΡΠ½ΠΈΠΊ: habr.com