Streaming Column Data

Over the past few weeks, we have been adding a streaming binary format to Apache Arrow, complementing the existing random access file format.
A common question I get from Arrow users is about the high cost of moving large tabular datasets from a row- or record-oriented layout to a column-oriented one. For a multi-gigabyte dataset, transposing in memory or on disk can be prohibitively expensive.
For streaming data, whether the source data has a row or column layout, one option is to send small batches of rows, each of which internally uses a columnar layout.
In Apache Arrow, an in-memory columnar collection representing a chunk of a table is called a record batch. Several record batches can be collected to represent a single logical table data structure.
In the existing "random access" format, we write metadata containing the table schema and the block locations at the end of the file, which lets you select any record batch or any column from the dataset very cheaply. In the streaming format, we send a series of messages: the schema, and then one or more record batches.
The difference between the two formats is that the random access file appends a metadata footer with the block locations, so blocks can be looked up directly, while the stream is simply the schema message followed by the record batch messages.
Streaming Data with PyArrow: Usage
To show how this works, I'll generate an example dataset representing a single stream chunk:
import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })
Now suppose we want to write 1 GB of data made up of 1 MB chunks, for a total of 1024 chunks. First, let's create the first 1 MB data frame with 16 columns:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
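As a sanity check, the sizing arithmetic works out as follows: 1 MB split across 16 float64 columns gives 2^20 / 16 / 8 = 8192 rows per chunk, and exactly 1 MB of column data:

```python
import numpy as np
import pandas as pd

# Same helper as above: size the frame so its column data
# totals `total_size` bytes.
def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

MEGABYTE = 1 << 20
df = generate_data(MEGABYTE, 16)

print(len(df))                             # rows per 1 MB chunk
print(df.memory_usage(index=False).sum())  # bytes of column data
```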
Then I convert it to a pyarrow.RecordBatch:
batch = pa.RecordBatch.from_pandas(df)
Next I'll create an output stream that writes to RAM and create a StreamWriter for it:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Then we write the 1024 chunks, which together will make up the 1 GB dataset:
for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)
Since we wrote to RAM, we can get the entire stream as a single buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Since this data sits in memory, reading back Arrow record batches is a zero-copy operation. I open a StreamReader, read the data from it into a pyarrow.Table, and then convert it to a pandas DataFrame:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
This all works, but you may have some questions. How fast is it? How does the chunk size affect the performance of reconstructing the pandas DataFrame?
Streaming performance
As the chunk size shrinks, the cost of assembling a contiguous pandas DataFrame increases, because of less cache-efficient memory access patterns. There is also some overhead from manipulating the C++ data structures and their arrays and memory buffers.
For 1 MB chunks, as above, on my laptop (quad-core Xeon E3-1505M) this gives:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
It turns out that the effective throughput is 7.75 GB/s when reconstructing a 1 GB DataFrame from 1024 chunks of 1 MB each. What happens if we use larger or smaller chunks? Here are the results:
Performance drops off significantly from 256K down to 64K chunks. I was surprised that 1 MB chunks were processed faster than 16 MB chunks; it would be worth running a more thorough study to understand whether this is a normal distribution or whether something else is going on.
In the current implementation of the format the data is not compressed at all, so the in-memory and on-the-wire sizes are roughly the same. In the future, compression may become an additional option.
Conclusion
Streaming columnar data can be an efficient way to feed large datasets to columnar analytics tools such as pandas in small chunks. Data services backed by row-oriented storage can transpose and stream small chunks of data that fit comfortably in your processor's L2 and L3 caches.
Complete code
import time

import numpy as np
import pandas as pd
import pyarrow as pa


def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE,
              MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source),
                         NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)
Source: www.habr.com