Streaming tad-dejta tal-kolonna b'Apache Arrow

It-traduzzjoni tal-artiklu tħejjiet speċifikament għall-istudenti tal-kors Inġinier tad-Data.

Streaming tad-dejta tal-kolonna b'Apache Arrow

Matul l-aħħar ġimgħat għandna Nong Li miżjud mal Vleġġa Apache format ta 'streaming binarju, li jikkumplimenta l-format tal-fajl eżistenti ta' aċċess każwali/IPC. Għandna implimentazzjonijiet Java u C++ u rbit Python. F'dan l-artikolu, ser nispjega kif jaħdem il-format u nuri kif tista' tikseb throughput għoli ħafna tad-dejta għal pandas DataFrame.

Streaming tad-Dejta tal-Kolonna

Mistoqsija komuni li nirċievi mingħand l-utenti Arrow hija l-ispiża għolja tal-migrazzjoni ta' settijiet kbar ta' dejta tabulari minn format orjentat lejn ringiela jew rekord għal format orjentat lejn kolonna. Għal settijiet ta' dejta b'ħafna gigabyte, it-traspożizzjoni fil-memorja jew fuq id-diska tista' tkun biċċa xogħol kbira.

Biex tistrimja d-dejta, kemm jekk id-dejta tas-sors hija ringiela jew kolonna, għażla waħda hija li tibgħat lottijiet żgħar ta 'ringieli, kull wieħed ikun fih tqassim ta' kolonni ġewwa.

F'Apache Arrow, il-ġbir ta 'arrays ta' kolonni fil-memorja li jirrappreżentaw biċċa tabella tissejjaħ lott ta 'rekords. Biex tirrappreżenta struttura ta' data waħda ta' tabella loġika, jistgħu jinġabru diversi lottijiet ta' rekords.

Fil-format eżistenti tal-fajl "aċċess każwali", aħna nirreġistraw metadejta li fiha l-iskema tat-tabella u l-postijiet tal-blokki fl-aħħar tal-fajl, li jippermettilek tagħżel b'mod estremament irħis kwalunkwe lott ta 'rekords jew kwalunkwe kolonna minn sett ta' dejta. F'format ta 'streaming, aħna nibagħtu serje ta' messaġġi: kontorn, u mbagħad lott wieħed jew aktar ta 'rekords.

Il-formati differenti jidhru xi ħaġa bħal din:

Streaming tad-dejta tal-kolonna b'Apache Arrow

Streaming tad-Dejta f'PyArrow: Applikazzjoni

Biex nurik kif jaħdem dan, ser noħloq sett ta' dejta eżempju li jirrappreżenta biċċa fluss wieħed:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Issa, ejja ngħidu li rridu niktbu 1 GB ta 'dejta, li jikkonsistu f'biċċiet ta' 1 MB kull wieħed, għal total ta '1024 biċċa. Biex tibda, ejja noħolqu l-ewwel qafas tad-dejta ta’ 1 MB b’16-il kolonna:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Imbagħad nikkonvertihom għal pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Issa se noħloq fluss ta 'output li se jikteb lil RAM u joħloq StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Imbagħad se niktbu 1024 biċċa, li fl-aħħar mill-aħħar se jammontaw għal sett ta 'dejta ta' 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Peress li ktibna lil RAM, nistgħu nġibu n-nixxiegħa kollha f'buffer wieħed:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Peress li din id-dejta tinsab fil-memorja, il-qari ta 'lottijiet ta' rekords Arrow hija operazzjoni ta 'żero kopja. Niftaħ StreamReader, naqra d-dejta pyarrow.Table, u mbagħad jaqilbuhom għal DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Dan kollu huwa, ovvjament, tajjeb, imma jista 'jkollok mistoqsijiet. Kemm jiġri malajr dan? Kif id-daqs tal-biċċiet jaffettwa l-prestazzjoni tal-irkupru tad-DataFrame tal-pandas?

Prestazzjoni Streaming

Hekk kif id-daqs tal-biċċa tal-istreaming jonqos, l-ispiża tar-rikostruzzjoni ta 'DataFrame kolonni kontigwu fil-pandas tiżdied minħabba mudelli ineffiċjenti ta' aċċess għall-cache. Hemm ukoll xi overhead minn ħidma ma 'strutturi ta' data C++ u arrays u l-buffers tal-memorja tagħhom.

Għal 1 MB, bħal hawn fuq, fuq il-laptop tiegħi (Quad-core Xeon E3-1505M) jirriżulta:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Jirriżulta li l-fluss effettiv huwa ta '7.75 GB/s biex jirrestawra DataFrame ta' 1GB minn biċċiet ta '1024 1MB. X'jiġri jekk nużaw biċċiet akbar jew iżgħar? Dawn huma r-riżultati:

Streaming tad-dejta tal-kolonna b'Apache Arrow

Il-prestazzjoni tinżel b'mod sinifikanti minn biċċiet 256K għal 64K. Kont sorpriż li biċċiet 1 MB ġew ipproċessati aktar malajr minn biċċiet 16 MB. Ta’ min iwettaq studju aktar bir-reqqa u nifhem jekk din hijiex distribuzzjoni normali jew jekk hemmx xi ħaġa oħra.

Fl-implimentazzjoni attwali tal-format, id-dejta mhijiex ikkompressata fil-prinċipju, għalhekk id-daqs fil-memorja u "fil-wajers" huwa bejn wieħed u ieħor l-istess. Fil-futur, il-kompressjoni tista 'ssir għażla addizzjonali.

Total

L-istrimming tad-dejta tal-kolonni jista’ jkun mod effettiv biex jitimgħu settijiet kbar ta’ dejta f’għodod analitiċi tal-kolonni bħal pandas f’biċċiet żgħar. Is-servizzi tad-dejta li jużaw ħażna orjentata lejn ir-ringieli jistgħu jittrasferixxu u jittrasponu biċċiet żgħar ta 'dejta li huma aktar konvenjenti għall-caches L2 u L3 tal-proċessur tiegħek.

Kodiċi sħiħ

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Sors: www.habr.com

Żid kumment