Veeruandmete voogesitamine Apache Noolega

Artikli tõlge koostati spetsiaalselt kursuse üliõpilastele Andmeinsener.

Veeruandmete voogesitamine Apache Noolega

Viimase paari nädala jooksul oleme Nong Li lisatud Apache nool binaarne voogedastusvorming, mis täiendab olemasolevat juhusliku juurdepääsu/IPC failivormingut. Meil on Java ja C++ juurutused ning Pythoni sidumised. Selles artiklis selgitan vormingu toimimist ja näitan, kuidas saate panda DataFrame'i jaoks väga suure andmeedastuse saavutada.

Veeruandmete voogesitus

Arrow kasutajatelt küsitav tavaline küsimus on suurte tabeliandmete kogumite rea- või kirjeorienteeritud vormingust veerupõhisesse vormingusse üleviimise kõrge hind. Mitme gigabaidiste andmekogumite puhul võib mällu või kettale ümberpaigutamine olla üle jõu käiv ülesanne.

Andmete voogesitamiseks, olenemata sellest, kas lähteandmeteks on rida või veerg, on üks võimalus saata väikesed ridade komplektid, millest igaüks sisaldab veergude paigutust.

Apache Nooles nimetatakse tabelitükki esindavate mälusiseste veerumassiivide kogumit kirjepartiiks. Loogilise tabeli ühe andmestruktuuri esitamiseks saab koguda mitu kirjete partiid.

Olemasolevas "juhusliku juurdepääsu" failivormingus salvestame metaandmed, mis sisaldavad tabeli skeemi ja plokkide asukohti faili lõppu, võimaldades teil ülimalt odavalt valida andmekogust mis tahes kirjete partii või mis tahes veeru. Voogesituse vormingus saadame rea sõnumeid: ülevaate ja seejärel ühe või mitu kirjete partiid.

Erinevad vormingud näevad välja umbes sellised:

Veeruandmete voogesitamine Apache Noolega

Andmete voogesitamine PyArrowis: rakendus

Et näidata teile, kuidas see toimib, loon näidisandmestiku, mis esindab ühte voojuppi:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Oletame nüüd, et tahame kirjutada 1 GB andmeid, millest igaüks koosneb 1 MB suurustest tükkidest, kokku 1024 tükki. Alustuseks loome esimese 1 MB andmeraami 16 veeruga:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Seejärel teisendan need teiseks pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Nüüd loon väljundvoo, mis kirjutab RAM-i ja loob StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Seejärel kirjutame 1024 tükki, mis lõpuks moodustab 1 GB andmestiku:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Kuna kirjutasime RAM-i, saame kogu voo ühte puhvris:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Kuna need andmed on mälus, on Arrow kirjete partiide lugemine nullkoopia toiming. Avan StreamReaderi, loen andmed sisse pyarrow.Tableja seejärel teisendage need järgmiseks DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Kõik see on muidugi hea, kuid teil võib tekkida küsimusi. Kui kiiresti see juhtub? Kuidas mõjutab tüki suurus pandade DataFrame'i otsingu jõudlust?

Voogesituse jõudlus

Voogesituse tüki suuruse vähenedes suurenevad pandades külgneva veerulise DataFrame'i rekonstrueerimise kulud ebatõhusate vahemälu juurdepääsumustrite tõttu. C++ andmestruktuuride ja -massiivide ning nende mälupuhvritega töötamisega kaasneb ka mõningane lisakulu.

1 MB jaoks, nagu ülal, selgub minu sülearvutis (neljatuumaline Xeon E3-1505M):

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Selgub, et efektiivne läbilaskevõime on 7.75 GB/s, et taastada 1 GB DataFrame 1024 1 MB tükkist. Mis juhtub, kui kasutame suuremaid või väiksemaid tükke? Need on tulemused:

Veeruandmete voogesitamine Apache Noolega

Jõudlus langeb märkimisväärselt, 256 64 tükki 1 16 tükki. Olin üllatunud, et XNUMX MB tükke töödeldi kiiremini kui XNUMX MB tükke. Tasub teha põhjalikum uuring ja mõista, kas tegemist on normaaljaotusega või on mängus midagi muud.

Vormingu praeguses teostuses andmeid põhimõtteliselt ei tihendata, seega on suurus mälus ja "juhtmetes" ligikaudu sama. Tulevikus võib tihendamine saada lisavõimaluseks.

Summaarne

Veerukujuliste andmete voogesitamine võib olla tõhus viis suurte andmekogumite sisestamiseks veergude analüüsitööriistadesse, nagu pandad väikeste tükkidena. Andmeteenused, mis kasutavad ridadele orienteeritud salvestusruumi, saavad edastada ja transponeerida väikeseid andmetükke, mis on teie protsessori L2- ja L3-vahemälu jaoks mugavamad.

Täielik kood

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Allikas: www.habr.com

Lisa kommentaar