ΠŸΡ€Π΅Π½Π΅ΡΡƒΠ²Π°ΡšΠ΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ ΠΊΠΎΠ»ΠΎΠ½Π° со Apache Arrow

ΠŸΡ€Π΅Π²ΠΎΠ΄ΠΎΡ‚ Π½Π° ΡΡ‚Π°Ρ‚ΠΈΡ˜Π°Ρ‚Π° Π΅ ΠΏΠΎΠ΄Π³ΠΎΡ‚Π²Π΅Π½ ΡΠΏΠ΅Ρ†ΠΈΡ˜Π°Π»Π½ΠΎ Π·Π° студСнтитС Π½Π° курсот Π˜Π½ΠΆΠ΅Π½Π΅Ρ€ Π·Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ.

ΠŸΡ€Π΅Π½Π΅ΡΡƒΠ²Π°ΡšΠ΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ ΠΊΠΎΠ»ΠΎΠ½Π° со Apache Arrow

Π’ΠΎ Ρ‚Π΅ΠΊΠΎΡ‚ Π½Π° ΠΈΠ·ΠΌΠΈΠ½Π°Ρ‚ΠΈΡ‚Π΅ Π½Π΅ΠΊΠΎΠ»ΠΊΡƒ Π½Π΅Π΄Π΅Π»ΠΈ, Π½ΠΈΠ΅ Нонг Π›ΠΈ Π΄ΠΎΠ΄Π°Π΄Π΅Π½Π° Π½Π° Апачи стрСлка Π±ΠΈΠ½Π°Ρ€Π΅Π½ стрим Ρ„ΠΎΡ€ΠΌΠ°Ρ‚, Π΄ΠΎΠΏΠΎΠ»Π½ΡƒΠ²Π°Ρ˜ΡœΠΈ Π³ΠΎ вСќС постоСчкиот Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ Π½Π° Π΄Π°Ρ‚ΠΎΡ‚Π΅ΠΊΠ° со случаСн пристап/IPC. ИмамС Java ΠΈ C++ ΠΈΠΌΠΏΠ»Π΅ΠΌΠ΅Π½Ρ‚Π°Ρ†ΠΈΠΈ ΠΈ Python Π²Ρ€Π·ΡƒΠ²Π°ΡšΠ°. Π’ΠΎ ΠΎΠ²Π°Π° ΡΡ‚Π°Ρ‚ΠΈΡ˜Π°, ќС објаснам ΠΊΠ°ΠΊΠΎ Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΎΠ½ΠΈΡ€Π° Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ΠΎΡ‚ ΠΈ ќС ΠΏΠΎΠΊΠ°ΠΆΠ°ΠΌ ΠΊΠ°ΠΊΠΎ ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° постигнСтС ΠΌΠ½ΠΎΠ³Ρƒ висока пропусност Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π·Π° DataFrame Π½Π° ΠΏΠ°Π½Π΄ΠΈ.

ΠŸΡ€Π΅Π½ΠΎΡ Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ ΠΊΠΎΠ»ΠΎΠ½Π°

ЧСстото ΠΏΡ€Π°ΡˆΠ°ΡšΠ΅ ΡˆΡ‚ΠΎ Π³ΠΎ Π΄ΠΎΠ±ΠΈΠ²Π°ΠΌ ΠΎΠ΄ корисницитС Π½Π° Arrow Π΅ високата Ρ†Π΅Π½Π° Π·Π° ΠΌΠΈΠ³Ρ€ΠΈΡ€Π°ΡšΠ΅ Π½Π° Π³ΠΎΠ»Π΅ΠΌΠΈ Ρ‚Π°Π±Π΅Π»Π°Ρ€Π½ΠΈ Π·Π±ΠΈΡ€ΠΊΠΈ Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ ΠΎΡ€ΠΈΠ΅Π½Ρ‚ΠΈΡ€Π°Π½ ΠΊΠΎΠ½ Ρ€Π΅Π΄ ΠΈΠ»ΠΈ запис Π²ΠΎ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ Π½Π° ΠΊΠΎΠ»ΠΎΠ½Π°. Π—Π° Π·Π±ΠΈΡ€ΠΊΠΈ Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ повСќС Π³ΠΈΠ³Π°Π±Π°Ρ˜Ρ‚ΠΈ, Ρ‚Ρ€Π°Π½ΡΠΏΠΎΠ½ΠΈΡ€Π°ΡšΠ΅Ρ‚ΠΎ Π²ΠΎ ΠΌΠ΅ΠΌΠΎΡ€ΠΈΡ˜Π° ΠΈΠ»ΠΈ Π½Π° диск ΠΌΠΎΠΆΠ΅ Π΄Π° Π±ΠΈΠ΄Π΅ ΠΎΠ³Ρ€ΠΎΠΌΠ½ΠΎ.

Π—Π° прСнос Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ, Π±Π΅Π· Ρ€Π°Π·Π»ΠΈΠΊΠ° Π΄Π°Π»ΠΈ ΠΈΠ·Π²ΠΎΡ€Π½ΠΈΡ‚Π΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ сС Ρ€Π΅Π΄ ΠΈΠ»ΠΈ ΠΊΠΎΠ»ΠΎΠ½Π°, Π΅Π΄Π½Π° ΠΎΠΏΡ†ΠΈΡ˜Π° Π΅ Π΄Π° сС испратат ΠΌΠ°Π»ΠΈ сСрии ΠΎΠ΄ Ρ€Π΅Π΄ΠΎΠ²ΠΈ, ΠΎΠ΄ ΠΊΠΎΠΈ сСкоја ќС содрТи распорСд Π½Π° ΠΊΠΎΠ»ΠΎΠ½Π° Π²Π½Π°Ρ‚Ρ€Π΅.

Π’ΠΎ Apache Arrow, Π·Π±ΠΈΡ€ΠΊΠ°Ρ‚Π° ΠΎΠ΄ ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΎΠ±Ρ€Π°Π·Π½ΠΈ Π½ΠΈΠ·ΠΈ Π²ΠΎ ΠΌΠ΅ΠΌΠΎΡ€ΠΈΡ˜Π°Ρ‚Π° ΡˆΡ‚ΠΎ прСтставуваат Π΄Π΅Π» ΠΎΠ΄ Ρ‚Π°Π±Π΅Π»Π°Ρ‚Π° сС Π½Π°Ρ€Π΅ΠΊΡƒΠ²Π° ΡΠ΅Ρ€ΠΈΡ˜Π° Π½Π° записи. Π—Π° Π΄Π° прСтставитС Π΅Π΄Π½Π° структура Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π½Π° Π»ΠΎΠ³ΠΈΡ‡ΠΊΠ° Ρ‚Π°Π±Π΅Π»Π°, ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° собСрСтС Π½Π΅ΠΊΠΎΠ»ΠΊΡƒ ΠΏΠ°ΠΊΠ΅Ρ‚ΠΈ записи.

Π’ΠΎ постоСчкиот Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ Π½Π° Π΄Π°Ρ‚ΠΎΡ‚Π΅ΠΊΠ° со β€žΡΠ»ΡƒΡ‡Π°Π΅Π½ ΠΏΡ€ΠΈΡΡ‚Π°ΠΏβ€œ, ΠΏΠΈΡˆΡƒΠ²Π°ΠΌΠ΅ ΠΌΠ΅Ρ‚Π°ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΡˆΡ‚ΠΎ ја содрТат ΡˆΠ΅ΠΌΠ°Ρ‚Π° Π½Π° Ρ‚Π°Π±Π΅Π»Π°Ρ‚Π° ΠΈ распорСдот Π½Π° Π±Π»ΠΎΠΊΠΎΠ²ΠΈ Π½Π° ΠΊΡ€Π°Ρ˜ΠΎΡ‚ ΠΎΠ΄ Π΄Π°Ρ‚ΠΎΡ‚Π΅ΠΊΠ°Ρ‚Π°, ΡˆΡ‚ΠΎ Π²ΠΈ ΠΎΠ²ΠΎΠ·ΠΌΠΎΠΆΡƒΠ²Π° Π΄Π° ΠΈΠ·Π±Π΅Ρ€Π΅Ρ‚Π΅ која Π±ΠΈΠ»ΠΎ ΡΠ΅Ρ€ΠΈΡ˜Π° Π½Π° записи ΠΈΠ»ΠΈ која Π±ΠΈΠ»ΠΎ ΠΊΠΎΠ»ΠΎΠ½Π° ΠΎΠ΄ сСтот Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΌΠ½ΠΎΠ³Ρƒ Π΅Π²Ρ‚ΠΈΠ½ΠΎ. Π’ΠΎ стриминг Ρ„ΠΎΡ€ΠΌΠ°Ρ‚, ΠΈΡΠΏΡ€Π°ΡœΠ°ΠΌΠ΅ ΡΠ΅Ρ€ΠΈΡ˜Π° ΠΏΠΎΡ€Π°ΠΊΠΈ: шСма, Π° ΠΏΠΎΡ‚ΠΎΠ° Π΅Π΄Π½Π° ΠΈΠ»ΠΈ повСќС Π³Ρ€ΡƒΠΏΠΈ записи.

Π Π°Π·Π»ΠΈΡ‡Π½ΠΈΡ‚Π΅ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ΠΈ ΠΈΠ·Π³Π»Π΅Π΄Π°Π°Ρ‚ Π½Π΅ΡˆΡ‚ΠΎ ΠΊΠ°ΠΊΠΎ ΠΎΠ²Π°Π° слика:

ΠŸΡ€Π΅Π½Π΅ΡΡƒΠ²Π°ΡšΠ΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ ΠΊΠΎΠ»ΠΎΠ½Π° со Apache Arrow

Π‘Ρ‚Ρ€ΠΈΠΌΠΈΠ½Π³ Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π²ΠΎ PyArrow: Π°ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΡ˜Π°

Π—Π° Π΄Π° Π²ΠΈ ΠΏΠΎΠΊΠ°ΠΆΠ°ΠΌ ΠΊΠ°ΠΊΠΎ Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΎΠ½ΠΈΡ€Π°, ќС создадам ΠΏΡ€ΠΈΠΌΠ΅Ρ€ Π½Π° Π±Π°Π·Π° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΡˆΡ‚ΠΎ прСтставува Π΅Π΄ΠΈΠ½Π΅Ρ‡Π΅Π½ Π΄Π΅Π» ΠΎΠ΄ прСносот:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Π‘Π΅Π³Π°, Π΄Π° прСтпоставимС Π΄Π΅ΠΊΠ° сакамС Π΄Π° напишСмС 1 GB ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ, составСни ΠΎΠ΄ ΠΏΠΎ 1 MB ΠΏΠ°Ρ€Ρ‡ΠΈΡšΠ°, Π·Π° Π²ΠΊΡƒΠΏΠ½ΠΎ 1024 ΠΏΠ°Ρ€Ρ‡ΠΈΡšΠ°. ΠŸΡ€Π²ΠΎ, Π΄Π° ја создадСмС ΠΏΡ€Π²Π°Ρ‚Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ‡Π½Π° Ρ€Π°ΠΌΠΊΠ° ΠΎΠ΄ 1MB со 16 ΠΊΠΎΠ»ΠΎΠ½ΠΈ:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

ΠŸΠΎΡ‚ΠΎΠ° Π³ΠΈ ΠΏΡ€Π΅Ρ‚Π²ΠΎΡ€Π°ΠΌ Π²ΠΎ pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Π‘Π΅Π³Π° ќС создадам ΠΈΠ·Π»Π΅Π·Π΅Π½ ΠΏΠΎΡ‚ΠΎΠΊ ΡˆΡ‚ΠΎ ќС ΠΏΠΈΡˆΡƒΠ²Π° Π²ΠΎ RAM ΠΈ ќС ΠΊΡ€Π΅ΠΈΡ€Π° StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

ΠŸΠΎΡ‚ΠΎΠ° ќС напишСмС 1024 ΠΏΠ°Ρ€Ρ‡ΠΈΡšΠ°, ΠΊΠΎΠΈ Π½Π° ΠΊΡ€Π°Ρ˜ΠΎΡ‚ ќС сочинуваат Π±Π°Π·Π° Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ 1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Π‘ΠΈΠ΄Π΅Ρ˜ΡœΠΈ ΠΏΠΈΡˆΡƒΠ²Π°Π²ΠΌΠ΅ Π²ΠΎ RAM ΠΌΠ΅ΠΌΠΎΡ€ΠΈΡ˜Π°, ΠΌΠΎΠΆΠ΅ΠΌΠ΅ Π΄Π° Π³ΠΎ Π΄ΠΎΠ±ΠΈΠ΅ΠΌΠ΅ Ρ†Π΅Π»ΠΈΠΎΡ‚ ΠΏΠΎΡ‚ΠΎΠΊ Π²ΠΎ Π΅Π΄Π΅Π½ Π±Π°Ρ„Π΅Ρ€:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Π‘ΠΈΠ΄Π΅Ρ˜ΡœΠΈ ΠΎΠ²ΠΈΠ΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ сС Π²ΠΎ ΠΌΠ΅ΠΌΠΎΡ€ΠΈΡ˜Π°, Ρ‡ΠΈΡ‚Π°ΡšΠ΅Ρ‚ΠΎ Π½Π° сСрии записи со стрСлки сС Π΄ΠΎΠ±ΠΈΠ²Π° со ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡ˜Π° Π½Π° Π½ΡƒΠ»Ρ‚Π° ΠΊΠΎΠΏΠΈΡ€Π°ΡšΠ΅. Π“ΠΎ ΠΎΡ‚Π²ΠΎΡ€Π°ΠΌ StreamReader, Ρ‡ΠΈΡ‚Π°ΠΌ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π²ΠΎ pyarrow.TableΠ° ΠΏΠΎΡ‚ΠΎΠ° ΠΏΡ€Π΅Ρ‚Π²ΠΎΡ€Π΅Ρ‚Π΅ Π³ΠΈ Π²ΠΎ DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Π‘Π΅Ρ‚ΠΎ ΠΎΠ²Π°, сС Ρ€Π°Π·Π±ΠΈΡ€Π°, Π΅ Π΄ΠΎΠ±Ρ€ΠΎ, Π½ΠΎ ΠΌΠΎΠΆΠ΅Π±ΠΈ ΠΈΠΌΠ°Ρ‚Π΅ ΠΏΡ€Π°ΡˆΠ°ΡšΠ°. ΠšΠΎΠ»ΠΊΡƒ Π±Ρ€Π·ΠΎ сС случува? Како Π³ΠΎΠ»Π΅ΠΌΠΈΠ½Π°Ρ‚Π° Π½Π° Π΄Π΅Π»ΠΎΡ‚ влијаС Π½Π° пСрформанситС Π½Π° ΠΏΠ°Π½Π΄ΠΈΡ‚Π΅ Π·Π° ΠΏΡ€Π΅Π·Π΅ΠΌΠ°ΡšΠ΅ DataFrame?

ИзвСдба Π½Π° стриминг

Како ΡˆΡ‚ΠΎ сС Π½Π°ΠΌΠ°Π»ΡƒΠ²Π° Π³ΠΎΠ»Π΅ΠΌΠΈΠ½Π°Ρ‚Π° Π½Π° стриминг ΠΏΠ°Ρ€Ρ‡Π΅Ρ‚ΠΎ, Ρ‚Ρ€ΠΎΡˆΠΎΡ†ΠΈΡ‚Π΅ Π·Π° Ρ€Π΅ΠΊΠΎΠ½ΡΡ‚Ρ€ΡƒΠΊΡ†ΠΈΡ˜Π° Π½Π° сосСдна ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΎΠ±Ρ€Π°Π·Π½Π° DataFrame кај ΠΏΠ°Π½Π΄ΠΈΡ‚Π΅ сС Π·Π³ΠΎΠ»Π΅ΠΌΡƒΠ²Π°Π°Ρ‚ ΠΏΠΎΡ€Π°Π΄ΠΈ нССфикаснитС шСми Π·Π° пристап Π²ΠΎ ΠΊΠ΅ΡˆΠΎΡ‚. Π˜ΡΡ‚ΠΎ Ρ‚Π°ΠΊΠ°, ΠΈΠΌΠ° Π½Π΅ΠΊΠΎΠΈ Ρ‚Ρ€ΠΎΡˆΠΎΡ†ΠΈ Π·Π° Ρ€Π°Π±ΠΎΡ‚Π° со C++ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ‡Π½ΠΈ структури ΠΈ Π½ΠΈΠ·ΠΈ ΠΈ Π½ΠΈΠ²Π½ΠΈΡ‚Π΅ мСмориски Π±Π°Ρ„Π΅Ρ€ΠΈ.

Π—Π° 1 MB ΠΊΠ°ΠΊΠΎ ΠΏΠΎΠ³ΠΎΡ€Π΅, Π½Π° ΠΌΠΎΡ˜ΠΎΡ‚ Π»Π°ΠΏΡ‚ΠΎΠΏ (Quad-core Xeon E3-1505M) ΠΈΠ·Π»Π΅Π³ΡƒΠ²Π°:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Π˜Π·Π»Π΅Π³ΡƒΠ²Π° Π΄Π΅ΠΊΠ° Π΅Ρ„Π΅ΠΊΡ‚ΠΈΠ²Π½Π°Ρ‚Π° пропусност Π΅ 7.75 Gb/s Π·Π° Π²Ρ€Π°ΡœΠ°ΡšΠ΅ Π½Π° DataFrame ΠΎΠ΄ 1 GB ΠΎΠ΄ 1024 ΠΏΠ°Ρ€Ρ‡ΠΈΡšΠ° ΠΎΠ΄ 1 MB. Π¨Ρ‚ΠΎ сС случува Π°ΠΊΠΎ користимС ΠΏΠΎΠ³ΠΎΠ»Π΅ΠΌΠΈ ΠΈΠ»ΠΈ ΠΏΠΎΠΌΠ°Π»ΠΈ ΠΏΠ°Ρ€Ρ‡ΠΈΡšΠ°? Π•Π²Π΅ Π³ΠΈ Ρ€Π΅Π·ΡƒΠ»Ρ‚Π°Ρ‚ΠΈΡ‚Π΅ ΡˆΡ‚ΠΎ Π³ΠΈ Π΄ΠΎΠ±ΠΈΠ²Π°Ρ‚Π΅:

ΠŸΡ€Π΅Π½Π΅ΡΡƒΠ²Π°ΡšΠ΅ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ ΠΊΠΎΠ»ΠΎΠ½Π° со Apache Arrow

ΠŸΠ΅Ρ€Ρ„ΠΎΡ€ΠΌΠ°Π½ΡΠΈΡ‚Π΅ Π·Π½Π°Ρ‡ΠΈΡ‚Π΅Π»Π½ΠΎ сС Π½Π°ΠΌΠ°Π»ΡƒΠ²Π°Π°Ρ‚ ΠΎΠ΄ 256K Π½Π° 64K ΠΏΠ°Ρ€Ρ‡ΠΈΡšΠ°. Π‘Π΅Π² ΠΈΠ·Π½Π΅Π½Π°Π΄Π΅Π½ ΡˆΡ‚ΠΎ Π΄Π΅Π»ΠΎΠ²ΠΈΡ‚Π΅ ΠΎΠ΄ 1MB Π±Π΅Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Π΅Π½ΠΈ ΠΏΠΎΠ±Ρ€Π·ΠΎ ΠΎΠ΄ Π΄Π΅Π»ΠΎΠ²ΠΈΡ‚Π΅ ΠΎΠ΄ 16MB. Π’Ρ€Π΅Π΄ΠΈ Π΄Π° сС Π½Π°ΠΏΡ€Π°Π²ΠΈ ΠΏΠΎΡ‚Π΅ΠΌΠ΅Π»Π½Π° ΡΡ‚ΡƒΠ΄ΠΈΡ˜Π° ΠΈ Π΄Π° сС Ρ€Π°Π·Π±Π΅Ρ€Π΅ Π΄Π°Π»ΠΈ ΠΎΠ²Π° Π΅ Π½ΠΎΡ€ΠΌΠ°Π»Π½Π° Π΄ΠΈΡΡ‚Ρ€ΠΈΠ±ΡƒΡ†ΠΈΡ˜Π° ΠΈΠ»ΠΈ Π΅ Π²ΠΊΠ»ΡƒΡ‡Π΅Π½ΠΎ Π½Π΅ΡˆΡ‚ΠΎ Π΄Ρ€ΡƒΠ³ΠΎ.

Π’ΠΎ Ρ‚Π΅ΠΊΠΎΠ²Π½Π°Ρ‚Π° ΠΈΠΌΠΏΠ»Π΅ΠΌΠ΅Π½Ρ‚Π°Ρ†ΠΈΡ˜Π° Π½Π° Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ΠΎΡ‚, ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈΡ‚Π΅ Π²ΠΎ ΠΏΡ€ΠΈΠ½Ρ†ΠΈΠΏ Π½Π΅ сС компрСсирани, Ρ‚Π°ΠΊΠ° ΡˆΡ‚ΠΎ Π³ΠΎΠ»Π΅ΠΌΠΈΠ½Π°Ρ‚Π° Π²ΠΎ ΠΌΠ΅ΠΌΠΎΡ€ΠΈΡ˜Π°Ρ‚Π° ΠΈ β€žΠ½Π° ΠΆΠΈΡ†Π°β€œ Π΅ ΠΏΡ€ΠΈΠ±Π»ΠΈΠΆΠ½ΠΎ иста. ΠšΠΎΠΌΠΏΡ€Π΅ΡΠΈΡ˜Π°Ρ‚Π° ΠΌΠΎΠΆΠ΅ Π΄Π° станС ΠΎΠΏΡ†ΠΈΡ˜Π° Π²ΠΎ ΠΈΠ΄Π½ΠΈΠ½Π°.

Π’ΠΊΡƒΠΏΠ½ΠΎ

ΠŸΡ€Π΅Π½Π΅ΡΡƒΠ²Π°ΡšΠ΅Ρ‚ΠΎ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΠΎΠ΄ ΠΊΠΎΠ»ΠΎΠ½ΠΈ ΠΌΠΎΠΆΠ΅ Π΄Π° Π±ΠΈΠ΄Π΅ СфикасСн Π½Π°Ρ‡ΠΈΠ½ Π·Π° прСнос Π½Π° Π³ΠΎΠ»Π΅ΠΌΠΈ Π·Π±ΠΈΡ€ΠΊΠΈ Π½Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ Π²ΠΎ Π°Π»Π°Ρ‚ΠΊΠΈ Π·Π° Π°Π½Π°Π»ΠΈΠ·Π° Π½Π° ΠΊΠΎΠ»ΠΎΠ½ΠΈ, ΠΊΠ°ΠΊΠΎ ΡˆΡ‚ΠΎ сС ΠΏΠ°Π½Π΄ΠΈΡ‚Π΅ Π²ΠΎ ΠΌΠ°Π»ΠΈ Π΄Π΅Π»ΠΎΠ²ΠΈ. УслугитС Π·Π° ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΡˆΡ‚ΠΎ користат ΡΠΊΠ»Π°Π΄ΠΈΡ€Π°ΡšΠ΅ ΠΎΡ€ΠΈΠ΅Π½Ρ‚ΠΈΡ€Π°Π½ΠΈ ΠΊΠΎΠ½ Ρ€Π΅Π΄ ΠΌΠΎΠΆΠ΅ Π΄Π° ΠΏΡ€Π΅Ρ„Ρ€Π»Π°Π°Ρ‚ ΠΈ транспонираат ΠΌΠ°Π»ΠΈ Π΄Π΅Π»ΠΎΠ²ΠΈ ΠΎΠ΄ ΠΏΠΎΠ΄Π°Ρ‚ΠΎΡ†ΠΈ ΡˆΡ‚ΠΎ сС ΠΏΠΎΠΏΠΎΠ³ΠΎΠ΄Π½ΠΈ Π·Π° ΠΊΠ΅ΡˆΠΎΡ‚ L2 ΠΈ L3 Π½Π° Π²Π°ΡˆΠΈΠΎΡ‚ процСсор.

ЦСлосСн код

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Π˜Π·Π²ΠΎΡ€: www.habr.com

Π”ΠΎΠ΄Π°Π΄Π΅Ρ‚Π΅ ΠΊΠΎΠΌΠ΅Π½Ρ‚Π°Ρ€