Artikli tõlge koostati spetsiaalselt kursuse üliõpilastele
Viimase paari nädala jooksul oleme
Veeruandmete voogesitus
Arrow kasutajatelt küsitav tavaline küsimus on suurte tabeliandmete kogumite rea- või kirjeorienteeritud vormingust veerupõhisesse vormingusse üleviimise kõrge hind. Mitme gigabaidiste andmekogumite puhul võib mällu või kettale ümberpaigutamine olla üle jõu käiv ülesanne.
Andmete voogesitamiseks, olenemata sellest, kas lähteandmeteks on rida või veerg, on üks võimalus saata väikesed ridade komplektid, millest igaüks sisaldab veergude paigutust.
Apache Nooles nimetatakse tabelitükki esindavate mälusiseste veerumassiivide kogumit kirjepartiiks. Loogilise tabeli ühe andmestruktuuri esitamiseks saab koguda mitu kirjete partiid.
Olemasolevas "juhusliku juurdepääsu" failivormingus salvestame metaandmed, mis sisaldavad tabeli skeemi ja plokkide asukohti faili lõppu, võimaldades teil ülimalt odavalt valida andmekogust mis tahes kirjete partii või mis tahes veeru. Voogesituse vormingus saadame rea sõnumeid: ülevaate ja seejärel ühe või mitu kirjete partiid.
Erinevad vormingud näevad välja umbes sellised:
Andmete voogesitamine PyArrowis: rakendus
Et näidata teile, kuidas see toimib, loon näidisandmestiku, mis esindab ühte voojuppi:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Oletame nüüd, et tahame kirjutada 1 GB andmeid, millest igaüks koosneb 1 MB suurustest tükkidest, kokku 1024 tükki. Alustuseks loome esimese 1 MB andmeraami 16 veeruga:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Seejärel teisendan need teiseks pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Nüüd loon väljundvoo, mis kirjutab RAM-i ja loob StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Seejärel kirjutame 1024 tükki, mis lõpuks moodustab 1 GB andmestiku:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Kuna kirjutasime RAM-i, saame kogu voo ühte puhvris:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Kuna need andmed on mälus, on Arrow kirjete partiide lugemine nullkoopia toiming. Avan StreamReaderi, loen andmed sisse pyarrow.Table
ja seejärel teisendage need järgmiseks DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Kõik see on muidugi hea, kuid teil võib tekkida küsimusi. Kui kiiresti see juhtub? Kuidas mõjutab tüki suurus pandade DataFrame'i otsingu jõudlust?
Voogesituse jõudlus
Voogesituse tüki suuruse vähenedes suurenevad pandades külgneva veerulise DataFrame'i rekonstrueerimise kulud ebatõhusate vahemälu juurdepääsumustrite tõttu. C++ andmestruktuuride ja -massiivide ning nende mälupuhvritega töötamisega kaasneb ka mõningane lisakulu.
1 MB jaoks, nagu ülal, selgub minu sülearvutis (neljatuumaline Xeon E3-1505M):
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Selgub, et efektiivne läbilaskevõime on 7.75 GB/s, et taastada 1 GB DataFrame 1024 1 MB tükkist. Mis juhtub, kui kasutame suuremaid või väiksemaid tükke? Need on tulemused:
Jõudlus langeb märkimisväärselt, 256 64 tükki 1 16 tükki. Olin üllatunud, et XNUMX MB tükke töödeldi kiiremini kui XNUMX MB tükke. Tasub teha põhjalikum uuring ja mõista, kas tegemist on normaaljaotusega või on mängus midagi muud.
Vormingu praeguses teostuses andmeid põhimõtteliselt ei tihendata, seega on suurus mälus ja "juhtmetes" ligikaudu sama. Tulevikus võib tihendamine saada lisavõimaluseks.
Summaarne
Veerukujuliste andmete voogesitamine võib olla tõhus viis suurte andmekogumite sisestamiseks veergude analüüsitööriistadesse, nagu pandad väikeste tükkidena. Andmeteenused, mis kasutavad ridadele orienteeritud salvestusruumi, saavad edastada ja transponeerida väikeseid andmetükke, mis on teie protsessori L2- ja L3-vahemälu jaoks mugavamad.
Täielik kood
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Allikas: www.habr.com