Oversettelsen av artikkelen ble utarbeidet spesielt for studentene på kurset
I løpet av de siste ukene har vi
Streaming av kolonnedata
Et vanlig spørsmål jeg får fra Arrow-brukere er de høye kostnadene ved å migrere store tabellformede datasett fra et rad- eller postorientert format til et kolonneformat. For multi-gigabyte datasett kan transponering i minne eller på disk være overveldende.
For strømmedata, enten kildedataene er rad eller kolonne, er ett alternativ å sende små grupper med rader, som hver inneholder et kolonneoppsett internt.
I Apache Arrow kalles en samling av kolonneformede arrays i minnet som representerer en tabelldel en postbatch. For å representere en enkelt datastruktur i en logisk tabell, kan du samle flere sett med poster.
I det eksisterende "random access"-filformatet skriver vi metadata som inneholder tabellskjemaet og blokkoppsettet på slutten av filen, som lar deg velge en hvilken som helst batch av poster eller hvilken som helst kolonne fra datasettet veldig billig. I et streamingformat sender vi en serie meldinger: et skjema og deretter en eller flere grupper med poster.
De forskjellige formatene ser omtrent slik ut som dette bildet:
Streame data i PyArrow: applikasjon
For å vise deg hvordan det fungerer, lager jeg et eksempeldatasett som representerer en enkelt strømklump:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Anta nå at vi ønsker å skrive 1 GB data, bestående av 1 MB biter hver, for totalt 1024 biter. Først, la oss lage den første 1 MB dataramme med 16 kolonner:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Så konverterer jeg dem til pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Nå skal jeg lage en utdatastrøm som vil skrive til RAM og lage StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Deretter vil vi skrive 1024 biter, som til slutt vil utgjøre et 1 GB datasett:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Siden vi skrev i RAM, kan vi få hele strømmen i en buffer:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Fordi disse dataene er i minnet, oppnås lesing av grupper med pilposter ved en nullkopioperasjon. Jeg åpner StreamReader, leser data inn pyarrow.Table
og deretter konvertere dem til DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Alt dette er selvfølgelig bra, men du kan ha spørsmål. Hvor fort skjer det? Hvordan påvirker chunk-størrelsen pandas DataFrame-henting?
Strømmeytelse
Etter hvert som størrelsen på strømmingsdelen reduseres, øker kostnadene ved å rekonstruere en sammenhengende kolonneformet DataFrame i pandaer på grunn av ineffektive hurtigbuffertilgangsordninger. Det er også noen overhead ved å jobbe med C++-datastrukturer og -matriser og deres minnebuffere.
For 1 MB som ovenfor, på min bærbare datamaskin (Quad-core Xeon E3-1505M) viser det seg:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Det viser seg at den effektive gjennomstrømningen er 7.75 Gb/s for å gjenopprette en 1 GB DataFrame fra 1024 1 MB biter. Hva skjer hvis vi bruker større eller mindre biter? Her er resultatene du får:
Ytelsen synker betydelig fra 256K til 64K biter. Jeg ble overrasket over at 1MB-biter ble behandlet raskere enn 16MB-biter. Det er verdt å gjøre en grundigere undersøkelse og forstå om dette er en normalfordeling eller noe annet er involvert.
I den nåværende implementeringen av formatet er dataene i prinsippet ikke komprimert, så størrelsen i minnet og "på ledningen" er omtrent den samme. Komprimering kan bli et alternativ i fremtiden.
Total
Streaming av kolonnedata kan være en effektiv måte å overføre store datasett til kolonneanalyseverktøy som pandaer i små biter. Datatjenester som bruker radorientert lagring kan overføre og transponere små biter av data som er mer praktisk for prosessorens L2- og L3-cache.
Full kode
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Kilde: www.habr.com