Հոդվածի թարգմանությունը պատրաստվել է հատուկ դասընթացի ուսանողների համար
Վերջին մի քանի շաբաթվա ընթացքում մենք ունենք
Սյունակի տվյալների հոսք
Սովորական հարցը, որը ես ստանում եմ Arrow-ի օգտատերերից, աղյուսակային տվյալների մեծ հավաքածուներից տողերի կամ գրառումների վրա հիմնված ձևաչափից սյունակային ձևաչափ տեղափոխելու բարձր արժեքն է: Բազմագիգաբայթանոց տվյալների հավաքածուների համար հիշողության մեջ կամ սկավառակի վրա փոխադրումը կարող է ճնշող խնդիր լինել:
Տվյալների հոսքի համար, անկախ նրանից, թե աղբյուրի տվյալները տող են, թե սյունակ, տարբերակներից մեկը տողերի փոքր խմբաքանակներ ուղարկելն է, որոնցից յուրաքանչյուրը ներսում պարունակում է սյունաձև դասավորություն:
Apache Arrow-ում սեղանի հատվածը ներկայացնող հիշողության մեջ գտնվող սյունակային զանգվածների հավաքածուն կոչվում է ռեկորդային փաթեթ: Տրամաբանական աղյուսակի մեկ տվյալների կառուցվածքը ներկայացնելու համար կարելի է հավաքել գրառումների մի քանի խմբաքանակ:
Գոյություն ունեցող «պատահական մուտքի» ֆայլի ձևաչափում մենք գրանցում ենք մետատվյալներ, որոնք պարունակում են աղյուսակի սխեման և արգելափակում ենք ֆայլի վերջում գտնվող վայրերը, ինչը թույլ է տալիս չափազանց էժան ընտրել գրառումների ցանկացած փաթեթ կամ ցանկացած սյունակ տվյալների հավաքածուից: Հոսքային ձևաչափով մենք ուղարկում ենք մի շարք հաղորդագրություններ՝ ուրվագիծ, այնուհետև գրառումների մեկ կամ մի քանի խմբաքանակ:
Տարբեր ձևաչափերն այսպիսի տեսք ունեն.
Տվյալների հոսք PyArrow-ում. հավելված
Ձեզ ցույց տալու համար, թե ինչպես է սա աշխատում, ես կստեղծեմ տվյալների մի օրինակ, որը ներկայացնում է հոսքի մեկ կտոր.
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Հիմա, ենթադրենք, ուզում ենք գրել 1 ԳԲ տվյալ, որը բաղկացած է յուրաքանչյուրը 1 ՄԲ-անոց հատվածներից, ընդհանուր 1024 կտորով: Սկսելու համար, եկեք ստեղծենք առաջին 1 ՄԲ տվյալների շրջանակը 16 սյունակով.
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Հետո ես դրանք փոխակերպում եմ pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Այժմ ես կստեղծեմ ելքային հոսք, որը կգրի RAM-ին և կստեղծի StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Այնուհետև մենք կգրենք 1024 կտոր, որն ի վերջո կկազմի 1 ԳԲ տվյալների հավաքածու.
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Քանի որ մենք գրել ենք RAM-ին, մենք կարող ենք ստանալ ամբողջ հոսքը մեկ բուֆերում.
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Քանի որ այս տվյալները հիշողության մեջ են, Arrow գրառումների խմբաքանակների ընթերցումը զրոյական պատճենման գործողություն է: Ես բացում եմ StreamReader-ը, կարդում եմ տվյալները pyarrow.Table
, և այնուհետև փոխակերպեք դրանք DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Այս ամենը, իհարկե, լավ է, բայց դուք կարող եք հարցեր ունենալ: Որքա՞ն արագ է դա տեղի ունենում: Ինչպե՞ս է կտորի չափը ազդում պանդաների DataFrame որոնման աշխատանքի վրա:
Հոսքային կատարում
Քանի որ հոսքային հատվածի չափը նվազում է, պանդաներում կից սյունակային DataFrame-ի վերակառուցման արժեքը մեծանում է քեշի մուտքի անարդյունավետ ձևերի պատճառով: Գոյություն ունի նաև C++ տվյալների կառուցվածքների և զանգվածների և դրանց հիշողության բուֆերների հետ աշխատելը:
1 ՄԲ-ի համար, ինչպես վերևում, իմ նոութբուքի վրա (Quad-core Xeon E3-1505M) ստացվում է.
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Պարզվում է, որ արդյունավետ թողունակությունը 7.75 Գբ/վ է 1 1024ՄԲ հատվածից 1 ԳԲ DataFrame-ը վերականգնելու համար: Ի՞նչ կլինի, եթե մենք օգտագործենք ավելի մեծ կամ փոքր կտորներ: Սրանք արդյունքներն են.
Արդյունավետությունը զգալիորեն նվազում է 256K-ից մինչև 64K կտոր: Ես զարմացա, որ 1 ՄԲ կտորներն ավելի արագ են մշակվում, քան 16 ՄԲ կտորները: Արժե ավելի մանրակրկիտ ուսումնասիրություն կատարել և հասկանալ՝ սա նորմալ բաշխում է, թե այլ բան կա:
Ձևաչափի ընթացիկ իրականացման դեպքում տվյալները սկզբունքորեն չեն սեղմվում, ուստի հիշողության մեջ և «լարերի մեջ» չափը մոտավորապես նույնն է: Ապագայում սեղմումը կարող է լրացուցիչ տարբերակ դառնալ։
Լրիվ
Սյունակային տվյալների հոսքը կարող է արդյունավետ միջոց լինել տվյալների մեծ հավաքածուները սյունակային վերլուծական գործիքներին սնելու համար, ինչպիսիք են պանդաները փոքր կտորներով: Տվյալների ծառայությունները, որոնք օգտագործում են շարքի վրա հիմնված պահեստավորում, կարող են փոխանցել և փոխադրել տվյալների փոքր կտորներ, որոնք ավելի հարմար են ձեր պրոցեսորի L2 և L3 քեշերի համար:
Ամբողջական կոդը
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Source: www.habr.com