Streaming columnar data with Apache Arrow

This translation of the article was prepared for students of the Data Engineering course.


Over the past few weeks, Nong Li and I added a binary streaming format to Apache Arrow, complementing the existing random access / IPC file format. We have implementations in Java and C++, plus Python bindings. In this article, I explain how the format works and show how you can achieve very high data throughput to a pandas DataFrame.

Streaming columnar data

A common question I get from Arrow users concerns the high cost of moving large tabular datasets from a row- or record-oriented format into a columnar layout. For a multi-gigabyte dataset, transposing the data in memory or on disk can be a substantial job.

For streaming data, whether the source data is row- or column-oriented, one option is to send small batches of rows, each carrying a columnar layout internally.

In Apache Arrow, the in-memory collection of columns representing a chunk of a table is called a record batch. Several record batches can be collected to represent a single logical dataset of an entire table. The sketch below makes both ideas concrete.
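A minimal sketch, assuming a reasonably recent pyarrow (the rows/id/value names here are made up for illustration):

import pyarrow as pa

# A small batch of rows; inside the batch the data is stored
# column by column.
rows = [{'id': 1, 'value': 0.5},
        {'id': 2, 'value': 1.5},
        {'id': 3, 'value': 2.5}]
batch = pa.RecordBatch.from_pylist(rows)

# Record batches sharing a schema assemble into one logical table.
table = pa.Table.from_batches([batch, batch, batch])
print(table.num_rows)  # 9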

In the existing "random access" file format, we write metadata containing the table schema and the block locations at the end of the file, which lets you select any batch of records or any column from the dataset very cheaply. In the streaming format, we send a series of messages: the schema, followed by one or more record batches.
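As a rough sketch of the difference, using today's pyarrow API names (pa.ipc.new_file / pa.ipc.new_stream; the article below uses the older pyarrow 0.x spellings):

import pyarrow as pa

batch = pa.RecordBatch.from_pylist([{'x': 1.0}, {'x': 2.0}])

# File format: the footer written at the end holds the schema and
# block locations, so any batch can be read at random.
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, batch.schema) as writer:
    writer.write_batch(batch)
    writer.write_batch(batch)
f = pa.ipc.open_file(sink.getvalue())
last = f.get_batch(f.num_record_batches - 1)  # random access

# Streaming format: a schema message, then record batch messages,
# consumed strictly in order.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
    writer.write_batch(batch)
for b in pa.ipc.open_stream(sink.getvalue()):  # sequential access
    pass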

Laid out side by side, the two formats look something like this:

[Figure: the file format versus the streaming format]

Streaming data in PyArrow: an application

To show how this works, I'll create an example dataset representing a single stream chunk:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # Build a DataFrame of float64 columns totalling total_size bytes.
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

Now, suppose we want to write 1 GB of data consisting of 1 MB chunks, 1024 chunks in total. To start, let's create the first 1 MB DataFrame with 16 columns:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Then I convert it to a pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Now I create an output stream that writes to RAM and create a StreamWriter on top of it:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
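These are the pyarrow 0.x names this article was written against. In current pyarrow releases the same steps are spelled differently; approximately:

sink = pa.BufferOutputStream()                          # was pa.InMemoryOutputStream()
stream_writer = pa.ipc.new_stream(sink, batch.schema)   # was pa.StreamWriter(...)
stream_writer.write_batch(batch)
stream_writer.close()
buf = sink.getvalue()                                   # was sink.get_result()
reader = pa.ipc.open_stream(buf)                        # was pa.StreamReader(source)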

Then we write the 1024 chunks that together make up the 1 GB dataset:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Since we wrote to RAM, we can get the whole stream as a single buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Because this data is in memory, reading Arrow record batches back is a zero-copy operation. I open a StreamReader, read the data into a pyarrow.Table, and then convert it to a pandas DataFrame:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

That is our 1 GB of data back (the extra 80 bytes over 2^30 bytes come from the DataFrame's index). All this is great, but you may have some questions. How fast is it? How does the stream chunk size affect the overall performance of reassembling the pandas DataFrame?

Streaming performance

As the stream chunk size shrinks, the cost of reconstructing a contiguous columnar pandas DataFrame grows because of cache-inefficient memory access patterns. There is also some overhead from manipulating the C++ container structures around the arrays and their memory buffers.

For 1 MB chunks, as above, on my laptop (quad-core Xeon E3-1505M) I get:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

That works out to an effective throughput of 7.75 GB/s (1 GB in roughly 0.129 s) to restore a 1 GB DataFrame from 1024 chunks of 1 MB. What happens if we use larger or smaller chunks? Here are the results:

[Figure: DataFrame reconstruction performance as a function of stream chunk size]

Performance drops off significantly from 256 KB down to 64 KB chunks. I was surprised that 1 MB chunks turned out faster than 16 MB ones; it would take a more thorough investigation to understand whether that is normally distributed or whether something else is going on.

In the current implementation of the format, the data is not compressed at all, so the in-memory size and the size "on the wire" are about the same. In the future, compression may become an additional option.
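Compression did land in later Arrow releases as per-buffer compression of record batch bodies. A minimal sketch with a modern pyarrow (1.0 or later; 'zstd' here is just one of the supported codecs):

import pyarrow as pa

batch = pa.RecordBatch.from_pylist([{'x': 1.0}, {'x': 2.0}])

# Compress record batch bodies on the wire ('lz4' also works).
options = pa.ipc.IpcWriteOptions(compression='zstd')
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema, options=options) as writer:
    writer.write_batch(batch)

# The reader picks the codec up from the message metadata.
table = pa.ipc.open_stream(sink.getvalue()).read_all()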

Conclusion

Streaming columnar data can be an efficient way to feed large datasets into columnar analytics tools like pandas in small chunks. Data services backed by row-oriented storage can transpose and stream small data chunks that are friendlier to your processor's L2 and L3 caches.

Full code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # Build a DataFrame of float64 columns totalling total_size bytes.
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    # Average wall-clock seconds per call over niter iterations.
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    # Read the entire stream back and convert it to a pandas DataFrame.
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    # pyarrow 0.x API names; see the modern equivalents noted above.
    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com
