ΠŸΠΎΡ‚ΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡Π° ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ… с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Apache Arrow

ΠŸΠ΅Ρ€Π΅Π²ΠΎΠ΄ ΡΡ‚Π°Ρ‚ΡŒΠΈ ΠΏΠΎΠ΄Π³ΠΎΡ‚ΠΎΠ²Π»Π΅Π½ ΡΠΏΠ΅Ρ†ΠΈΠ°Π»ΡŒΠ½ΠΎ для студСнтов курса Β«Data EngineerΒ».

ΠŸΠΎΡ‚ΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡Π° ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ… с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Apache Arrow

Π—Π° послСдниС нСсколько нСдСль ΠΌΡ‹ с Nong Li Π΄ΠΎΠ±Π°Π²ΠΈΠ»ΠΈ Π² Apache Arrow Π±ΠΈΠ½Π°Ρ€Π½Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²Ρ‹ΠΉ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚, Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΠ² ΡƒΠΆΠ΅ ΡΡƒΡ‰Π΅ΡΡ‚Π²ΡƒΡŽΡ‰ΠΈΠΉ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ Ρ„Π°ΠΉΠ»ΠΎΠ² random access/IPC. Π£ нас Π΅ΡΡ‚ΡŒ Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ Π½Π° Java ΠΈ C++ ΠΈ привязки Python. Π’ этой ΡΡ‚Π°Ρ‚ΡŒΠ΅ я расскаТу, ΠΊΠ°ΠΊ Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ ΠΈ ΠΏΠΎΠΊΠ°ΠΆΡƒ, ΠΊΠ°ΠΊ ΠΌΠΎΠΆΠ½ΠΎ Π΄ΠΎΡΡ‚ΠΈΡ‡ΡŒ ΠΎΡ‡Π΅Π½ΡŒ высокой пропускной способности Π΄Π°Π½Π½Ρ‹Ρ… для DataFrame pandas.

ΠŸΠΎΡ‚ΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡Π° ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ…

РаспространСнный вопрос, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ я ΠΏΠΎΠ»ΡƒΡ‡Π°ΡŽ ΠΎΡ‚ ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»Π΅ΠΉ Arrow, β€” это вопрос ΠΎ высокой стоимости пСрСноса Π±ΠΎΠ»ΡŒΡˆΠΈΡ… Π½Π°Π±ΠΎΡ€ΠΎΠ² Ρ‚Π°Π±Π»ΠΈΡ‡Π½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π°, ΠΎΡ€ΠΈΠ΅Π½Ρ‚ΠΈΡ€ΠΎΠ²Π°Π½Π½ΠΎΠ³ΠΎ Π½Π° строки ΠΈΠ»ΠΈ записи Π² ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹ΠΉ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚. Для ΠΌΠ½ΠΎΠ³ΠΎΠ³ΠΈΠ³Π°Π±Π°ΠΉΡ‚Π½Ρ‹Ρ… датасСтов транспонированиС Π² памяти ΠΈΠ»ΠΈ Π½Π° дискС ΠΌΠΎΠΆΠ΅Ρ‚ ΠΎΠΊΠ°Π·Π°Ρ‚ΡŒΡΡ нСпосильной Π·Π°Π΄Π°Ρ‡Π΅ΠΉ.

Для ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²ΠΎΠΉ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡ΠΈ Π΄Π°Π½Π½Ρ‹Ρ…, нСзависимо ΠΎΡ‚ Ρ‚ΠΎΠ³ΠΎ, ΡΠ²Π»ΡΡŽΡ‚ΡΡ Π»ΠΈ исходныС Π΄Π°Π½Π½Ρ‹Π΅ строковыми ΠΈΠ»ΠΈ ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹ΠΌΠΈ, ΠΎΠ΄Π½ΠΈΠΌ ΠΈΠ· Π²Π°Ρ€ΠΈΠ°Π½Ρ‚ΠΎΠ² остаСтся ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠ° Π½Π΅Π±ΠΎΠ»ΡŒΡˆΠΈΡ… ΠΏΠ°ΠΊΠ΅Ρ‚ΠΎΠ² строк, ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… Π²Π½ΡƒΡ‚Ρ€ΠΈ содСрТит ΠΊΠΎΠΌΠΏΠΎΠ½ΠΎΠ²ΠΊΡƒ ΠΏΠΎ столбцам.

Π’ Apache Arrow коллСкция ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹Ρ… массивов Π² памяти, ΠΏΡ€Π΅Π΄ΡΡ‚Π°Π²Π»ΡΡŽΡ‰Π°Ρ Ρ‡Π°Π½ΠΊ Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹, называСтся ΠΏΠ°ΠΊΠ΅Ρ‚ΠΎΠΌ записСй (record batch). Π§Ρ‚ΠΎΠ±Ρ‹ ΠΏΡ€Π΅Π΄ΡΡ‚Π°Π²ΠΈΡ‚ΡŒ Π΅Π΄ΠΈΠ½ΡƒΡŽ структуру Π΄Π°Π½Π½Ρ‹Ρ… логичСской Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹ ΠΌΠΎΠΆΠ½ΠΎ ΡΠΎΠ±Ρ€Π°Ρ‚ΡŒ нСсколько ΠΏΠ°ΠΊΠ΅Ρ‚ΠΎΠ² записСй.

Π’ ΡΡƒΡ‰Π΅ΡΡ‚Π²ΡƒΡŽΡ‰Π΅ΠΌ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π΅ Ρ„Π°ΠΉΠ»ΠΎΠ² Β«random accessΒ» ΠΌΡ‹ записываСм ΠΌΠ΅Ρ‚Π°Π΄Π°Π½Π½Ρ‹Π΅, содСрТащиС схСму Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹ ΠΈ располоТСниС Π±Π»ΠΎΠΊΠΎΠ² Π² ΠΊΠΎΠ½Ρ†Π΅ Ρ„Π°ΠΉΠ»Π°, Ρ‡Ρ‚ΠΎ позволяСт Π²Π°ΠΌ ΠΊΡ€Π°ΠΉΠ½Π΅ дСшСво Π²Ρ‹Π±ΠΈΡ€Π°Ρ‚ΡŒ любой ΠΏΠ°ΠΊΠ΅Ρ‚ записСй ΠΈΠ»ΠΈ любой столбСц ΠΈΠ· Π½Π°Π±ΠΎΡ€Π° Π΄Π°Π½Π½Ρ‹Ρ…. Π’ ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²ΠΎΠΌ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π΅ ΠΌΡ‹ отправляСм ΡΠ΅Ρ€ΠΈΡŽ сообщСний: схСму, Π° ΠΏΠΎΡ‚ΠΎΠΌ ΠΎΠ΄ΠΈΠ½ ΠΈΠ»ΠΈ нСсколько ΠΏΠ°ΠΊΠ΅Ρ‚ΠΎΠ² записСй.

Π Π°Π·Π»ΠΈΡ‡Π½Ρ‹Π΅ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Ρ‹ выглядят ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π½ΠΎ Ρ‚Π°ΠΊ, ΠΊΠ°ΠΊ прСдставлСно Π½Π° этом рисункС:

ΠŸΠΎΡ‚ΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡Π° ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ… с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Apache Arrow

ΠŸΠΎΡ‚ΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡Π° Π΄Π°Π½Π½Ρ‹Ρ… Π² PyArrow: ΠΏΡ€ΠΈΠΌΠ΅Π½Π΅Π½ΠΈΠ΅

Π§Ρ‚ΠΎΠ±Ρ‹ ΠΏΠΎΠΊΠ°Π·Π°Ρ‚ΡŒ Π²Π°ΠΌ ΠΊΠ°ΠΊ это Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚, я создам ΠΏΡ€ΠΈΠΌΠ΅Ρ€ датасСта, ΠΏΡ€Π΅Π΄ΡΡ‚Π°Π²Π»ΡΡŽΡ‰Π΅Π³ΠΎ ΠΎΠ΄ΠΈΠ½ ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²Ρ‹ΠΉ Ρ‡Π°Π½ΠΊ:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Π’Π΅ΠΏΠ΅Ρ€ΡŒ, ΠΏΡ€Π΅Π΄ΠΏΠΎΠ»ΠΎΠΆΠΈΠΌ, Ρ‡Ρ‚ΠΎ ΠΌΡ‹ Ρ…ΠΎΡ‚ΠΈΠΌ Π·Π°ΠΏΠΈΡΠ°Ρ‚ΡŒ 1 Π“Π± Π΄Π°Π½Π½Ρ‹Ρ…, состоящих ΠΈΠ· Ρ‡Π°Π½ΠΊΠΎΠ² Ρ€Π°Π·ΠΌΠ΅Ρ€ΠΎΠΌ 1 Мб ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ, ΠΈΡ‚ΠΎΠ³ΠΎ 1024 Ρ‡Π°Π½ΠΊΠ°. Для Π½Π°Ρ‡Π°Π»Π° Π΄Π°Π²Π°ΠΉΡ‚Π΅ создадим ΠΏΠ΅Ρ€Π²Ρ‹ΠΉ Ρ„Ρ€Π΅ΠΉΠΌ Π΄Π°Π½Π½Ρ‹Ρ… Ρ€Π°Π·ΠΌΠ΅Ρ€ΠΎΠΌ 1 Мб с 16 столбцами:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Π—Π°Ρ‚Π΅ΠΌ я ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚ΠΈΡ€ΡƒΡŽ ΠΈΡ… Π² pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Π’Π΅ΠΏΠ΅Ρ€ΡŒ я создам ΠΏΠΎΡ‚ΠΎΠΊ Π²Ρ‹Π²ΠΎΠ΄Π°, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠΈΡΠ°Ρ‚ΡŒ Π² ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΈΠ²Π½ΡƒΡŽ ΠΏΠ°ΠΌΡΡ‚ΡŒ ΠΈ создам StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Π—Π°Ρ‚Π΅ΠΌ ΠΌΡ‹ запишСм 1024 Ρ‡Π°Π½ΠΊΠ°, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π² ΠΈΡ‚ΠΎΠ³Π΅ составят 1Π“Π± Π½Π°Π±ΠΎΡ€Π° Π΄Π°Π½Π½Ρ‹Ρ…:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΌΡ‹ писали Π² ΠžΠ—Π£, Ρ‚ΠΎ вСсь ΠΏΠΎΡ‚ΠΎΠΊ ΠΌΡ‹ смоТСм ΠΏΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ Π² ΠΎΠ΄Π½ΠΎΠΌ Π±ΡƒΡ„Π΅Ρ€Π΅:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ эти Π΄Π°Π½Π½Ρ‹Π΅ находятся Π² памяти, считываниС ΠΏΠ°ΠΊΠ΅Ρ‚ΠΎΠ² записСй Arrow получаСтся zero-copy ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠ΅ΠΉ. Π― ΠΎΡ‚ΠΊΡ€Ρ‹Π²Π°ΡŽ StreamReader, ΡΡ‡ΠΈΡ‚Ρ‹Π²Π°ΡŽ Π΄Π°Π½Π½Ρ‹Π΅ Π² pyarrow.Table, Π° Π·Π°Ρ‚Π΅ΠΌ ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚ΠΈΡ€ΡƒΡŽ ΠΈΡ… Π² DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

ВсС это, ΠΊΠΎΠ½Π΅Ρ‡Π½ΠΎ, Ρ…ΠΎΡ€ΠΎΡˆΠΎ, Π½ΠΎ Ρƒ вас ΠΌΠΎΠ³ΡƒΡ‚ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡƒΡ‚ΡŒ вопросы. Как быстро это происходит? Как Ρ€Π°Π·ΠΌΠ΅Ρ€ Ρ‡Π°Π½ΠΊΠ° влияСт Π½Π° ΠΏΡ€ΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΡΡ‚ΡŒ получСния DataFrame pandas?

ΠŸΡ€ΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΡΡ‚ΡŒ ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²ΠΎΠΉ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡ΠΈ Π΄Π°Π½Π½Ρ‹Ρ…

По ΠΌΠ΅Ρ€Π΅ ΡƒΠΌΠ΅Π½ΡŒΡˆΠ΅Π½ΠΈΡ Ρ€Π°Π·ΠΌΠ΅Ρ€Π° Ρ‡Π°Π½ΠΊΠ° ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²ΠΎΠΉ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡ΠΈ ΡΡ‚ΠΎΠΈΠΌΠΎΡΡ‚ΡŒ рСконструкции Π½Π΅ΠΏΡ€Π΅Ρ€Ρ‹Π²Π½ΠΎΠ³ΠΎ столбчатого ΠΊΠ°Π΄Ρ€Π° DataFrame Π² pandas возрастаСт ΠΈΠ·-Π·Π° нСэффСктивных схСм доступа ΠΊ кэш-памяти. Π‘ΡƒΡ‰Π΅ΡΡ‚Π²ΡƒΡŽΡ‚ Ρ‚Π°ΠΊΠΆΠ΅ Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π½Π°ΠΊΠ»Π°Π΄Π½Ρ‹Π΅ расходы ΠΎΡ‚ Ρ€Π°Π±ΠΎΡ‚Ρ‹ со структурами Π΄Π°Π½Π½Ρ‹Ρ… C++ ΠΈ массивами ΠΈ ΠΈΡ… Π±ΡƒΡ„Π΅Ρ€Π°ΠΌΠΈ памяти.

Для 1 Мб, ΠΊΠ°ΠΊ ΡƒΠΊΠ°Π·Π°Π½ΠΎ Π²Ρ‹ΡˆΠ΅, Π½Π° ΠΌΠΎΠ΅ΠΌ Π½ΠΎΡƒΡ‚Π±ΡƒΠΊΠ΅ (Quad-core Xeon E3-1505M) получаСтся:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

ΠŸΠΎΠ»ΡƒΡ‡Π°Π΅Ρ‚ΡΡ, Ρ‡Ρ‚ΠΎ эффСктивная пропускная ΡΠΏΠΎΡΠΎΠ±Π½ΠΎΡΡ‚ΡŒ β€” 7.75 Π“Π±/с для восстановлСния DataFrame объСмом 1Π“Π± ΠΈΠ· 1024 Ρ‡Π°Π½ΠΊΠΎΠ² ΠΏΠΎ 1Мб. Π§Ρ‚ΠΎ происходит, Ссли ΠΌΡ‹ Π±ΡƒΠ΄Π΅ΠΌ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ Ρ‡Π°Π½ΠΊΠΈ большСго ΠΈΠ»ΠΈ мСньшСго Ρ€Π°Π·ΠΌΠ΅Ρ€Π°? Π’ΠΎΡ‚ Ρ‚Π°ΠΊΠΈΠ΅ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ получатся:

ΠŸΠΎΡ‚ΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡Π° ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ… с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Apache Arrow

ΠŸΡ€ΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΡΡ‚ΡŒ сущСствСнно сниТаСтся с 256K Π΄ΠΎ 64K Ρ‡Π°Π½ΠΊΠΎΠ². МСня ΡƒΠ΄ΠΈΠ²ΠΈΠ»ΠΎ, Ρ‡Ρ‚ΠΎ Ρ‡Π°Π½ΠΊΠΈ Ρ€Π°Π·ΠΌΠ΅Ρ€ΠΎΠΌ 1 Мб ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π»ΠΈΡΡŒ быстрСС, Ρ‡Π΅ΠΌ 16 Мб. Π‘Ρ‚ΠΎΠΈΡ‚ провСсти Π±ΠΎΠ»Π΅Π΅ Ρ‚Ρ‰Π°Ρ‚Π΅Π»ΡŒΠ½ΠΎΠ΅ исслСдованиС ΠΈ ΠΏΠΎΠ½ΡΡ‚ΡŒ, являСтся Π»ΠΈ это Π½ΠΎΡ€ΠΌΠ°Π»ΡŒΠ½Ρ‹ΠΌ распрСдСлСниСм ΠΈΠ»ΠΈ Ρ‚ΡƒΡ‚ влияСт Ρ‡Ρ‚ΠΎ-Ρ‚ΠΎ Π΅Ρ‰Π΅.

Π’ Ρ‚Π΅ΠΊΡƒΡ‰Π΅ΠΉ Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π° Π΄Π°Π½Π½Ρ‹Π΅ Π½Π΅ ΡΠΆΠΈΠΌΠ°ΡŽΡ‚ΡΡ Π² ΠΏΡ€ΠΈΠ½Ρ†ΠΈΠΏΠ΅, поэтому Ρ€Π°Π·ΠΌΠ΅Ρ€ Π² памяти ΠΈ Β«Π² ΠΏΡ€ΠΎΠ²ΠΎΠ΄Π°Ρ…Β» ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π½ΠΎ ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²Ρ‹ΠΉ. Π’ Π±ΡƒΠ΄ΡƒΡ‰Π΅ΠΌ сТатиС ΠΌΠΎΠΆΠ΅Ρ‚ ΡΡ‚Π°Ρ‚ΡŒ Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΠΉ ΠΎΠΏΡ†ΠΈΠ΅ΠΉ.

Π˜Ρ‚ΠΎΠ³

ΠŸΠΎΡ‚ΠΎΠΊΠΎΠ²Π°Ρ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡Π° ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ… ΠΌΠΎΠΆΠ΅Ρ‚ ΠΎΠΊΠ°Π·Π°Ρ‚ΡŒΡΡ эффСктивным способом ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‡ΠΈ Π±ΠΎΠ»ΡŒΡˆΠΈΡ… Π½Π°Π±ΠΎΡ€ΠΎΠ² Π΄Π°Π½Π½Ρ‹Ρ… Π² ΠΊΠΎΠ»ΠΎΠ½ΠΎΡ‡Π½Ρ‹Π΅ аналитичСскиС инструмСнты, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€ Π² pandas, с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Π½Π΅Π±ΠΎΠ»ΡŒΡˆΠΈΡ… Ρ‡Π°Π½ΠΊΠΎΠ². Π‘Π»ΡƒΠΆΠ±Ρ‹ Π΄Π°Π½Π½Ρ‹Ρ…, ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽΡ‰ΠΈΠ΅ Ρ…Ρ€Π°Π½ΠΈΠ»ΠΈΡ‰Π΅, ΠΎΡ€ΠΈΠ΅Π½Ρ‚ΠΈΡ€ΠΎΠ²Π°Π½Π½ΠΎΠ΅ Π½Π° строки, ΠΌΠΎΠ³ΡƒΡ‚ ΠΏΠ΅Ρ€Π΅Π΄Π°Π²Π°Ρ‚ΡŒ ΠΈ Ρ‚Ρ€Π°Π½ΡΠΏΠΎΠ½ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ нСбольшиС Ρ‡Π°Π½ΠΊΠΈ Π΄Π°Π½Π½Ρ‹Ρ…, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π±ΠΎΠ»Π΅Π΅ ΡƒΠ΄ΠΎΠ±Π½Ρ‹ для кэша L2 ΠΈ L3 вашСго процСссора.

ΠŸΠΎΠ»Π½Ρ‹ΠΉ ΠΊΠΎΠ΄

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Π˜ΡΡ‚ΠΎΡ‡Π½ΠΈΠΊ: habr.com