Daneyên stûnê bi Apache Arrow ve diherike

Wergera gotarê bi taybetî ji bo xwendekarên qursê hatiye amadekirin "Endezyar Daneyên".

Daneyên stûnê bi Apache Arrow ve diherike

Di van çend hefteyên borî de em hene Nong Li lê zêde kirin Apache Arrow formata weşana binaryê, ku forma pelê ya gihîştina random/IPC ya heyî temam dike. Me pêkanînên Java û C++ û girêdanên Python hene. Di vê gotarê de, ez ê rave bikim ka format çawa dixebite û destnîşan bikim ka hûn çawa dikarin ji bo DataFrame-ya pandayan gihandina daneya pir bilind bi dest bixin.

Streaming Daneyên Stûnê

Pirseke hevpar a ku ez ji bikarhênerên Arrow distînim lêçûna zêde ya koçkirina komên mezin ên daneyên tabloyê ji formatek rêz-an-an-koordar berbi formatek stûn-oriented e. Ji bo danûstendinên pir-gigabyte, veguheztina di bîranînê de an li ser dîskê dikare bibe karekî giran.

Ji bo guheztina daneyan, ka daneya çavkaniyê rêz be an stûn be, vebijarkek ev e ku hûn komên piçûk ên rêzan bişînin, ku her yek di hundurê de xêzek stûnek heye.

Di Apache Arrow de, berhevoka rêzikên stûnên nav-bîrê yên ku perçeyek tabloyê temsîl dikin jê re komek tomar tê gotin. Ji bo temsîlkirina avahiyek daneya yekane ya tabloyek mentiqî, çend beşên tomar dikarin werin berhev kirin.

Di forma pelê ya "gihîştina tesadufî" ya heyî de, em metadaneyên ku şemaya tabloyê vedihewîne tomar dikin û cîhên li dawiya pelê bloke dikin, dihêlin hûn bi erzanî ji komek tomar an stûnek ji komek daneyê hilbijêrin. Di formata weşanê de, em rêzek peyaman dişînin: xêzek, û dûv re yek an jî çend komek tomar.

Formatên cûda mîna vê xuya dikin:

Daneyên stûnê bi Apache Arrow ve diherike

Streaming Daneyên li PyArrow: Serlêdan

Ji bo ku nîşanî we bidim ka ev çawa dixebite, ez ê danehevek mînakek ku perçeyek yekane temsîl dike biafirînim:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Naha, em bibêjin ku em dixwazin 1 GB daneyan binivîsin, ku her yek ji perçeyên 1 MB pêk tê, bi tevahî 1024 parçe. Ji bo destpêkirinê, bila em çarçoweya daneya yekem a 1 MB bi 16 stûnan biafirînin:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Paşê ez wan vediguherînim pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Naha ez ê diherikek encamekê biafirînim ku dê li RAM-ê binivîsîne û biafirîne StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Dûv re em ê 1024 perçeyan binivîsin, ku dê di dawiyê de bigihîje komek daneya 1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Ji ber ku me ji RAM-ê re nivîsand, em dikarin tevahiyê di yek tamponê de bistînin:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Ji ber ku ev dane di bîranînê de ye, xwendina komikên tomarên Arrow operasyonek sifir-kopî ye. Ez StreamReader vedikim, daneyan tê de dixwînim pyarrow.Table, û paşê wan veguherînin DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Hemî ev, bê guman, baş e, lê dibe ku pirsên we hebin. Ev çiqas zû dibe? Mezinahiya perçeyê çawa bandorê li ser performansa hilanîna DataFrame ya panda dike?

Performansa Streaming

Her ku mezinahiya perçeya tîrêjê kêm dibe, lêçûna ji nû veavakirina DataFrame-ya stûnek a hevgirtî di pandayan de ji ber qalibên gihîştina cache-ya bêbandor zêde dibe. Di heman demê de ji xebata bi strukturên daneya C++ û array û tamponên bîranîna wan re hin serf jî heye.

Ji bo 1 MB, wekî li jor, li ser laptopa min (Quad-core Xeon E3-1505M) derdikeve holê:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Derdikeve holê ku rêjeya bandorker 7.75 GB / s e ku 1 GB DataFrame ji 1024 perçeyên 1 MB vegerîne. Ger em perçeyên mezin an piçûktir bikar bînin çi dibe? Ev encam ev in:

Daneyên stûnê bi Apache Arrow ve diherike

Performansa bi girîngî ji 256K berbi 64K perçeyan dadikeve. Ez şaş bûm ku perçeyên 1 MB ji perçeyên 16 MB zûtir têne pêvajoyê kirin. Hêja ye ku lêkolînek berfirehtir were kirin û têgihîştin ka gelo ev belavkirinek normal e an tiştek din di lîstikê de heye.

Di pêkanîna heyî ya formatê de, dane di prensîbê de nayên qewirandin, ji ber vê yekê mezinahiya bîranîn û "di têlan" de bi qasî hev e. Di pêşerojê de, compression dibe ku bibe vebijarkek din.

Encam

Veguhastina daneya stûnî dikare rêyek bi bandor be da ku berhevokên daneya mezin di nav amûrên analîtîk ên stûnî de mîna pandayan di perçeyên piçûk de bihêle. Karûbarên daneyê yên ku hilanîna rêz-oriented bikar tînin dikarin perçeyên piçûk ên daneya ku ji bo kaşeyên L2 û L3 yên pêvajoya we rehettir in veguhezînin û veguhezînin.

Koda tevahî

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com

Add a comment