Þýðing greinarinnar var unnin sérstaklega fyrir nemendur námskeiðsins
Undanfarnar vikur höfum við
Streymi dálkagögn
Algeng spurning sem ég fæ frá Arrow notendum er mikill kostnaður við að flytja stór sett af töflugögnum úr röð- eða færslumiðuðu sniði yfir í dálkamiðað snið. Fyrir margra gígabæta gagnasöfn getur yfirfærsla í minni eða á disk verið yfirþyrmandi verkefni.
Til að streyma gögnum, hvort sem upprunagögnin eru röð eða dálkur, er einn valkostur að senda litlar lotur af línum, sem hver inniheldur dálkaskipulag inni.
Í Apache Arrow er safn dálkafylkja í minni sem tákna töfluklump kallað færslulotu. Til að tákna eina gagnauppbyggingu rökréttrar töflu er hægt að safna nokkrum lotum af skrám.
Í núverandi "random access" skráarsniði, skráum við lýsigögn sem innihalda töfluskema og lokunarstaðsetningar í lok skráarinnar, sem gerir þér kleift að velja á mjög ódýran hátt hvaða lotu af færslum sem er eða hvaða dálk sem er úr gagnasetti. Í streymissniði sendum við röð skilaboða: útlínur og síðan eina eða fleiri færslur.
Mismunandi snið líta einhvern veginn svona út:
Straumspilun gagna í PyArrow: Forrit
Til að sýna þér hvernig þetta virkar, mun ég búa til dæmi um gagnasafn sem táknar einn straumklump:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Segjum nú að við viljum skrifa 1 GB af gögnum, sem samanstanda af 1 MB bitum hvor, fyrir samtals 1024 bita. Til að byrja skulum við búa til fyrsta 1 MB gagnarammann með 16 dálkum:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Síðan breyti ég þeim í pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Nú mun ég búa til úttaksstraum sem mun skrifa í vinnsluminni og búa til StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Síðan munum við skrifa 1024 klumpur, sem munu á endanum nema 1GB gagnasetti:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Þar sem við skrifuðum í vinnsluminni getum við fengið allan strauminn í einum biðminni:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Þar sem þessi gögn eru í minni er það að lesa runur af Arrow-skrám núllafritunaraðgerð. Ég opna StreamReader, les gögn inn pyarrow.Table
, og umbreyttu þeim síðan í DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Allt þetta er auðvitað gott, en þú gætir haft spurningar. Hversu fljótt gerist þetta? Hvernig hefur klumpastærð áhrif á afköst Panda DataFrame endurheimtar?
Árangur á streymi
Eftir því sem straumklumpurinn minnkar, eykst kostnaður við að endurbyggja samliggjandi dálka DataFrame í pöndum vegna óhagkvæms skyndiminnisaðgangsmynsturs. Það er líka nokkur kostnaður við að vinna með C++ gagnabyggingu og fylki og minnisbuffum þeirra.
Fyrir 1 MB, eins og hér að ofan, á fartölvunni minni (Quad-core Xeon E3-1505M) kemur í ljós:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Það kemur í ljós að skilvirkt afköst er 7.75 GB/s til að endurheimta 1GB DataFrame úr 1024 1MB klumpur. Hvað gerist ef við notum stærri eða smærri bita? Þetta eru niðurstöðurnar:
Afköst lækka verulega úr 256K í 64K klumpur. Það kom mér á óvart að 1 MB bitar voru unnar hraðar en 16 MB bitar. Það er þess virði að gera ítarlegri rannsókn og átta sig á því hvort um eðlilega dreifingu sé að ræða eða hvort eitthvað annað sé í gangi.
Í núverandi útfærslu sniðsins eru gögnin í grundvallaratriðum ekki þjappuð, þannig að stærðin í minni og „í vírunum“ er um það bil sú sama. Í framtíðinni gæti þjöppun orðið viðbótarvalkostur.
Samtals
Straumspilun dálkagagna getur verið áhrifarík leið til að fæða stór gagnasöfn í dálkagreiningartól eins og pöndur í litlum bitum. Gagnaþjónusta sem notar raðmiðaða geymslu getur flutt og yfirfært litla klumpa af gögnum sem eru þægilegri fyrir L2 og L3 skyndiminni örgjörvans þíns.
Fullur kóði
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Heimild: www.habr.com