Streaming Columnar Data
A question I often get from Arrow users concerns the high cost of transposing large tabular datasets from a row-oriented format to a column-oriented one. For multi-gigabyte datasets, transposing in memory or on disk can be a major undertaking.
When moving data, whether the source is row- or column-oriented, one option is to send small batches of rows, each with a columnar layout internally.
In Apache Arrow, an in-memory collection of columnar arrays representing a table chunk is called a record batch. Several record batches can be collected to represent a single logical table data structure.
In the existing "random access" file format, we write metadata containing the table schema and the block locations at the end of the file, which lets you cheaply select any record batch or any column from the dataset. In the streaming format, we send a sequence of messages: the schema, then one or more record batches.
The two formats look like this:
Streaming Data in PyArrow: Usage
To show how this works, I'll generate an example dataset representing a single stream chunk:
import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })
Now, suppose we want to write 1 GB of data in chunks of 1 MB each, for a total of 1024 chunks. First, let's create the first 1 MB data frame with sixteen columns:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Then I convert it to a pyarrow.RecordBatch:
batch = pa.RecordBatch.from_pandas(df)
Now I create an output stream that writes to RAM, and a StreamWriter on top of it:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Then we write 1024 chunks, which together make up the 1 GB dataset:
for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)
Since we wrote to RAM, we can get the entire stream as a single buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Since this data is in memory, reading back Arrow record batches is a zero-copy operation. I open a StreamReader, read the data into a pyarrow.Table, and then convert it to a pandas DataFrame:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
All of this is great, of course, but you may have questions. How fast is it? How does the stream chunk size affect the cost of reconstructing the pandas DataFrame?
Streaming Performance
As the streaming chunk size shrinks, the cost of reconstructing a contiguous columnar DataFrame in pandas rises, due to inefficient cache access patterns. There is also some overhead from working with the C++ data structures and arrays and their memory buffers.
For 1 MB chunks, as above, on my laptop (quad-core Xeon E3-1505M) this gives:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
That works out to an effective throughput of 7.75 GB/s for reconstructing a 1 GB DataFrame from 1024 1-MB chunks. What happens with larger or smaller chunks? Here are the results:
Performance drops significantly from 256K down to 64K chunks. I was surprised that 1 MB chunks were processed faster than 16 MB chunks. It would be worth a more thorough study to understand whether this is a consistent effect or whether something else is at play.
In the current implementation of the format, the data is not compressed at all, so the in-memory and on-the-wire sizes are roughly the same. Compression may become an option in the future.
Conclusion
Streaming columnar data can be an efficient way to feed large datasets into columnar analytics tools like pandas in small chunks. Data services backed by row-oriented storage can transpose and stream small chunks of data that fit more comfortably in your processor's L2 and L3 caches.
Full code
import time

import numpy as np
import pandas as pd
import pyarrow as pa


def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })


KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16


def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter


def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()


NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)
Source: www.habr.com