Kolonnu datu straumēšana, izmantojot Apache bultiņu

Raksta tulkojums tika sagatavots speciāli kursa studentiem Datu inženieris.

Kolonnu datu straumēšana, izmantojot Apache bultiņu

Pēdējo nedēļu laikā mēs Nong Li pievienots Apache bultiņa binārās straumes formātā, papildinot jau esošo brīvpiekļuves/IPC faila formātu. Mums ir Java un C++ implementācijas un Python saistījumi. Šajā rakstā es paskaidrošu, kā darbojas formāts, un parādīšu, kā pandas DataFrame var sasniegt ļoti augstu datu caurlaidspēju.

Straumēšanas kolonnas dati

Izplatīts jautājums, ko saņemu no Arrow lietotājiem, ir lielas tabulas datu kopu migrēšanas izmaksas no rindu vai ierakstu formāta uz kolonnu formātu. Vairāku gigabaitu datu kopām transponēšana atmiņā vai diskā var būt milzīga.

Datu straumēšanai neatkarīgi no tā, vai avota dati ir rinda vai kolonna, viena iespēja ir nosūtīt nelielas rindu partijas, katrai no kurām ir iekšējais kolonnu izkārtojums.

Programmā Apache Arrow atmiņā esošo kolonnu masīvu kolekciju, kas attēlo tabulas daļu, sauc par ierakstu partiju. Lai attēlotu vienu loģiskās tabulas datu struktūru, varat apkopot vairākas ierakstu kopas.

Esošajā "brīvpiekļuves" faila formātā mēs faila beigās rakstām metadatus, kas satur tabulas shēmu un bloku izkārtojumu, kas ļauj ļoti lēti atlasīt jebkuru ierakstu partiju vai jebkuru kolonnu no datu kopas. Straumēšanas formātā mēs nosūtām virkni ziņojumu: shēmu un pēc tam vienu vai vairākas ierakstu grupas.

Dažādie formāti izskatās apmēram šādi:

Kolonnu datu straumēšana, izmantojot Apache bultiņu

Datu straumēšana PyArrow: lietojumprogramma

Lai parādītu, kā tas darbojas, es izveidošu datu kopas piemēru, kas attēlo vienu straumes daļu:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Tagad pieņemsim, ka mēs vēlamies ierakstīt 1 GB datu, kas sastāv no 1 MB gabaliem, kopā 1024 datus. Vispirms izveidosim pirmo 1 MB datu rāmi ar 16 kolonnām:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Tad es tos pārvēršu par pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Tagad es izveidošu izvades straumi, kas rakstīs RAM un izveidos StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Pēc tam mēs ierakstīsim 1024 gabalus, kas galu galā veidos 1 GB datu kopu:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Tā kā mēs rakstījām RAM, mēs varam iegūt visu straumi vienā buferī:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Tā kā šie dati atrodas atmiņā, Arrow ierakstu partiju nolasīšana tiek iegūta ar nulles kopiju. Es atveru StreamReader, nolasu datus pyarrow.Tableun pēc tam pārvērst tos par DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tas viss, protams, ir labi, taču jums var būt jautājumi. Cik ātri tas notiek? Kā gabala lielums ietekmē pandas DataFrame izguves veiktspēju?

Straumēšanas veiktspēja

Samazinoties straumēšanas gabala lielumam, neefektīvu kešatmiņas piekļuves shēmu dēļ palielinās blakus esošā kolonnu datu rāmja atjaunošanas izmaksas pandās. Darbs ar C++ datu struktūrām un masīviem un to atmiņas buferiem ir arī nedaudz papildu.

Par 1 MB, kā norādīts iepriekš, manā klēpjdatorā (četrkodolu Xeon E3-1505M) izrādās:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Izrādās, ka efektīva caurlaidspēja ir 7.75 Gb / s, lai atjaunotu 1 GB DataFrame no 1024 1 MB gabaliem. Kas notiek, ja izmantosim lielākus vai mazākus gabalus? Šeit ir iegūtie rezultāti:

Kolonnu datu straumēšana, izmantojot Apache bultiņu

Veiktspēja ievērojami samazinās no 256 64 līdz 1 16 gabaliem. Es biju pārsteigts, ka XNUMX MB gabali tika apstrādāti ātrāk nekā XNUMX MB gabali. Ir vērts veikt rūpīgāku izpēti un saprast, vai tas ir normāls sadalījums, vai tas ir saistīts ar kaut ko citu.

Pašreizējā formāta ieviešanā dati principā netiek saspiesti, tāpēc izmērs atmiņā un "uz vada" ir aptuveni vienāds. Saspiešana var kļūt par iespēju nākotnē.

Kopsavilkums

Kolonnu datu straumēšana var būt efektīvs veids, kā pārsūtīt lielas datu kopas uz kolonnu analīzes rīkiem, piemēram, pandām mazos gabalos. Datu pakalpojumi, kas izmanto uz rindām orientētu krātuvi, var pārsūtīt un transponēt nelielas datu daļas, kas ir ērtākas jūsu procesora L2 un L3 kešatmiņai.

Pilns kods

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Avots: www.habr.com

Pievieno komentāru