Ullmhaíodh aistriúchán an ailt go sonrach do mhic léinn an chúrsa
Le cúpla seachtain anuas ní mór dúinn
Sonraí Colúin Sruthaithe
Ceist choitianta a fhaighim ó úsáideoirí Arrow is ea an costas ard a bhaineann le tacair mhóra sonraí táblaí a aistriú ó fhormáid atá dírithe ar shraith nó taifead go formáid atá dírithe ar cholúin. I gcás tacair sonraí il-gigabyte, is féidir le trasuíomh sa chuimhne nó ar dhiosca a bheith ina thasc iomarcach.
Chun sonraí a shruthú, cibé acu is sraith nó colún iad na sonraí foinseacha, is é rogha amháin baisceanna beaga sraitheanna a sheoladh, agus leagan amach colún ar gach ceann díobh.
In Apache Arrow, tugtar baisc taifid ar bhailiúchán na n-eagair cholúin in-chuimhne a sheasann do smután boird. Chun struchtúr sonraí aonair de thábla loighciúil a léiriú, is féidir roinnt baisceanna taifead a bhailiú.
San fhormáid comhaid "rochtain randamach" atá ann cheana féin, déanaimid taifead ar mheiteashonraí ina bhfuil scéimre an tábla agus láithreacha blocála ag deireadh an chomhaid, rud a ligeann duit aon bhaisc taifead nó aon cholún a roghnú go han-saor ó thacar sonraí. I bhformáid sruthú, cuirimid sraith teachtaireachtaí: breac-chuntas, agus ansin baisc amháin nó níos mó de thaifid.
Breathnaíonn na formáidí éagsúla rud éigin mar seo:
Sonraí Sruthaithe i PyArrow: Feidhmchlár
Chun a thaispeáint duit conas a oibríonn sé seo, cruthóidh mé tacar sonraí samplach a léiríonn smután aon srutha:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Anois, a ligean ar a rá ba mhaith linn a scríobh 1 GB de shonraí, comhdhéanta de smután de 1 MB an ceann, le haghaidh iomlán de 1024 smután. Chun tús a chur leis, cruthaimis an chéad fhráma sonraí 1 MB le 16 cholún:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Ansin mé iad a thiontú go pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Anois cruthóidh mé sruth aschuir a scríobhfaidh chuig RAM agus a chruthóidh StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Ansin scríobhfaimid 1024 smután, arb ionann iad ar deireadh thiar agus tacar sonraí 1GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Ó scríobhamar chuig RAM, is féidir linn an sruth iomlán a fháil in aon mhaolán amháin:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Ós rud é go bhfuil na sonraí seo i gcuimhne, is oibríocht chóip nialasach é baisceanna de thaifid Arrow a léamh. Osclaím StreamReader, léigh sonraí isteach pyarrow.Table
, agus ansin iad a thiontú go DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Tá sé seo go léir, ar ndóigh, go maith, ach b'fhéidir go mbeadh ceisteanna agat. Cé chomh tapa agus a tharlaíonn sé seo? Conas a théann méid smután i bhfeidhm ar fheidhmíocht aisghabhála pandas DataFrame?
Feidhmíocht Sruthaithe
De réir mar a laghdaítear an smután sruthú, méadaíonn an costas a bhaineann le DataFrame colúnach tadhlach a athchruthú i pandas mar gheall ar phatrúin neamhéifeachtúla rochtana taisce. Tá roinnt forchostais i gceist freisin ó bheith ag obair le struchtúir agus eagair sonraí C++ agus a gcuid maoláin chuimhne.
Le haghaidh 1 MB, mar atá thuas, ar mo ríomhaire glúine (Quad-core Xeon E3-1505M) casadh sé amach:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Tarlaíonn sé gurb é 7.75 GB/s an tréchur éifeachtach chun DataFrame 1GB a aischur ó smután 1024 1MB. Cad a tharlaíonn má úsáidimid smután níos mó nó níos lú? Seo iad na torthaí:
Titeann feidhmíocht go suntasach ó 256K go 64K. Bhí ionadh orm gur próiseáladh smután 1 MB níos tapúla ná smután 16 MB. Is fiú staidéar níos críochnúla a dhéanamh agus tuiscint a fháil ar cibé an gnáthdháileadh é seo nó an bhfuil rud éigin eile i gceist.
I gcur i bhfeidhm reatha na formáide, níl na sonraí comhbhrúite i bprionsabal, mar sin tá an méid sa chuimhne agus "sna sreanga" thart ar an gcéanna. Sa todhchaí, d'fhéadfadh comhbhrú a bheith ina rogha breise.
Iomlán na
Is féidir le sruthú sonraí colúnacha a bheith ina bhealach éifeachtach chun tacair mhóra sonraí a chur in uirlisí anailíse colúnacha amhail pandas i smutáin bheaga. Is féidir le seirbhísí sonraí a úsáideann stóráil atá dírithe ar rónna smután beag sonraí a aistriú agus a thrasuíomh atá níos áisiúla do thaisce L2 agus L3 do phróiseálaithe.
Cód iomlán
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Foinse: will.com