Sonraí colún a shruthú le Apache Arrow

Ullmhaíodh aistriúchán an ailt go sonrach do mhic léinn an chúrsa Innealtóir Sonraí.

Sonraí colún a shruthú le Apache Arrow

Le cúpla seachtain anuas ní mór dúinn Nán Li curtha le Saighead Apache formáid dhénártha sruthú, ag comhlánú an fhormáid comhaid rochtana randamach/IPC atá ann cheana féin. Tá feidhmithe Java agus C++ againn agus ceangail Python. San Airteagal seo, míneoidh mé conas a oibríonn an fhormáid agus taispeánfaidh mé conas is féidir leat tréchur sonraí an-ard a bhaint amach le haghaidh pandas DataFrame.

Sonraí Colúin Sruthaithe

Ceist choitianta a fhaighim ó úsáideoirí Arrow is ea an costas ard a bhaineann le tacair mhóra sonraí táblaí a aistriú ó fhormáid atá dírithe ar shraith nó taifead go formáid atá dírithe ar cholúin. I gcás tacair sonraí il-gigabyte, is féidir le trasuíomh sa chuimhne nó ar dhiosca a bheith ina thasc iomarcach.

Chun sonraí a shruthú, cibé acu is sraith nó colún iad na sonraí foinseacha, is é rogha amháin baisceanna beaga sraitheanna a sheoladh, agus leagan amach colún ar gach ceann díobh.

In Apache Arrow, tugtar baisc taifid ar bhailiúchán na n-eagair cholúin in-chuimhne a sheasann do smután boird. Chun struchtúr sonraí aonair de thábla loighciúil a léiriú, is féidir roinnt baisceanna taifead a bhailiú.

San fhormáid comhaid "rochtain randamach" atá ann cheana féin, déanaimid taifead ar mheiteashonraí ina bhfuil scéimre an tábla agus láithreacha blocála ag deireadh an chomhaid, rud a ligeann duit aon bhaisc taifead nó aon cholún a roghnú go han-saor ó thacar sonraí. I bhformáid sruthú, cuirimid sraith teachtaireachtaí: breac-chuntas, agus ansin baisc amháin nó níos mó de thaifid.

Breathnaíonn na formáidí éagsúla rud éigin mar seo:

Sonraí colún a shruthú le Apache Arrow

Sonraí Sruthaithe i PyArrow: Feidhmchlár

Chun a thaispeáint duit conas a oibríonn sé seo, cruthóidh mé tacar sonraí samplach a léiríonn smután aon srutha:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Anois, a ligean ar a rá ba mhaith linn a scríobh 1 GB de shonraí, comhdhéanta de smután de 1 MB an ceann, le haghaidh iomlán de 1024 smután. Chun tús a chur leis, cruthaimis an chéad fhráma sonraí 1 MB le 16 cholún:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Ansin mé iad a thiontú go pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Anois cruthóidh mé sruth aschuir a scríobhfaidh chuig RAM agus a chruthóidh StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Ansin scríobhfaimid 1024 smután, arb ionann iad ar deireadh thiar agus tacar sonraí 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Ó scríobhamar chuig RAM, is féidir linn an sruth iomlán a fháil in aon mhaolán amháin:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Ós rud é go bhfuil na sonraí seo i gcuimhne, is oibríocht chóip nialasach é baisceanna de thaifid Arrow a léamh. Osclaím StreamReader, léigh sonraí isteach pyarrow.Table, agus ansin iad a thiontú go DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tá sé seo go léir, ar ndóigh, go maith, ach b'fhéidir go mbeadh ceisteanna agat. Cé chomh tapa agus a tharlaíonn sé seo? Conas a théann méid smután i bhfeidhm ar fheidhmíocht aisghabhála pandas DataFrame?

Feidhmíocht Sruthaithe

De réir mar a laghdaítear an smután sruthú, méadaíonn an costas a bhaineann le DataFrame colúnach tadhlach a athchruthú i pandas mar gheall ar phatrúin neamhéifeachtúla rochtana taisce. Tá roinnt forchostais i gceist freisin ó bheith ag obair le struchtúir agus eagair sonraí C++ agus a gcuid maoláin chuimhne.

Le haghaidh 1 MB, mar atá thuas, ar mo ríomhaire glúine (Quad-core Xeon E3-1505M) casadh sé amach:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Tarlaíonn sé gurb é 7.75 GB/s an tréchur éifeachtach chun DataFrame 1GB a aischur ó smután 1024 1MB. Cad a tharlaíonn má úsáidimid smután níos mó nó níos lú? Seo iad na torthaí:

Sonraí colún a shruthú le Apache Arrow

Titeann feidhmíocht go suntasach ó 256K go 64K. Bhí ionadh orm gur próiseáladh smután 1 MB níos tapúla ná smután 16 MB. Is fiú staidéar níos críochnúla a dhéanamh agus tuiscint a fháil ar cibé an gnáthdháileadh é seo nó an bhfuil rud éigin eile i gceist.

I gcur i bhfeidhm reatha na formáide, níl na sonraí comhbhrúite i bprionsabal, mar sin tá an méid sa chuimhne agus "sna sreanga" thart ar an gcéanna. Sa todhchaí, d'fhéadfadh comhbhrú a bheith ina rogha breise.

Iomlán na

Is féidir le sruthú sonraí colúnacha a bheith ina bhealach éifeachtach chun tacair mhóra sonraí a chur in uirlisí anailíse colúnacha amhail pandas i smutáin bheaga. Is féidir le seirbhísí sonraí a úsáideann stóráil atá dírithe ar rónna smután beag sonraí a aistriú agus a thrasuíomh atá níos áisiúla do thaisce L2 agus L3 do phróiseálaithe.

Cód iomlán

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Foinse: will.com

Add a comment