A’ sruthadh dàta colbh le Apache Arrow

Chaidh eadar-theangachadh an artaigil ullachadh gu sònraichte airson oileanaich a’ chùrsa Einnseanair dàta.

A’ sruthadh dàta colbh le Apache Arrow

Anns na beagan sheachdainean a dh’ fhalbh tha againn Nong Li air a chur ris Saighead Apache cruth sruthadh dà-chànanach, a’ cur ris an fhòrmat faidhle ruigsinneachd air thuaiream / IPC a th’ ann. Tha buileachadh Java agus C ++ againn agus ceanglachan Python. San artaigil seo, mìnichidh mi mar a tha an cruth ag obair agus seallaidh mi mar as urrainn dhut trochur dàta fìor àrd a choileanadh airson pandathan DataFrame.

A’ sruthadh dàta colbh

Is e ceist chumanta a gheibh mi bho luchd-cleachdaidh Arrow a’ chosgais àrd a bhith ag imrich seataichean mòra de dhàta clàir bho chruth sreath no clàr gu cruth colbh. Airson stòran-dàta ioma-gigabyte, faodaidh gluasad thairis mar chuimhne no air diosc a bhith na obair uamhasach.

Gus dàta a shruthladh, ge bith an e sreath no colbh a th’ anns an dàta stòr, is e aon roghainn batches beaga de shreathan a chuir, gach fear le cruth colbh a-staigh.

Ann an Apache Arrow, canar baidse clàraidh ris a’ chruinneachadh de chlàran colbh in-chuimhne a tha a’ riochdachadh pìos bùird. Gus structar dàta singilte de bhòrd loidsigeach a riochdachadh, faodar grunn bhallachan de chlàran a chruinneachadh.

Anns an fhòrmat faidhle “ruigsinneachd air thuaiream” a th’ ann mar-thà, bidh sinn a’ clàradh meata-dàta anns a bheil sgeama a’ bhùird agus cruth a’ bhloca aig deireadh an fhaidhle, a’ toirt cothrom dhut taghadh gu math saor baidse de chlàran no colbh sam bith bho sheata dàta. Ann an cruth sruthadh, bidh sinn a’ cur sreath de theachdaireachdan: geàrr-chunntas, agus an uairsin aon no barrachd baidse de chlàran.

Tha na diofar chruthan a’ coimhead rudeigin mar seo:

A’ sruthadh dàta colbh le Apache Arrow

Dàta sruthadh ann am PyArrow: Iarrtas

Gus sealltainn dhut mar a tha seo ag obair, cruthaichidh mi eisimpleir de sheata-dàta a’ riochdachadh aon phìos srutha:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

A-nis, canaidh sinn gu bheil sinn airson 1 GB de dhàta a sgrìobhadh, anns a bheil pìosan de 1 MB gach fear, airson 1024 pìosan gu h-iomlan. Gus tòiseachadh, cruthaichidh sinn a’ chiad fhrèam dàta 1 MB le 16 colbhan:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

An uairsin tionndaidhidh mi iad gu pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

A-nis cruthaichidh mi sruth toraidh a sgrìobhas gu RAM agus a chruthaicheas StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

An uairsin sgrìobhaidh sinn 1024 pìosan, a thig gu crìch gu seata dàta 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Bhon a sgrìobh sinn gu RAM, gheibh sinn an t-sruth gu lèir ann an aon bhufair:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Leis gu bheil an dàta seo mar chuimhneachan, is e obair leth-bhreac neoni a th’ ann a bhith a’ leughadh batches de chlàran Arrow. Bidh mi a’ fosgladh StreamReader, leugh dàta a-steach pyarrow.Table, agus an uairsin tionndaidh iad gu DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tha seo uile, gu dearbh, math, ach is dòcha gu bheil ceistean agad. Dè cho luath sa tha seo a’ tachairt? Ciamar a bheir meud pìos buaidh air coileanadh pandathan DataFrame fhaighinn air ais?

Coileanadh Streaming

Mar a bhios meud a’ phìos sruthadh a’ dol sìos, tha a’ chosgais airson a bhith ag ath-chruthachadh DataFrame colbh ri thaobh ann am pandathan a’ dol am meud mar thoradh air pàtrain ruigsinneachd tasgadan neo-èifeachdach. Tha beagan cosgais ann cuideachd bho bhith ag obair le structaran dàta C ++ agus arrays agus na bufairean cuimhne aca.

Airson 1 MB, mar gu h-àrd, air an laptop agam (Quad-core Xeon E3-1505M) thionndaidh e a-mach:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Tha e a ’tionndadh a-mach gur e 7.75 GB / s an toradh èifeachdach airson DataFrame 1GB ath-nuadhachadh bho chnapan 1024 1MB. Dè thachras ma chleachdas sinn pìosan nas motha no nas lugha? Seo na toraidhean:

A’ sruthadh dàta colbh le Apache Arrow

Bidh coileanadh a’ tuiteam gu mòr bho 256K gu pìosan 64K. Chuir e iongnadh orm gun deach pìosan 1 MB a phròiseasadh nas luaithe na pìosan 16 MB. Is fhiach sgrùdadh nas mionaidiche a dhèanamh agus tuigsinn an e cuairteachadh àbhaisteach a tha seo no a bheil rudeigin eile ri chluich.

Ann am buileachadh an cruth gnàthach, chan eil an dàta air a dhlùthadh ann am prionnsapal, agus mar sin tha am meud ann an cuimhne agus “anns na uèirichean” timcheall air an aon rud. Anns an àm ri teachd, faodaidh teannachadh a bhith na roghainn a bharrachd.

An toradh

Faodaidh sruthadh dàta colbh a bhith na dhòigh èifeachdach air seataichean dàta mòra a bhiathadh a-steach do innealan anailis colbh leithid pandathan ann am pìosan beaga. Faodaidh seirbheisean dàta a bhios a’ cleachdadh stòradh stèidhichte air sreath pìosan beaga de dhàta a ghluasad agus a thar-chuir a tha nas freagarraiche airson caches L2 agus L3 do phròiseasar.

Còd slàn

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com

Cuir beachd ann