Apache Arrow көмегімен баған деректерін ағынмен жіберу

Мақаланың аудармасы курс студенттері үшін арнайы дайындалған Деректер инженері.

Apache Arrow көмегімен баған деректерін ағынмен жіберу

Соңғы бірнеше аптада біз Нонг Ли -ға қосылды Apache көрсеткі бар кездейсоқ қол жеткізу/IPC файл пішімін толықтыратын екілік ағын пішімі. Бізде Java және C++ іске асырулары және Python байланыстырулары бар. Бұл мақалада мен пішім қалай жұмыс істейтінін түсіндіремін және pandas DataFrame үшін өте жоғары деректер өткізу қабілетіне қалай қол жеткізуге болатынын көрсетемін.

Ағынды баған деректері

Arrow пайдаланушыларынан алатын жалпы сұрақ үлкен кестелік деректер жиынын жолға немесе жазбаға бағытталған пішімнен баған пішіміне көшірудің жоғары құны болып табылады. Көп гигабайттық деректер жиыны үшін жадта немесе дискіде транспозиция өте қиын болуы мүмкін.

Бастапқы деректер жол немесе баған болсын, ағынды деректер үшін, әрқайсысында ішкі баған орналасуы бар жолдардың шағын бумаларын жіберу опцияларының бірі болып табылады.

Apache Arrow бағдарламасында кесте бөлігін көрсететін жад ішіндегі бағаналы массивтердің жинағы жазбалар бумасы деп аталады. Логикалық кестенің бір деректер құрылымын көрсету үшін жазбалардың бірнеше бумасын жинауға болады.

Қолданыстағы «кездейсоқ қол жеткізу» файл пішімінде біз файлдың соңында кесте схемасы мен блок орналасуын қамтитын метадеректерді жазамыз, бұл жазбалардың кез келген партиясын немесе деректер жиынынан кез келген бағанды ​​өте арзан таңдауға мүмкіндік береді. Ағындық пішімде біз хабарлар сериясын жібереміз: схема, содан кейін жазбалардың бір немесе бірнеше топтамалары.

Түрлі пішімдер мына суретке ұқсайды:

Apache Arrow көмегімен баған деректерін ағынмен жіберу

PyArrow қолданбасында ағынды деректер: қолданба

Оның қалай жұмыс істейтінін көрсету үшін, мен бір ағындық бөлікті көрсететін мысал деректер жинағын жасаймын:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Енді әрқайсысы 1 МБ бөліктен тұратын 1 ГБ деректерді, барлығы 1024 бөлікке жазғымыз келеді делік. Алдымен, 1 бағаннан тұратын алғашқы 16МБ деректер кадрын жасайық:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Содан кейін мен оларды түрлендіремін pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Енді мен жедел жадқа жазатын және жасайтын шығыс ағынын жасаймын StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Содан кейін біз 1024 ГБ деректер жиынтығын құрайтын 1 бөлікті жазамыз:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Біз жедел жадта жазғандықтан, біз бүкіл ағынды бір буферде ала аламыз:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Бұл деректер жадта болғандықтан, Arrow жазбаларының оқу топтамалары нөлдік көшірме әрекеті арқылы алынады. Мен StreamReader бағдарламасын ашамын, деректерді оқимын pyarrow.Tableсодан кейін оларды түрлендіріңіз DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Мұның бәрі, әрине, жақсы, бірақ сізде сұрақтар туындауы мүмкін. Ол қаншалықты жылдам болады? Бөлшек өлшемі pandas DataFrame алу өнімділігіне қалай әсер етеді?

Ағындық өнімділік

Ағынды бөлік өлшемі азайған сайын, пандалардағы іргелес бағаналы DataFrame қайта құру құны кэшке кірудің тиімсіз схемаларына байланысты артады. Сондай-ақ, C++ деректер құрылымдарымен және массивтерімен және олардың жад буферлерімен жұмыс істеуге біраз шығындар бар.

Жоғарыдағыдай 1 МБ үшін менің ноутбугымда (төрт ядролы Xeon E3-1505M) мынандай болады:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

7.75 ГБ DataFrame-ді 1 1024 МБ бөліктен қалпына келтіру үшін тиімді өткізу қабілеті 1 Гб/с болатыны белгілі болды. Үлкенірек немесе кішірек кесектерді пайдалансақ не болады? Міне, сіз алған нәтижелер:

Apache Arrow көмегімен баған деректерін ағынмен жіберу

Өнімділік 256K-дан 64K бөлікке дейін айтарлықтай төмендейді. Мен 1МБ бөліктердің 16МБ бөліктеріне қарағанда жылдам өңделгеніне таң қалдым. Толығырақ зерттеу жүргізіп, бұл қалыпты таралу ма, әлде басқа нәрсе бар ма екенін түсіну керек.

Пішімді ағымдағы енгізуде деректер негізінен қысылмайды, сондықтан жадтағы және «сымдағы» өлшем шамамен бірдей. Болашақта қысу опция болуы мүмкін.

Нәтиже

Баған деректерін ағынмен жіберу үлкен деректер жиынын шағын бөліктердегі пандалар сияқты бағандық талдау құралдарына тасымалдаудың тиімді жолы болуы мүмкін. Жолға бағытталған жадты пайдаланатын деректер қызметтері процессордың L2 және L3 кэштері үшін ыңғайлырақ деректердің шағын бөліктерін тасымалдай және ауыстыра алады.

Толық код

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Ақпарат көзі: www.habr.com

пікір қалдыру