Потокова передача колонкових даних за допомогою Apache Arrow

Переклад статті підготовлений спеціально для студентів курсу "Data Engineer".

Потокова передача колонкових даних за допомогою Apache Arrow

За останні кілька тижнів ми з Нонг Лі додали в Стрілка Apache бінарний потоковий формат, доповнивши вже існуючий формат файлів random access/IPC. У нас є реалізація на Java і C++ і прив'язки Python. У цій статті я розповім, як працює формат та покажу, як можна досягти дуже високої пропускної спроможності даних для DataFrame pandas.

Потокова передача колонкових даних

Поширене питання, яке я отримую від користувачів Arrow, — це питання про високу вартість перенесення великих наборів табличних даних із формату, орієнтованого на рядки або записи до колонкового формату. Для багатогігабайтних датасетів транспонування в пам'яті або диску може виявитися непосильним завданням.

Для потокової передачі даних, незалежно від того, є вихідні дані рядковими або колонковими, одним з варіантів залишається відправка невеликих пакетів рядків, кожен з яких всередині містить компонування по стовпцях.

В Apache Arrow колекція колонкових масивів у пам'яті, що представляє чанк таблиці, називається пакетом записів (record batch). Щоб уявити єдину структуру даних логічної таблиці можна зібрати кілька пакетів записів.

У існуючому форматі файлів «random access» ми записуємо метадані, що містять схему таблиці та розташування блоків наприкінці файлу, що дозволяє вам вкрай дешево вибирати будь-який пакет записів або стовпець з набору даних. У потоковому форматі ми надсилаємо серію повідомлень: схему, а потім один чи кілька пакетів записів.

Різні формати виглядають приблизно так, як представлено на цьому малюнку:

Потокова передача колонкових даних за допомогою Apache Arrow

Потокова передача даних у PyArrow: застосування

Щоб показати вам як це працює, я створю приклад датасета, що представляє один потоковий чанк:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Тепер, припустимо, що ми хочемо записати 1 Гб даних, що складаються з чанків розміром 1 Мб кожен, разом 1024 чанки. Для початку створимо перший кадр даних розміром 1 Мб з 16 стовпцями:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Потім я конвертую їх у pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Тепер я створюю потік висновку, який писатиме в оперативну пам'ять та створюю StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Потім ми запишемо 1024 чанки, які в результаті складуть 1Гб набору даних:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Оскільки ми писали в ОЗУ, весь потік ми зможемо отримати в одному буфері:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Оскільки ці дані знаходяться в пам'яті, зчитування пакетів Arrow виходить zero-copy операцією. Я відкриваю StreamReader, зчитую дані у pyarrow.Table, а потім конвертую їх у DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Все це, звичайно, добре, але у вас можуть виникнути запитання. Як швидко це відбувається? Як розмір чанки впливає на продуктивність отримання DataFrame pandas?

Продуктивність потокової передачі даних

У міру зменшення розміру чанка потокової передачі вартість реконструкції безперервного стовпчастого кадру DataFrame у pandas зростає через неефективні схеми доступу до кеш-пам'яті. Існують також деякі накладні витрати від роботи зі структурами даних C++ та масивами та їх буферами пам'яті.

Для 1 Мб, як зазначено вище, на моєму ноутбуці (Quad-core Xeon E3-1505M) виходить:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Виходить, що ефективна пропускна здатність - 7.75 Гб/с для відновлення DataFrame об'ємом 1Гб із 1024 чанків по 1Мб. Що відбувається, якщо ми використовуватимемо чанки більшого чи меншого розміру? Ось такі результати вийдуть:

Потокова передача колонкових даних за допомогою Apache Arrow

Продуктивність суттєво знижується з 256K до 64K чанків. Мене здивувало, що чанки розміром 1 Мб оброблялися швидше ніж 16 Мб. Варто провести ретельніше дослідження і зрозуміти, чи це нормальним розподілом чи тут впливає щось ще.

У поточній реалізації формату дані не стискаються в принципі, тому розмір у пам'яті та "у проводах" приблизно однаковий. У майбутньому стиск може стати додатковою опцією.

Підсумок

Потокова передача колонкових даних може виявитися ефективним способом передачі великих наборів даних колонкові аналітичні інструменти, наприклад в pandas, за допомогою невеликих чанків. Служби даних, що використовують сховище, орієнтоване на рядки, можуть передавати та транспонувати невеликі чанки даних, які зручніші для кешу L2 та L3 вашого процесора.

Повний код

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Джерело: habr.com

Додати коментар або відгук