Streaming kolomgegevens mei Apache Arrow

De oersetting fan it artikel waard spesifyk taret foar de learlingen fan 'e kursus Data Engineer.

Streaming kolomgegevens mei Apache Arrow

De ôfrûne wiken hawwe wy Nong Li tafoege oan Apache Arrow binêr streamformaat, oanfolling fan it al besteande willekeurige tagong / IPC-bestânformaat. Wy hawwe Java en C ++ ymplemintaasjes en Python bindingen. Yn dit artikel sil ik útlizze hoe't it formaat wurket en lit sjen hoe't jo heul hege gegevenstrochput kinne berikke foar in pandas DataFrame.

Streaming kolomgegevens

In mienskiplike fraach dy't ik krij fan Arrow-brûkers is de hege kosten fan it migrearjen fan grutte tabular datasets fan in rige- of record-rjochte opmaak nei in kolomformaat. Foar multi-gigabyte datasets kin transponearje yn it ûnthâld of op skiif oerweldigjend wêze.

Foar streaminggegevens, of de boarnegegevens rigel of kolom binne, is ien opsje om lytse batches fan rigen te stjoeren, elk mei in kolomyndieling yntern.

Yn Apache Arrow wurdt in samling yn-ûnthâld-kolomêre arrays dy't in tabelblok fertsjintwurdigje, in rekordbatch neamd. Om in inkele gegevensstruktuer fan in logyske tabel te fertsjintwurdigjen, kinne jo ferskate pakketten fan records sammelje.

Yn it besteande "willekeurige tagong" bestânsformaat skriuwe wy metadata dy't it tabelskema en blokyndieling befetsje oan 'e ein fan' e triem, wêrtroch jo elke batch fan records of elke kolom út 'e dataset tige goedkeap kinne selektearje. Yn in streamingformaat stjoere wy in searje berjochten: in skema, en dan ien of mear batches fan records.

De ferskillende formaten sjogge der sa út as dizze foto:

Streaming kolomgegevens mei Apache Arrow

Streaming gegevens yn PyArrow: applikaasje

Om jo sjen te litten hoe't it wurket, sil ik in foarbylddataset meitsje dy't in inkele stream-chunk fertsjintwurdiget:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Stel no dat wy 1 GB oan gegevens wolle skriuwe, besteande út 1 MB stikken elk, foar in totaal fan 1024 brokken. Litte wy earst it earste 1MB gegevensframe meitsje mei 16 kolommen:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Dan konvertearje ik se nei pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

No sil ik in útfierstream meitsje dy't sil skriuwe nei RAM en meitsje StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Dan sille wy 1024 brokken skriuwe, dy't úteinlik in 1GB dataset sille meitsje:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Sûnt wy yn RAM skreaun hawwe, kinne wy ​​​​de heule stream yn ien buffer krije:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Om't dizze gegevens yn it ûnthâld binne, wurdt it lêzen fan batches fan Arrow-records krigen troch in operaasje mei nulkopy. Ik iepenje StreamReader, lês gegevens yn pyarrow.Tableen dan konvertearje se nei DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Dit alles is fansels goed, mar jo kinne fragen hawwe. Hoe fluch bart it? Hoe beynfloedet chunkgrutte de prestaasjes fan panda's DataFrame opheljen?

Streaming prestaasjes

As de grutte fan 'e streaming-chunk ôfnimt, ferheegje de kosten foar it rekonstruearjen fan in oansletten kolomêr DataFrame yn panda's fanwege ineffisjinte cache-tagongsskema's. D'r is ek wat overhead fan it wurkjen mei C ++ gegevensstruktueren en arrays en har ûnthâldbuffers.

Foar 1 MB lykas hjirboppe, op myn laptop (Quad-core Xeon E3-1505M) docht bliken:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

It docht bliken dat de effektive trochfier is 7.75 Gb / s foar it herstellen fan in 1 GB DataFrame út 1024 1 MB brokken. Wat bart der as wy gruttere of lytsere brokken brûke? Hjir binne de resultaten dy't jo krije:

Streaming kolomgegevens mei Apache Arrow

Prestaasje sakket signifikant fan 256K nei 64K brokken. Ik wie ferrast dat 1MB-brokken rapper waarden ferwurke dan 16MB-brokken. It is de muoite wurdich dwaan in mear yngeande stúdzje en begripe oft dit is in normale ferdieling of wat oars is belutsen.

Yn 'e hjoeddeistige útfiering fan it formaat wurde de gegevens yn prinsipe net komprimearre, sadat de grutte yn' e ûnthâld en "op 'e draad" sawat itselde is. Kompresje kin yn 'e takomst in opsje wurde.

It resultaat

Streaming fan kolomgegevens kin in effisjinte manier wêze om grutte datasets oer te bringen nei ark foar kolomanalyse lykas panda's yn lytse brokken. Gegevenstsjinsten dy't rige-oriïntearre opslach brûke kinne lytse stikken gegevens oerdrage en transponearje dy't handiger binne foar de L2- en L3-cache fan jo prosessor.

Folsleine koade

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Boarne: www.habr.com

Add a comment