Raksta tulkojums tika sagatavots speciāli kursa studentiem
Pēdējo nedēļu laikā mēs
Straumēšanas kolonnas dati
Izplatīts jautājums, ko saņemu no Arrow lietotājiem, ir lielas tabulas datu kopu migrēšanas izmaksas no rindu vai ierakstu formāta uz kolonnu formātu. Vairāku gigabaitu datu kopām transponēšana atmiņā vai diskā var būt milzīga.
Datu straumēšanai neatkarīgi no tā, vai avota dati ir rinda vai kolonna, viena iespēja ir nosūtīt nelielas rindu partijas, katrai no kurām ir iekšējais kolonnu izkārtojums.
Programmā Apache Arrow atmiņā esošo kolonnu masīvu kolekciju, kas attēlo tabulas daļu, sauc par ierakstu partiju. Lai attēlotu vienu loģiskās tabulas datu struktūru, varat apkopot vairākas ierakstu kopas.
Esošajā "brīvpiekļuves" faila formātā mēs faila beigās rakstām metadatus, kas satur tabulas shēmu un bloku izkārtojumu, kas ļauj ļoti lēti atlasīt jebkuru ierakstu partiju vai jebkuru kolonnu no datu kopas. Straumēšanas formātā mēs nosūtām virkni ziņojumu: shēmu un pēc tam vienu vai vairākas ierakstu grupas.
Dažādie formāti izskatās apmēram šādi:
Datu straumēšana PyArrow: lietojumprogramma
Lai parādītu, kā tas darbojas, es izveidošu datu kopas piemēru, kas attēlo vienu straumes daļu:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Tagad pieņemsim, ka mēs vēlamies ierakstīt 1 GB datu, kas sastāv no 1 MB gabaliem, kopā 1024 datus. Vispirms izveidosim pirmo 1 MB datu rāmi ar 16 kolonnām:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Tad es tos pārvēršu par pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Tagad es izveidošu izvades straumi, kas rakstīs RAM un izveidos StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Pēc tam mēs ierakstīsim 1024 gabalus, kas galu galā veidos 1 GB datu kopu:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Tā kā mēs rakstījām RAM, mēs varam iegūt visu straumi vienā buferī:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Tā kā šie dati atrodas atmiņā, Arrow ierakstu partiju nolasīšana tiek iegūta ar nulles kopiju. Es atveru StreamReader, nolasu datus pyarrow.Table
un pēc tam pārvērst tos par DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Tas viss, protams, ir labi, taču jums var būt jautājumi. Cik ātri tas notiek? Kā gabala lielums ietekmē pandas DataFrame izguves veiktspēju?
Straumēšanas veiktspēja
Samazinoties straumēšanas gabala lielumam, neefektīvu kešatmiņas piekļuves shēmu dēļ palielinās blakus esošā kolonnu datu rāmja atjaunošanas izmaksas pandās. Darbs ar C++ datu struktūrām un masīviem un to atmiņas buferiem ir arī nedaudz papildu.
Par 1 MB, kā norādīts iepriekš, manā klēpjdatorā (četrkodolu Xeon E3-1505M) izrādās:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Izrādās, ka efektīva caurlaidspēja ir 7.75 Gb / s, lai atjaunotu 1 GB DataFrame no 1024 1 MB gabaliem. Kas notiek, ja izmantosim lielākus vai mazākus gabalus? Šeit ir iegūtie rezultāti:
Veiktspēja ievērojami samazinās no 256 64 līdz 1 16 gabaliem. Es biju pārsteigts, ka XNUMX MB gabali tika apstrādāti ātrāk nekā XNUMX MB gabali. Ir vērts veikt rūpīgāku izpēti un saprast, vai tas ir normāls sadalījums, vai tas ir saistīts ar kaut ko citu.
Pašreizējā formāta ieviešanā dati principā netiek saspiesti, tāpēc izmērs atmiņā un "uz vada" ir aptuveni vienāds. Saspiešana var kļūt par iespēju nākotnē.
Kopsavilkums
Kolonnu datu straumēšana var būt efektīvs veids, kā pārsūtīt lielas datu kopas uz kolonnu analīzes rīkiem, piemēram, pandām mazos gabalos. Datu pakalpojumi, kas izmanto uz rindām orientētu krātuvi, var pārsūtīt un transponēt nelielas datu daļas, kas ir ērtākas jūsu procesora L2 un L3 kešatmiņai.
Pilns kods
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Avots: www.habr.com