Flui kolumnajn datumojn per Apache Arrow

La traduko de la artikolo estis preparita specife por la kursanoj Datuma Inĝeniero.

Flui kolumnajn datumojn per Apache Arrow

Dum la lastaj semajnoj ni havas Nong Li aldonita al Apaĉa Sago binara fluanta formato, kompletigante la ekzistantan hazardan aliron/IPC-dosierformaton. Ni havas efektivigojn Java kaj C++ kaj Python-ligojn. En ĉi tiu artikolo, mi klarigos kiel funkcias la formato kaj montros kiel vi povas atingi tre altan datuman trairon por panda DataFrame.

Streaming kolumno datumoj

Ofta demando, kiun mi ricevas de Arrow-uzantoj, estas la alta kosto de migrado de grandaj aroj da tabelaj datumoj de vico- aŭ rekord-orientita formato al kolon-orientita formato. Por plurgigabajtaj datumaroj, transponado en memoro aŭ sur disko povas esti superforta tasko.

Por flui datumojn, ĉu la fontaj datumoj estas vico aŭ kolumno, unu opcio estas sendi malgrandajn arojn da vicoj, ĉiu enhavante kolonaran aranĝon interne.

En Apache Arrow, la kolekto de en-memoraj kolumnaroj reprezentantaj tabelpecon estas nomita rekorda aro. Por reprezenti ununuran datumstrukturon de logika tabelo, pluraj aroj de rekordoj povas esti kolektitaj.

En la ekzistanta "hazarda aliro" dosierformato, ni registras metadatenojn enhavantajn la tabelskemon kaj blokajn lokojn ĉe la fino de la dosiero, permesante al vi ege malmultekoste elekti ajnan aron da rekordoj aŭ ajnan kolumnon el datuma aro. En streaming-formato, ni sendas serion da mesaĝoj: skizon, kaj poste unu aŭ pluraj aroj da rekordoj.

La malsamaj formatoj aspektas kiel ĉi tio:

Flui kolumnajn datumojn per Apache Arrow

Flui datumoj en PyArrow: aplikaĵo

Por montri al vi kiel ĉi tio funkcias, mi kreos ekzemplan datumaron reprezentantan ununuran fluon:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Nun, ni diru, ke ni volas skribi 1 GB da datumoj, konsistantaj el pecoj de 1 MB ĉiu, por entute 1024 pecoj. Por komenci, ni kreu la unuan datumkadron de 1 MB kun 16 kolumnoj:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Tiam mi konvertas ilin al pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Nun mi kreos eligan fluon, kiu skribos al RAM kaj kreos StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Tiam ni skribos 1024 pecojn, kiuj finfine sumiĝos al 1GB-datumaro:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Ĉar ni skribis en RAM, ni povas ricevi la tutan rivereton en unu bufro:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Ĉar ĉi tiuj datumoj estas en memoro, legi arojn de Arrow-rekordoj estas nul-kopia operacio. Mi malfermas StreamReader, legas datumojn en pyarrow.Table, kaj poste konverti ilin al DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Ĉio ĉi estas, kompreneble, bona, sed vi eble havas demandojn. Kiom rapide ĉi tio okazas? Kiel la grandeco de peco influas la rendimenton de rehavigo de DataFrame de pandoj?

Streaming rendimento

Ĉar la disflua peco malpliiĝas, la kosto de rekonstruado de apuda kolona DataFrame en pandoj pliiĝas pro malefikaj kaŝmemoraj alirpadronoj. Estas ankaŭ iom da ŝarĝo de laborado kun C++-datumstrukturoj kaj tabeloj kaj iliaj memorbufroj.

Por 1 MB kiel supre, sur mia tekkomputilo (Kvarona Xeon E3-1505M) rezultas:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Montriĝas, ke la efika trairo estas 7.75 GB/s por restarigi 1GB DataFrame de 1024 1MB-partoj. Kio okazas se ni uzas pli grandajn aŭ pli malgrandajn pecojn? Ĉi tiuj estas la rezultoj:

Flui kolumnajn datumojn per Apache Arrow

Efikeco malpliiĝas signife de 256K al 64K pecoj. Mi surpriziĝis, ke 1 MB-pecoj estis prilaboritaj pli rapide ol 16 MB-pecoj. Indas fari pli profundan studon kaj kompreni ĉu ĉi tio estas normala distribuo aŭ ĉu estas io alia en ludo.

En la nuna efektivigo de la formato, la datumoj principe ne estas kunpremitaj, do la grandeco en memoro kaj "en la dratoj" estas proksimume la sama. Kunpremado povas fariĝi eblo en la estonteco.

La rezulto

Flui kolonajn datumojn povas esti efika maniero por nutri grandajn datumajn arojn en kolonajn analizajn ilojn kiel pandoj en malgrandaj pecoj. Datumservoj, kiuj uzas vico-orientitan stokadon, povas transdoni kaj transmeti malgrandajn partojn da datumoj, kiuj estas pli oportunaj por la kaŝmemoroj L2 kaj L3 de via procesoro.

Plena kodo

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

fonto: www.habr.com

Aldoni komenton