Pag-stream sa datos sa kolum gamit ang Apache Arrow

Ang paghubad sa artikulo espesipikong giandam alang sa mga estudyante sa kurso Data Engineer.

Pag-stream sa datos sa kolum gamit ang Apache Arrow

Sa miaging mga semana naa mi Nong Li gidugang sa Apache Arrow binary streaming format, nga nagsuporta sa kasamtangan nga random access/IPC file format. Kami adunay Java ug C++ nga mga pagpatuman ug Python bindings. Sa kini nga artikulo, akong ipasabut kung giunsa ang pormat molihok ug ipakita kung giunsa nimo makab-ot ang labi ka taas nga data throughput alang sa usa ka pandas DataFrame.

Streaming Column Data

Usa ka kasagarang pangutana nga akong nadawat gikan sa mga tiggamit sa Arrow mao ang taas nga gasto sa pagbalhin sa dagkong mga set sa tabular data gikan sa usa ka row- o record-oriented nga format ngadto sa column-oriented nga format. Para sa multi-gigabyte datasets, ang transposing sa memorya o sa disk mahimong usa ka bug-at nga buluhaton.

Sa pag-stream sa datos, bisan ang tinubdan nga datos usa ka laray o kolum, usa ka kapilian mao ang pagpadala og gagmay nga mga hugpong sa mga laray, ang matag usa adunay sulod nga kolumnar nga layout.

Sa Apache Arrow, ang koleksyon sa in-memory column arrays nga nagrepresentar sa table chunk gitawag nga record batch. Aron magrepresentar sa usa ka istruktura sa datos sa usa ka lohikal nga lamesa, daghang mga hugpong sa mga rekord ang mahimong kolektahon.

Sa naglungtad nga "random access" nga format sa file, among girekord ang metadata nga adunay sulud nga laraw sa lamesa ug gibabagan ang mga lokasyon sa katapusan sa file, nga gitugotan ka nga labi ka barato nga pagpili sa bisan unsang hugpong sa mga rekord o bisan unsang kolum gikan sa usa ka set sa datos. Sa streaming nga format, nagpadala kami og sunod-sunod nga mga mensahe: usa ka outline, ug dayon usa o daghan pa nga batch sa mga rekord.

Ang lain-laing mga format ingon niini:

Pag-stream sa datos sa kolum gamit ang Apache Arrow

Streaming Data sa PyArrow: Aplikasyon

Aron ipakita kanimo kung giunsa kini molihok, maghimo ako usa ka pananglitan nga dataset nga nagrepresentar sa usa ka tipik sa sapa:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Karon, ingnon ta nga gusto namong isulat ang 1 GB nga datos, nga gilangkuban sa mga tipik nga 1 MB matag usa, sa kinatibuk-an nga 1024 nga mga tipak. Sa pagsugod, maghimo kita sa unang 1 MB data frame nga adunay 16 ka kolum:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Unya gikabig nako sila sa pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Karon maghimo ko og output stream nga mosulat sa RAM ug maghimo StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Dayon magsulat kami og 1024 ka mga tipak, nga sa katapusan mokabat sa 1GB data set:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Tungod kay nagsulat kami sa RAM, makuha namon ang tibuuk nga sapa sa usa ka buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Tungod kay kini nga datos anaa sa panumduman, ang pagbasa sa mga hugpong sa mga rekord sa Arrow usa ka zero-copy nga operasyon. Giablihan nako ang StreamReader, gibasa ang datos pyarrow.Table, ug dayon i-convert sila ngadto sa DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Siyempre, kining tanan maayo, apan tingali adunay mga pangutana. Unsa kadali kini mahitabo? Sa unsang paagi ang gidak-on sa tipak makaapekto sa pasundayag sa pagkuha sa DataFrame sa pandas?

Pagganap sa Pag-stream

Samtang nagkunhod ang gidak-on sa streaming chunk, ang gasto sa pagtukod pag-usab sa usa ka magkadugtong nga kolumnar nga DataFrame sa mga pandas motaas tungod sa dili maayo nga mga pattern sa pag-access sa cache. Adunay usab pipila ka overhead gikan sa pagtrabaho sa C++ nga mga istruktura sa datos ug mga arrays ug ang ilang mga buffer sa panumduman.

Alang sa 1 MB, sama sa ibabaw, sa akong laptop (Quad-core Xeon E3-1505M) kini nahimo:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Kini turns nga ang epektibo nga throughput mao ang 7.75 GB/s sa pagpasig-uli sa usa ka 1GB DataFrame gikan sa 1024 1MB chunks. Unsa ang mahitabo kon mogamit kita og dagko o gagmay nga mga tipik? Mao kini ang mga resulta:

Pag-stream sa datos sa kolum gamit ang Apache Arrow

Ang performance mikunhod pag-ayo gikan sa 256K ngadto sa 64K nga mga tipak. Natingala ko nga ang 1 MB chunks kay mas paspas kay sa 16 MB chunks. Angayan nga maghimo ug mas bug-os nga pagtuon ug pagsabot kung normal ba kini nga pag-apod-apod o kung adunay lain pa nga gidula.

Sa kasamtangan nga pagpatuman sa format, ang datos wala gi-compress sa prinsipyo, mao nga ang gidak-on sa memorya ug "sa mga alambre" halos parehas. Sa umaabot, ang compression mahimong usa ka dugang nga kapilian.

Ang resulta

Ang pag-streaming sa columnar data mahimong epektibong paagi sa pagpakaon sa dagkong mga set sa datos ngadto sa columnar analytics nga mga himan sama sa mga pandas sa gagmay nga mga tipak. Ang mga serbisyo sa datos nga naggamit sa row-oriented storage mahimong magbalhin ug mobalhin sa gagmay nga mga tipik sa datos nga mas sayon ​​alang sa L2 ug L3 cache sa imong processor.

Bug-os nga kodigo

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com

Idugang sa usa ka comment