Pag-stream ng data ng column gamit ang Apache Arrow

Ang pagsasalin ng artikulo ay partikular na inihanda para sa mga mag-aaral ng kurso Data Engineer.

Pag-stream ng data ng column gamit ang Apache Arrow

Sa nakalipas na ilang linggo, kami Nong Li idinagdag sa Apache Arrow binary stream format, na nagdaragdag sa mayroon nang random na pag-access/IPC na format ng file. Mayroon kaming Java at C++ na mga pagpapatupad at Python binding. Sa artikulong ito, ipapaliwanag ko kung paano gumagana ang format at ipapakita kung paano mo makakamit ang napakataas na throughput ng data para sa isang pandas DataFrame.

Pag-stream ng data ng column

Ang isang karaniwang tanong na nakukuha ko mula sa mga user ng Arrow ay ang mataas na halaga ng paglipat ng malalaking tabular na dataset mula sa isang row- o record-oriented na format patungo sa isang format ng column. Para sa mga multi-gigabyte na dataset, ang paglipat sa memorya o sa disk ay maaaring maging napakalaki.

Para sa streaming ng data, row o column man ang source data, ang isang opsyon ay magpadala ng maliliit na batch ng mga row, bawat isa ay naglalaman ng layout ng column sa loob.

Sa Apache Arrow, ang isang koleksyon ng mga in-memory na columnar array na kumakatawan sa isang table chunk ay tinatawag na record batch. Upang kumatawan sa isang solong istraktura ng data ng isang lohikal na talahanayan, maaari kang mangolekta ng ilang mga pakete ng mga tala.

Sa umiiral na "random access" na format ng file, nagsusulat kami ng metadata na naglalaman ng schema ng talahanayan at layout ng block sa dulo ng file, na nagbibigay-daan sa iyong pumili ng anumang batch ng mga tala o anumang column mula sa dataset nang napakamura. Sa isang format ng streaming, nagpapadala kami ng isang serye ng mga mensahe: isang schema, at pagkatapos ay isa o higit pang mga batch ng mga tala.

Ang iba't ibang mga format ay katulad ng larawang ito:

Pag-stream ng data ng column gamit ang Apache Arrow

Pag-stream ng data sa PyArrow: application

Para ipakita sa iyo kung paano ito gumagana, gagawa ako ng halimbawang dataset na kumakatawan sa isang stream chunk:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Ngayon, ipagpalagay na gusto naming magsulat ng 1 GB ng data, na binubuo ng 1 MB chunks bawat isa, para sa kabuuang 1024 chunks. Una, gawin natin ang unang 1MB data frame na may 16 na column:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Pagkatapos ay kino-convert ko sila sa pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Ngayon gagawa ako ng output stream na magsusulat sa RAM at lilikha StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Pagkatapos ay magsusulat kami ng 1024 na chunks, na sa kalaunan ay bubuo ng 1GB na dataset:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Dahil sumulat kami sa RAM, makukuha namin ang buong stream sa isang buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Dahil ang data na ito ay nasa memorya, ang pagbabasa ng mga batch ng mga tala ng Arrow ay nakuha sa pamamagitan ng isang zero-copy na operasyon. Binuksan ko ang StreamReader, nagbasa ng data pyarrow.Tableat pagkatapos ay i-convert ang mga ito sa DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Ang lahat ng ito, siyempre, ay mabuti, ngunit maaaring mayroon kang mga katanungan. Gaano kabilis ito mangyari? Paano nakakaapekto ang laki ng tipak ng pandas DataFrame retrieval performance?

Pagganap ng streaming

Habang bumababa ang sukat ng streaming chunk, ang halaga ng muling pagtatayo ng magkadikit na columnar na DataFrame sa mga pandas ay tumataas dahil sa hindi mahusay na mga scheme ng pag-access sa cache. Mayroon ding ilang overhead mula sa pagtatrabaho sa mga istruktura at array ng data ng C++ at mga buffer ng kanilang memorya.

Para sa 1 MB tulad ng nasa itaas, sa aking laptop (Quad-core Xeon E3-1505M) lumalabas:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Lumalabas na ang epektibong throughput ay 7.75 Gb / s para sa pagpapanumbalik ng 1 GB DataFrame mula sa 1024 1 MB na mga chunks. Ano ang mangyayari kung gumamit tayo ng mas malaki o mas maliliit na tipak? Narito ang mga resultang makukuha mo:

Pag-stream ng data ng column gamit ang Apache Arrow

Malaki ang pagbaba ng performance mula 256K hanggang 64K na chunks. Nagulat ako na ang 1MB chunks ay naproseso nang mas mabilis kaysa sa 16MB chunks. Ito ay nagkakahalaga ng paggawa ng isang mas masusing pag-aaral at maunawaan kung ito ay isang normal na pamamahagi o iba pa ang nasasangkot.

Sa kasalukuyang pagpapatupad ng format, ang data ay hindi naka-compress sa prinsipyo, kaya ang laki sa memorya at "sa wire" ay humigit-kumulang pareho. Maaaring maging opsyon ang compression sa hinaharap.

Kabuuan

Ang pag-stream ng data ng column ay maaaring maging isang mahusay na paraan upang ilipat ang malalaking dataset sa column analytics tool tulad ng mga pandas sa maliliit na chunks. Ang mga serbisyo ng data na gumagamit ng row-oriented na storage ay maaaring maglipat at maglipat ng maliliit na chunks ng data na mas maginhawa para sa L2 at L3 cache ng iyong processor.

Buong code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Pinagmulan: www.habr.com

Magdagdag ng komento