Streaming Columnar Data with Apache Arrow



Over the past few weeks, Nong Li and I added a binary streaming format to Apache Arrow, complementing the existing random-access IPC file format. We have implementations in Java and C++, plus Python bindings. In this post I explain how the format works and show how you can achieve very high data throughput to pandas DataFrames.

Streaming Columnar Data

A common question I get from Arrow users concerns the high cost of migrating large tabular datasets from a row- or record-oriented format to a column-oriented one. For multi-gigabyte datasets, transposing in memory or on disk can be prohibitively expensive.

When streaming data, whether the source data has a row or a columnar layout, one option is to send small batches of rows, each of which has a columnar layout internally.
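
As a rough illustration of that idea, here is a minimal sketch (the chunk size and column names are made up for the example) that cuts a row-oriented pandas DataFrame into small row chunks and converts each one into a columnar Arrow record batch:

import numpy as np
import pandas as pd
import pyarrow as pa

# A row-oriented source, stood in for here by a pandas DataFrame.
df = pd.DataFrame({'a': np.random.randn(10000),
                   'b': np.random.randn(10000)})

# Send the table as chunks of 1000 rows; each chunk is a record
# batch whose internal layout is columnar.
batches = [pa.RecordBatch.from_pandas(df.iloc[i:i + 1000])
           for i in range(0, len(df), 1000)]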

In Apache Arrow, a collection of in-memory columnar arrays representing a chunk of a table is called a record batch. Multiple record batches can be collected to represent a single logical table data structure.
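
For example, a minimal sketch of assembling several record batches that share a schema into one logical table (pyarrow is introduced properly below):

import pandas as pd
import pyarrow as pa

batch = pa.RecordBatch.from_pandas(pd.DataFrame({'x': [1.0, 2.0, 3.0]}))

# Batches with the same schema form one logical table; the column
# chunks are referenced, not copied or concatenated.
table = pa.Table.from_batches([batch, batch, batch])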

In the existing "random access" format, we write metadata containing the table schema and the block locations at the end of the file, which lets you select any record batch or any column from the dataset very cheaply. In the streaming format, we send a series of messages: the schema, and then one or more record batches.
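
For contrast, here is a minimal sketch of the random access format using the modern pyarrow names (pa.ipc.new_file / pa.ipc.open_file; the listings below use the older StreamWriter/StreamReader API), where any record batch can be fetched by index without scanning the whole file:

import pandas as pd
import pyarrow as pa

batch = pa.RecordBatch.from_pandas(pd.DataFrame({'x': [1.0, 2.0, 3.0]}))

sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, batch.schema) as writer:
    for _ in range(10):
        writer.write_batch(batch)

# The footer written at the end of the file holds the schema and
# block locations, so any batch can be read directly by index.
reader = pa.ipc.open_file(sink.getvalue())
last = reader.get_batch(reader.num_record_batches - 1)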

The two formats look roughly like this:

[Figure: layout of the random access (file) format versus the streaming format]

Streaming Data with PyArrow: Usage

To show how this works, I generate an example dataset representing a single stream chunk:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # Build a DataFrame of float64 columns totalling roughly total_size bytes.
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

Now, suppose we want to write 1 GB of data composed of chunks of 1 MB each, for a total of 1024 chunks. To start, let's create the first 1 MB data frame with 16 columns:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Then I convert it to a pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Next I create an output stream that writes to RAM and construct a StreamWriter on it:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Then we write the 1024 chunks that will eventually make up the 1 GB dataset:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Since we wrote to RAM, we can get the entire stream as a single buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Since this data is in memory, reading back Arrow record batches is a zero-copy operation. I open a StreamReader, read the data into a pyarrow.Table, and then convert it to a pandas DataFrame:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

This is all very nice, but you may have some questions. How fast is it? How does the chunk size affect the performance of reconstituting the pandas DataFrame?

Streaming Performance

As the chunk size shrinks, the cost of assembling a contiguous pandas DataFrame rises because of less cache-efficient memory access patterns. There is also some overhead from working with the C++ data structures, arrays, and their memory buffers.

For 1 MB chunks, as above, on my laptop (quad-core Xeon E3-1505M) I get:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

That works out to an effective throughput of 7.75 GB/s (1 GB in 0.129 s) for reconstituting a 1 GB DataFrame from 1024 chunks of 1 MB each. What happens if we use larger or smaller chunks? These are the results:

[Figure: DataFrame reconstruction performance by stream chunk size]

Performance drops sharply between 256 KB and 64 KB chunks. I was surprised that 1 MB chunks were processed faster than 16 MB ones; it would be worth digging deeper to understand whether this is normal allocation behavior or whether something else is at play.

In the current version of the format, the data is not compressed at all, so the in-memory and on-the-wire sizes are roughly the same. In the future, compression may be added to the format as an option.
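
As it happens, later pyarrow releases did add this option. A minimal sketch, assuming a recent pyarrow built with lz4 support:

import pandas as pd
import pyarrow as pa

batch = pa.RecordBatch.from_pandas(pd.DataFrame({'x': [1.0, 2.0, 3.0]}))

# Opt in to IPC buffer compression for the stream (lz4 or zstd).
options = pa.ipc.IpcWriteOptions(compression='lz4')
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema, options=options) as writer:
    writer.write_batch(batch)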

Conclusion

Streaming columnar data can be an efficient way to feed large datasets to columnar analytics tools like pandas in small chunks. Data services backed by row-oriented storage can transpose and stream small data fragments that fit better in your processor's L2 and L3 caches.

Full code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # int() is needed under Python 3, where / is true division.
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    # Average wall-clock seconds per call of f over niter iterations.
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)
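
The listing above uses the pyarrow API as it existed when this article was written. In current pyarrow releases those classes were renamed, and the same round trip looks roughly like this (a sketch, assuming a recent pyarrow):

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'c0': [1.0, 2.0], 'c1': [3.0, 4.0]})
batch = pa.RecordBatch.from_pandas(df)

# pa.BufferOutputStream replaces InMemoryOutputStream, and
# pa.ipc.new_stream replaces StreamWriter.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)

# getvalue() replaces get_result(), and pa.ipc.open_stream
# replaces StreamReader.
result = pa.ipc.open_stream(sink.getvalue()).read_all().to_pandas()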

Source: www.habr.com
