Straumspilun dálkagagna með Apache Arrow

Þýðing greinarinnar var unnin sérstaklega fyrir nemendur námskeiðsins "gagnaverkfræðingur".

Straumspilun dálkagagna með Apache Arrow

Undanfarnar vikur höfum við Nong Li bætt við Apache ör tvöfaldur streymissnið, viðbót við núverandi slembiaðgang/IPC skráarsnið. Við erum með Java og C++ útfærslur og Python bindingar. Í þessari grein mun ég útskýra hvernig sniðið virkar og sýna hvernig þú getur náð mjög miklum gagnaflutningi fyrir Panda DataFrame.

Streymi dálkagögn

Algeng spurning sem ég fæ frá Arrow notendum er mikill kostnaður við að flytja stór sett af töflugögnum úr röð- eða færslumiðuðu sniði yfir í dálkamiðað snið. Fyrir margra gígabæta gagnasöfn getur yfirfærsla í minni eða á disk verið yfirþyrmandi verkefni.

Til að streyma gögnum, hvort sem upprunagögnin eru röð eða dálkur, er einn valkostur að senda litlar lotur af línum, sem hver inniheldur dálkaskipulag inni.

Í Apache Arrow er safn dálkafylkja í minni sem tákna töfluklump kallað færslulotu. Til að tákna eina gagnauppbyggingu rökréttrar töflu er hægt að safna nokkrum lotum af skrám.

Í núverandi "random access" skráarsniði, skráum við lýsigögn sem innihalda töfluskema og lokunarstaðsetningar í lok skráarinnar, sem gerir þér kleift að velja á mjög ódýran hátt hvaða lotu af færslum sem er eða hvaða dálk sem er úr gagnasetti. Í streymissniði sendum við röð skilaboða: útlínur og síðan eina eða fleiri færslur.

Mismunandi snið líta einhvern veginn svona út:

Straumspilun dálkagagna með Apache Arrow

Straumspilun gagna í PyArrow: Forrit

Til að sýna þér hvernig þetta virkar, mun ég búa til dæmi um gagnasafn sem táknar einn straumklump:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Segjum nú að við viljum skrifa 1 GB af gögnum, sem samanstanda af 1 MB bitum hvor, fyrir samtals 1024 bita. Til að byrja skulum við búa til fyrsta 1 MB gagnarammann með 16 dálkum:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Síðan breyti ég þeim í pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Nú mun ég búa til úttaksstraum sem mun skrifa í vinnsluminni og búa til StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Síðan munum við skrifa 1024 klumpur, sem munu á endanum nema 1GB gagnasetti:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Þar sem við skrifuðum í vinnsluminni getum við fengið allan strauminn í einum biðminni:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Þar sem þessi gögn eru í minni er það að lesa runur af Arrow-skrám núllafritunaraðgerð. Ég opna StreamReader, les gögn inn pyarrow.Table, og umbreyttu þeim síðan í DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Allt þetta er auðvitað gott, en þú gætir haft spurningar. Hversu fljótt gerist þetta? Hvernig hefur klumpastærð áhrif á afköst Panda DataFrame endurheimtar?

Árangur á streymi

Eftir því sem straumklumpurinn minnkar, eykst kostnaður við að endurbyggja samliggjandi dálka DataFrame í pöndum vegna óhagkvæms skyndiminnisaðgangsmynsturs. Það er líka nokkur kostnaður við að vinna með C++ gagnabyggingu og fylki og minnisbuffum þeirra.

Fyrir 1 MB, eins og hér að ofan, á fartölvunni minni (Quad-core Xeon E3-1505M) kemur í ljós:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Það kemur í ljós að skilvirkt afköst er 7.75 GB/s til að endurheimta 1GB DataFrame úr 1024 1MB klumpur. Hvað gerist ef við notum stærri eða smærri bita? Þetta eru niðurstöðurnar:

Straumspilun dálkagagna með Apache Arrow

Afköst lækka verulega úr 256K í 64K klumpur. Það kom mér á óvart að 1 MB bitar voru unnar hraðar en 16 MB bitar. Það er þess virði að gera ítarlegri rannsókn og átta sig á því hvort um eðlilega dreifingu sé að ræða eða hvort eitthvað annað sé í gangi.

Í núverandi útfærslu sniðsins eru gögnin í grundvallaratriðum ekki þjappuð, þannig að stærðin í minni og „í vírunum“ er um það bil sú sama. Í framtíðinni gæti þjöppun orðið viðbótarvalkostur.

Samtals

Straumspilun dálkagagna getur verið áhrifarík leið til að fæða stór gagnasöfn í dálkagreiningartól eins og pöndur í litlum bitum. Gagnaþjónusta sem notar raðmiðaða geymslu getur flutt og yfirfært litla klumpa af gögnum sem eru þægilegri fyrir L2 og L3 skyndiminni örgjörvans þíns.

Fullur kóði

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Heimild: www.habr.com

Bæta við athugasemd