Rere Raraunga Tiwae me te Pere Apache

Ko te whakamaoritanga o te tuhinga i whakaritea mo nga akonga o te akoranga Kaihanga Raraunga.

Rere Raraunga Tiwae me te Pere Apache

I roto i nga wiki kua pahure ake nei Nong Li tāpiri ki Pere Apache te whakatakotoranga roma rua, hei whakakii i te whakatakotoranga konae uru matapōkere/IPC. Kei a matou nga whakatinanatanga Java me C++ me nga here Python. I roto i tenei tuhinga, ka whakamarama ahau me pehea te mahi o te whakatakotoranga me te whakaatu me pehea e taea ai e koe te whakaputa raraunga tino teitei mo te Pandas DataFrame.

Rererenga Raraunga Tiwae

Ko tetahi patai noa mai i nga kaiwhakamahi Pere ko te utu nui mo te heke i nga huinga raraunga ripanga nui mai i te whakatakotoranga haupae, rekoata ranei ki te whakatakotoranga-tiwae. Mo nga huingararaunga maha-kikapaita, ko te whakawhiti i roto i te mahara, i runga kōpae ranei he mahi tino nui.

Ki te rere raraunga, ahakoa he rarangi, he pou ranei te puna raraunga, ko tetahi o nga whiringa ko te tuku i nga roopu rarangi iti, kei roto he whakatakotoranga pou kei roto.

I roto i te Pere Apache, ko te kohikohinga o nga rarangi poumahara-a-roto e tohu ana i te wahanga ripanga ka kiia he puranga rekoata. Hei tohu i te hanganga raraunga kotahi o te ripanga arorau, ka taea te kohia etahi puranga rekoata.

I roto i te whakatakotoranga konae "whakauru tupurangi", ka tuhi matou i nga metadata kei roto te aronuinga ripanga me nga waahi aukati i te mutunga o te konae, ka taea e koe te kowhiri i nga kohinga rekoata me tetahi pou mai i te huinga raraunga. I roto i te whakatakotoranga roma, ka tukuna e matou he raupapa o nga karere: he whakahuahua, katahi ka kotahi, neke atu ranei nga roopu rekoata.

He penei te ahua o nga ahuatanga rereke:

Rere Raraunga Tiwae me te Pere Apache

Rere Raraunga i PyArrow: Taupānga

Hei whakaatu ki a koe me pehea te mahi, ka hanga e ahau he tauira huingararaunga e tohu ana i te waahanga awa kotahi:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Inaianei, me kii e hiahia ana matou ki te tuhi i te 1 GB o nga raraunga, kei roto i nga wahanga o te 1 MB ia ia, mo te katoa o nga wahanga 1024. Hei timata, me hanga te anga raraunga tuatahi 1 MB me nga pou 16:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Na ka hurihia e ahau ki a raatau pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Inaianei ka hangaia e ahau he awa whakaputa ka tuhi ki te RAM me te hanga StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Na ka tuhia e matou nga wahanga 1024, ka tae ki te huinga raraunga 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

I te mea i tuhi matou ki te RAM, ka taea e matou te tiki i te awa katoa i roto i te parapara kotahi:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

I te mea kei te maumahara enei raraunga, he mahi kore kape te panui i nga puranga o nga rekoata Pere. Ka whakatuwherahia e ahau te StreamReader, panuihia nga raraunga ki roto pyarrow.Table, ka huri ki DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Ko enei mea katoa, he pai, engari kei a koe etahi patai. Kia pehea te tere o tenei? He pehea te paanga o te rahi o te tiihi ki nga mahi tangohanga anga Raraunga pandas?

Mahinga Rererangi

I te hekenga o te rahi o te tiihi roma, ka piki ake te utu mo te hanga hou i te Raraunga Raraunga pourangi i roto i nga pandas na te koretake o nga tauira uru keteroki. He nui ano te mahi mai i te mahi me nga hanganga raraunga C++ me nga raupapa me o raatau papaa mahara.

Mo te 1 MB, penei i runga ake nei, i runga i taku pona (Quad-core Xeon E3-1505M) ka puta:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Ka puta ko te whakaputanga whai hua ko te 7.75 GB/s ki te whakahoki mai i te 1GB DataFrame mai i te 1024 1MB nga wahanga. Ka aha mena ka whakamahia e tatou nga waahanga nui, iti ake ranei? Ko enei nga hua:

Rere Raraunga Tiwae me te Pere Apache

Ka tino heke te mahinga mai i te 256K ki te 64K nga wahanga. I miharo ahau na te 1 MB nga waahanga i tere ake i te 16 MB nga waahanga. He pai ake te whakahaere rangahau me te maarama mehemea he tohatoha noa tenei, he mea ke atu ranei kei te takaro.

I roto i te whakatinanatanga o te whakatakotoranga o naianei, kaore i te kopaki nga raraunga i runga i te maapono, no reira he rite tonu te rahi o te mahara me te "i nga waea". I te wa kei te heke mai, ka taea pea te taapiri hei whiringa taapiri.

Ko te hua

Ko te whakawhiti raraunga poupou he huarahi whai hua ki te whangai i nga huinga raraunga nui ki nga taputapu tātari poupou penei i nga pandas i roto i nga waahanga iti. Ka taea e nga ratonga Raraunga e whakamahi ana i te rokiroki rarangi haupae te whakawhiti me te whakawhiti i nga wahanga iti o nga raraunga e pai ake ana mo nga keteroki L2 me L3 o to tukatuka.

Waehere katoa

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: will.com

Tāpiri i te kōrero