Wergera gotarê bi taybetî ji bo xwendekarên qursê hatiye amadekirin
Di van çend hefteyên borî de em hene
Streaming Daneyên Stûnê
Pirseke hevpar a ku ez ji bikarhênerên Arrow distînim lêçûna zêde ya koçkirina komên mezin ên daneyên tabloyê ji formatek rêz-an-an-koordar berbi formatek stûn-oriented e. Ji bo danûstendinên pir-gigabyte, veguheztina di bîranînê de an li ser dîskê dikare bibe karekî giran.
Ji bo guheztina daneyan, ka daneya çavkaniyê rêz be an stûn be, vebijarkek ev e ku hûn komên piçûk ên rêzan bişînin, ku her yek di hundurê de xêzek stûnek heye.
Di Apache Arrow de, berhevoka rêzikên stûnên nav-bîrê yên ku perçeyek tabloyê temsîl dikin jê re komek tomar tê gotin. Ji bo temsîlkirina avahiyek daneya yekane ya tabloyek mentiqî, çend beşên tomar dikarin werin berhev kirin.
Di forma pelê ya "gihîştina tesadufî" ya heyî de, em metadaneyên ku şemaya tabloyê vedihewîne tomar dikin û cîhên li dawiya pelê bloke dikin, dihêlin hûn bi erzanî ji komek tomar an stûnek ji komek daneyê hilbijêrin. Di formata weşanê de, em rêzek peyaman dişînin: xêzek, û dûv re yek an jî çend komek tomar.
Formatên cûda mîna vê xuya dikin:
Streaming Daneyên li PyArrow: Serlêdan
Ji bo ku nîşanî we bidim ka ev çawa dixebite, ez ê danehevek mînakek ku perçeyek yekane temsîl dike biafirînim:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Naha, em bibêjin ku em dixwazin 1 GB daneyan binivîsin, ku her yek ji perçeyên 1 MB pêk tê, bi tevahî 1024 parçe. Ji bo destpêkirinê, bila em çarçoweya daneya yekem a 1 MB bi 16 stûnan biafirînin:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Paşê ez wan vediguherînim pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Naha ez ê diherikek encamekê biafirînim ku dê li RAM-ê binivîsîne û biafirîne StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Dûv re em ê 1024 perçeyan binivîsin, ku dê di dawiyê de bigihîje komek daneya 1 GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Ji ber ku me ji RAM-ê re nivîsand, em dikarin tevahiyê di yek tamponê de bistînin:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Ji ber ku ev dane di bîranînê de ye, xwendina komikên tomarên Arrow operasyonek sifir-kopî ye. Ez StreamReader vedikim, daneyan tê de dixwînim pyarrow.Table
, û paşê wan veguherînin DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Hemî ev, bê guman, baş e, lê dibe ku pirsên we hebin. Ev çiqas zû dibe? Mezinahiya perçeyê çawa bandorê li ser performansa hilanîna DataFrame ya panda dike?
Performansa Streaming
Her ku mezinahiya perçeya tîrêjê kêm dibe, lêçûna ji nû veavakirina DataFrame-ya stûnek a hevgirtî di pandayan de ji ber qalibên gihîştina cache-ya bêbandor zêde dibe. Di heman demê de ji xebata bi strukturên daneya C++ û array û tamponên bîranîna wan re hin serf jî heye.
Ji bo 1 MB, wekî li jor, li ser laptopa min (Quad-core Xeon E3-1505M) derdikeve holê:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Derdikeve holê ku rêjeya bandorker 7.75 GB / s e ku 1 GB DataFrame ji 1024 perçeyên 1 MB vegerîne. Ger em perçeyên mezin an piçûktir bikar bînin çi dibe? Ev encam ev in:
Performansa bi girîngî ji 256K berbi 64K perçeyan dadikeve. Ez şaş bûm ku perçeyên 1 MB ji perçeyên 16 MB zûtir têne pêvajoyê kirin. Hêja ye ku lêkolînek berfirehtir were kirin û têgihîştin ka gelo ev belavkirinek normal e an tiştek din di lîstikê de heye.
Di pêkanîna heyî ya formatê de, dane di prensîbê de nayên qewirandin, ji ber vê yekê mezinahiya bîranîn û "di têlan" de bi qasî hev e. Di pêşerojê de, compression dibe ku bibe vebijarkek din.
Encam
Veguhastina daneya stûnî dikare rêyek bi bandor be da ku berhevokên daneya mezin di nav amûrên analîtîk ên stûnî de mîna pandayan di perçeyên piçûk de bihêle. Karûbarên daneyê yên ku hilanîna rêz-oriented bikar tînin dikarin perçeyên piçûk ên daneya ku ji bo kaşeyên L2 û L3 yên pêvajoya we rehettir in veguhezînin û veguhezînin.
Koda tevahî
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Source: www.habr.com