Suoratoistaa saraketietoja Apache-nuolen avulla

Artikkelin käännös on tehty erityisesti kurssin opiskelijoille Tietojen insinööri.

Suoratoistaa saraketietoja Apache-nuolen avulla

Viime viikkoina meillä on Nong Li lisätty Apache Arrow binäärisuoratoistomuoto, joka täydentää olemassa olevaa hajasaanti/IPC-tiedostomuotoa. Meillä on Java- ja C++-toteutukset sekä Python-sidokset. Tässä artikkelissa selitän, kuinka muoto toimii, ja näytän, kuinka voit saavuttaa erittäin suuren tiedonsiirron pandas DataFrame -kehykselle.

Suoratoisto saraketiedot

Arrow-käyttäjiltä saamani yleinen kysymys on suurten taulukkotietojen siirtämisen korkeat kustannukset rivi- tai tietuesuuntautuneesta muodosta sarakesuuntautuneeseen muotoon. Usean gigatavun tietojoukoissa transponointi muistiin tai levylle voi olla ylivoimainen tehtävä.

Datan suoratoistoa varten, olipa lähdetieto rivi tai sarake, yksi vaihtoehto on lähettää pieniä rivieriä, joista jokainen sisältää sarakeasettelun.

Apache Arrowssa taulukkopalaa edustavien muistissa olevien saraketaulukoiden kokoelmaa kutsutaan tietueeriksi. Loogisen taulukon yksittäisen tietorakenteen edustamiseksi voidaan kerätä useita tietueeriä.

Olemassa olevaan "random access" -tiedostomuotoon tallennamme metatiedot, jotka sisältävät taulukon skeeman ja lohkon sijainnit tiedoston loppuun, jolloin voit erittäin edullisesti valita minkä tahansa tietueerän tai minkä tahansa sarakkeen tietojoukosta. Suoratoistomuodossa lähetämme sarjan viestejä: luonnoksen ja sitten yhden tai useamman tietueerän.

Eri muodot näyttävät suunnilleen tältä:

Suoratoistaa saraketietoja Apache-nuolen avulla

Tietojen suoratoisto PyArrowissa: Sovellus

Näytän sinulle, miten tämä toimii, luomalla esimerkkitietojoukon, joka edustaa yhtä stream-osaa:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Oletetaan nyt, että haluamme kirjoittaa 1 Gt:n dataa, joka koostuu kukin 1 Mt:n paloista, yhteensä 1024 kappaletta. Aluksi luodaan ensimmäinen 1 Mt:n tietokehys, jossa on 16 saraketta:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Sitten muuntelen ne sellaisiksi pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Nyt luon ulostulovirran, joka kirjoittaa RAM-muistiin ja luo StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Sitten kirjoitamme 1024 palaa, joka lopulta vastaa 1 Gt:n tietojoukkoa:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Koska kirjoitimme RAM-muistiin, voimme saada koko streamin yhteen puskuriin:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Koska nämä tiedot ovat muistissa, Arrow-tietueiden erien lukeminen on nollakopiointitoiminto. Avaan StreamReaderin, luen tiedot sisään pyarrow.Tableja muuntaa ne sitten muotoon DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Kaikki tämä on tietysti hyvä, mutta sinulla voi olla kysyttävää. Kuinka nopeasti tämä tapahtuu? Miten palan koko vaikuttaa pandan DataFrame-haun suorituskykyyn?

Suoratoiston suorituskyky

Kun suoratoistokappaleen koko pienenee, pandassa olevan peräkkäisen sarakepohjaisen DataFrame-kehyksen rekonstruoinnin kustannukset kasvavat tehottomien välimuistin käyttötapojen vuoksi. Myös C++-tietorakenteiden ja -taulukoiden sekä niiden muistipuskurien kanssa työskentely aiheuttaa jonkin verran lisäkustannuksia.

Yhdellä megatavulla, kuten yllä, kannettavassa tietokoneessani (neliytiminen Xeon E1-3M) käy:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Osoittautuu, että tehokas suoritusnopeus on 7.75 Gt/s palauttamaan 1 Gt DataFrame 1024:stä 1 Mt:n paloista. Mitä tapahtuu, jos käytämme suurempia tai pienempiä paloja? Nämä ovat tulokset:

Suoratoistaa saraketietoja Apache-nuolen avulla

Suorituskyky putoaa merkittävästi 256 64 kappaleesta 1 16 paloiksi. Olin yllättynyt, että XNUMX Mt:n palaset käsiteltiin nopeammin kuin XNUMX Mt:n palaset. Kannattaa tehdä perusteellisempi tutkimus ja ymmärtää, onko tämä normaalijakauma vai onko pelissä jotain muuta.

Formaatin nykyisessä toteutuksessa dataa ei periaatteessa pakata, joten koko muistissa ja "johdoissa" on suunnilleen sama. Jatkossa pakkaus voi olla lisävaihtoehto.

Koko

Saraketietojen suoratoisto voi olla tehokas tapa syöttää suuria tietojoukkoja sarakeanalytiikkatyökaluihin, kuten pandoihin pieninä paloina. Rivipohjaista tallennusta käyttävät tietopalvelut voivat siirtää ja transponoida pieniä tietopaloja, jotka sopivat paremmin prosessorisi L2- ja L3-välimuistiin.

Täysi koodi

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Lähde: will.com

Lisää kommentti