Artikkelin käännös on tehty erityisesti kurssin opiskelijoille
Viime viikkoina meillä on
Suoratoisto saraketiedot
Arrow-käyttäjiltä saamani yleinen kysymys on suurten taulukkotietojen siirtämisen korkeat kustannukset rivi- tai tietuesuuntautuneesta muodosta sarakesuuntautuneeseen muotoon. Usean gigatavun tietojoukoissa transponointi muistiin tai levylle voi olla ylivoimainen tehtävä.
Datan suoratoistoa varten, olipa lähdetieto rivi tai sarake, yksi vaihtoehto on lähettää pieniä rivieriä, joista jokainen sisältää sarakeasettelun.
Apache Arrowssa taulukkopalaa edustavien muistissa olevien saraketaulukoiden kokoelmaa kutsutaan tietueeriksi. Loogisen taulukon yksittäisen tietorakenteen edustamiseksi voidaan kerätä useita tietueeriä.
Olemassa olevaan "random access" -tiedostomuotoon tallennamme metatiedot, jotka sisältävät taulukon skeeman ja lohkon sijainnit tiedoston loppuun, jolloin voit erittäin edullisesti valita minkä tahansa tietueerän tai minkä tahansa sarakkeen tietojoukosta. Suoratoistomuodossa lähetämme sarjan viestejä: luonnoksen ja sitten yhden tai useamman tietueerän.
Eri muodot näyttävät suunnilleen tältä:
Tietojen suoratoisto PyArrowissa: Sovellus
Näytän sinulle, miten tämä toimii, luomalla esimerkkitietojoukon, joka edustaa yhtä stream-osaa:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Oletetaan nyt, että haluamme kirjoittaa 1 Gt:n dataa, joka koostuu kukin 1 Mt:n paloista, yhteensä 1024 kappaletta. Aluksi luodaan ensimmäinen 1 Mt:n tietokehys, jossa on 16 saraketta:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Sitten muuntelen ne sellaisiksi pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Nyt luon ulostulovirran, joka kirjoittaa RAM-muistiin ja luo StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Sitten kirjoitamme 1024 palaa, joka lopulta vastaa 1 Gt:n tietojoukkoa:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Koska kirjoitimme RAM-muistiin, voimme saada koko streamin yhteen puskuriin:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Koska nämä tiedot ovat muistissa, Arrow-tietueiden erien lukeminen on nollakopiointitoiminto. Avaan StreamReaderin, luen tiedot sisään pyarrow.Table
ja muuntaa ne sitten muotoon DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Kaikki tämä on tietysti hyvä, mutta sinulla voi olla kysyttävää. Kuinka nopeasti tämä tapahtuu? Miten palan koko vaikuttaa pandan DataFrame-haun suorituskykyyn?
Suoratoiston suorituskyky
Kun suoratoistokappaleen koko pienenee, pandassa olevan peräkkäisen sarakepohjaisen DataFrame-kehyksen rekonstruoinnin kustannukset kasvavat tehottomien välimuistin käyttötapojen vuoksi. Myös C++-tietorakenteiden ja -taulukoiden sekä niiden muistipuskurien kanssa työskentely aiheuttaa jonkin verran lisäkustannuksia.
Yhdellä megatavulla, kuten yllä, kannettavassa tietokoneessani (neliytiminen Xeon E1-3M) käy:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Osoittautuu, että tehokas suoritusnopeus on 7.75 Gt/s palauttamaan 1 Gt DataFrame 1024:stä 1 Mt:n paloista. Mitä tapahtuu, jos käytämme suurempia tai pienempiä paloja? Nämä ovat tulokset:
Suorituskyky putoaa merkittävästi 256 64 kappaleesta 1 16 paloiksi. Olin yllättynyt, että XNUMX Mt:n palaset käsiteltiin nopeammin kuin XNUMX Mt:n palaset. Kannattaa tehdä perusteellisempi tutkimus ja ymmärtää, onko tämä normaalijakauma vai onko pelissä jotain muuta.
Formaatin nykyisessä toteutuksessa dataa ei periaatteessa pakata, joten koko muistissa ja "johdoissa" on suunnilleen sama. Jatkossa pakkaus voi olla lisävaihtoehto.
Koko
Saraketietojen suoratoisto voi olla tehokas tapa syöttää suuria tietojoukkoja sarakeanalytiikkatyökaluihin, kuten pandoihin pieninä paloina. Rivipohjaista tallennusta käyttävät tietopalvelut voivat siirtää ja transponoida pieniä tietopaloja, jotka sopivat paremmin prosessorisi L2- ja L3-välimuistiin.
Täysi koodi
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Lähde: will.com