Streaming Columnar Data with Apache Arrow

Articuli translatio specialiter pro alumnis curriculi praeparata est Data Engineer.

Streaming Columnar Data with Apache Arrow

Praeteritum paucos hebdomades habemus Nong Li additae sunt Apache Arrow forma binaria effusis, accessum temere exsistentem/IPC forma fasciculi complens. Habemus Java et C++ opera et ligamenta Python. In hoc articulo, quomodo opera format et monstrare possis quomodo altissimas notitias consequi possis, propter pandas DataFrame.

Streaming Column Data

Communis quaestio ab usoribus Sagittae accipio est summus sumptus migrandi magnas tabulas notitias tabulares ex forma actuaria seu forma electronica ad formas ordinandas. Nam multi-gigabytae datastae, in memoriam transponentes vel in orbe, munus esse maximum possunt.

Ad data rivulum, sive fons notitiae sive columnae sit ordo, una optio mittenda est batchellas versuum, in quovis schedula columnaria intus continetur.

In Apache Sagitta, collectio columnae memoriae in- vestit tabulam repraesentans FRUSTUM dicitur record batch. Ad unicam notae structuram tabulae logicae repraesentandam, plures fasciculi monumentorum colligi possunt.

In exsistens "temere accessum" forma lima, metadata memoramus continens schema mensae et loca clausula in fine tabellae, permittens te perquam vilie aliquem batchrum deligere monumentorum vel columnam quamlibet e notitia certa. In profluente forma, seriem nuntiorum mittimus: adumbratum, ac deinde unum vel plures batches monumentorum.

Formae diversae aliquid simile hoc spectant:

Streaming Columnar Data with Apache Arrow

Data in PyArrow streaming: Application

Ut tibi haec opera doceam, dabo exemplum dataset ut unicum flumen FRUSTUM;

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Nunc dicamus nos velle 1 GB notitiarum scribere, e chunkis 1 MB singulis constans, ad summam 1024 chunks. Incipere, creare primam 1 MB datam machinam cum 16 columnis:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Et convertam eos ad pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Nunc rivum output creabo qui RAM scribet et creabo StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Tunc scribemus 1024 chunks, quae tandem ad 1GB notitiarum copiarum summam componunt:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Cum ad RAM scripserimus, totum rivum in uno quiddam obtinere possumus;

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Cum haec notitia in memoria sit, batches tabularum sagittariorum legendi nulla est operatio exemplaris. Aperio StreamReader, data in lege pyarrow.Tableet converte eos DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Haec quidem bona sunt, sed habeas quaestiones. Quam cito hoc fit? Quomodo FRUSTUM amplitudo afficit effectum retrievalis pandas DataFrame?

Vestibulum euismod

Ut profluentia FRUSTUM decrescit magnitudo, sumptus restaurandi contiguum DataFrame columnaris in pandas auget ob inhabilis ad exemplaria accessus cache. Est etiam aliquid supra caput ab operibus C++ datae structurae et vestit ac memoria eorum buffers.

Nam 1 MB, ut supra, in laptop meo (Quad-core Xeon E3-1505M) evenit:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Evenit ut efficax throughput sit 7.75 GB/s ut 1GB DataFrame ex 1024 1MB chunks restituat. Quid fit, si utamur majoribus vel minoribus chunkis? Hi eventus sunt:

Streaming Columnar Data with Apache Arrow

Euismod guttae significanter a 256K ad 64K chunks. Miratus sum quod 1 MB chunks velociores quam 16 MB chunks discursum esse. Operaepretium est pervestigationis et intelligentiae pertractandae num haec distributio normalis sit an aliquid aliud in ludo.

In current exsequenda forma, notitia in principio non comprimitur, ergo magnitudo in memoria et "in filis" eadem fere est. In futurum, compressio optionis additae fieri potest.

exitum

Data columnaria streaming efficax esse potest modus ad magnas notitias pascendas in instrumenta analyticorum columnarum ponit sicut pandas in parvis chunkis. Data officia quae reposita actuaria utuntur, possunt transferre et transponere parvas rerum notitias, quae commodiores sunt pro cella processus tui L2 et L3.

Plena code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com