Articuli translatio specialiter pro alumnis curriculi praeparata est
Praeteritum paucos hebdomades habemus
Streaming Column Data
Communis quaestio ab usoribus Sagittae accipio est summus sumptus migrandi magnas tabulas notitias tabulares ex forma actuaria seu forma electronica ad formas ordinandas. Nam multi-gigabytae datastae, in memoriam transponentes vel in orbe, munus esse maximum possunt.
Ad data rivulum, sive fons notitiae sive columnae sit ordo, una optio mittenda est batchellas versuum, in quovis schedula columnaria intus continetur.
In Apache Sagitta, collectio columnae memoriae in- vestit tabulam repraesentans FRUSTUM dicitur record batch. Ad unicam notae structuram tabulae logicae repraesentandam, plures fasciculi monumentorum colligi possunt.
In exsistens "temere accessum" forma lima, metadata memoramus continens schema mensae et loca clausula in fine tabellae, permittens te perquam vilie aliquem batchrum deligere monumentorum vel columnam quamlibet e notitia certa. In profluente forma, seriem nuntiorum mittimus: adumbratum, ac deinde unum vel plures batches monumentorum.
Formae diversae aliquid simile hoc spectant:
Data in PyArrow streaming: Application
Ut tibi haec opera doceam, dabo exemplum dataset ut unicum flumen FRUSTUM;
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Nunc dicamus nos velle 1 GB notitiarum scribere, e chunkis 1 MB singulis constans, ad summam 1024 chunks. Incipere, creare primam 1 MB datam machinam cum 16 columnis:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Et convertam eos ad pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Nunc rivum output creabo qui RAM scribet et creabo StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Tunc scribemus 1024 chunks, quae tandem ad 1GB notitiarum copiarum summam componunt:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Cum ad RAM scripserimus, totum rivum in uno quiddam obtinere possumus;
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Cum haec notitia in memoria sit, batches tabularum sagittariorum legendi nulla est operatio exemplaris. Aperio StreamReader, data in lege pyarrow.Table
et converte eos DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Haec quidem bona sunt, sed habeas quaestiones. Quam cito hoc fit? Quomodo FRUSTUM amplitudo afficit effectum retrievalis pandas DataFrame?
Vestibulum euismod
Ut profluentia FRUSTUM decrescit magnitudo, sumptus restaurandi contiguum DataFrame columnaris in pandas auget ob inhabilis ad exemplaria accessus cache. Est etiam aliquid supra caput ab operibus C++ datae structurae et vestit ac memoria eorum buffers.
Nam 1 MB, ut supra, in laptop meo (Quad-core Xeon E3-1505M) evenit:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Evenit ut efficax throughput sit 7.75 GB/s ut 1GB DataFrame ex 1024 1MB chunks restituat. Quid fit, si utamur majoribus vel minoribus chunkis? Hi eventus sunt:
Euismod guttae significanter a 256K ad 64K chunks. Miratus sum quod 1 MB chunks velociores quam 16 MB chunks discursum esse. Operaepretium est pervestigationis et intelligentiae pertractandae num haec distributio normalis sit an aliquid aliud in ludo.
In current exsequenda forma, notitia in principio non comprimitur, ergo magnitudo in memoria et "in filis" eadem fere est. In futurum, compressio optionis additae fieri potest.
exitum
Data columnaria streaming efficax esse potest modus ad magnas notitias pascendas in instrumenta analyticorum columnarum ponit sicut pandas in parvis chunkis. Data officia quae reposita actuaria utuntur, possunt transferre et transponere parvas rerum notitias, quae commodiores sunt pro cella processus tui L2 et L3.
Plena code
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Source: www.habr.com