Սյունակի տվյալների հոսք Apache Arrow-ով

Հոդվածի թարգմանությունը պատրաստվել է հատուկ դասընթացի ուսանողների համար Տվյալների ինժեներ.

Սյունակի տվյալների հոսք Apache Arrow-ով

Վերջին մի քանի շաբաթվա ընթացքում մենք ունենք Նոնգ Լի ավելացվել է Apache Arrow երկուական հոսքային ձևաչափ, որը լրացնում է առկա պատահական մուտքի/IPC ֆայլի ձևաչափը: Մենք ունենք Java և C++ իրականացումներ և Python կապեր: Այս հոդվածում ես կբացատրեմ, թե ինչպես է աշխատում ձևաչափը և ցույց կտամ, թե ինչպես կարող եք տվյալների շատ բարձր թողունակություն ձեռք բերել պանդաների DataFrame-ի համար:

Սյունակի տվյալների հոսք

Սովորական հարցը, որը ես ստանում եմ Arrow-ի օգտատերերից, աղյուսակային տվյալների մեծ հավաքածուներից տողերի կամ գրառումների վրա հիմնված ձևաչափից սյունակային ձևաչափ տեղափոխելու բարձր արժեքն է: Բազմագիգաբայթանոց տվյալների հավաքածուների համար հիշողության մեջ կամ սկավառակի վրա փոխադրումը կարող է ճնշող խնդիր լինել:

Տվյալների հոսքի համար, անկախ նրանից, թե աղբյուրի տվյալները տող են, թե սյունակ, տարբերակներից մեկը տողերի փոքր խմբաքանակներ ուղարկելն է, որոնցից յուրաքանչյուրը ներսում պարունակում է սյունաձև դասավորություն:

Apache Arrow-ում սեղանի հատվածը ներկայացնող հիշողության մեջ գտնվող սյունակային զանգվածների հավաքածուն կոչվում է ռեկորդային փաթեթ: Տրամաբանական աղյուսակի մեկ տվյալների կառուցվածքը ներկայացնելու համար կարելի է հավաքել գրառումների մի քանի խմբաքանակ:

Գոյություն ունեցող «պատահական մուտքի» ֆայլի ձևաչափում մենք գրանցում ենք մետատվյալներ, որոնք պարունակում են աղյուսակի սխեման և արգելափակում ենք ֆայլի վերջում գտնվող վայրերը, ինչը թույլ է տալիս չափազանց էժան ընտրել գրառումների ցանկացած փաթեթ կամ ցանկացած սյունակ տվյալների հավաքածուից: Հոսքային ձևաչափով մենք ուղարկում ենք մի շարք հաղորդագրություններ՝ ուրվագիծ, այնուհետև գրառումների մեկ կամ մի քանի խմբաքանակ:

Տարբեր ձևաչափերն այսպիսի տեսք ունեն.

Սյունակի տվյալների հոսք Apache Arrow-ով

Տվյալների հոսք PyArrow-ում. հավելված

Ձեզ ցույց տալու համար, թե ինչպես է սա աշխատում, ես կստեղծեմ տվյալների մի օրինակ, որը ներկայացնում է հոսքի մեկ կտոր.

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Հիմա, ենթադրենք, ուզում ենք գրել 1 ԳԲ տվյալ, որը բաղկացած է յուրաքանչյուրը 1 ՄԲ-անոց հատվածներից, ընդհանուր 1024 կտորով: Սկսելու համար, եկեք ստեղծենք առաջին 1 ՄԲ տվյալների շրջանակը 16 սյունակով.

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Հետո ես դրանք փոխակերպում եմ pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Այժմ ես կստեղծեմ ելքային հոսք, որը կգրի RAM-ին և կստեղծի StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Այնուհետև մենք կգրենք 1024 կտոր, որն ի վերջո կկազմի 1 ԳԲ տվյալների հավաքածու.

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Քանի որ մենք գրել ենք RAM-ին, մենք կարող ենք ստանալ ամբողջ հոսքը մեկ բուֆերում.

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Քանի որ այս տվյալները հիշողության մեջ են, Arrow գրառումների խմբաքանակների ընթերցումը զրոյական պատճենման գործողություն է: Ես բացում եմ StreamReader-ը, կարդում եմ տվյալները pyarrow.Table, և այնուհետև փոխակերպեք դրանք DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Այս ամենը, իհարկե, լավ է, բայց դուք կարող եք հարցեր ունենալ: Որքա՞ն արագ է դա տեղի ունենում: Ինչպե՞ս է կտորի չափը ազդում պանդաների DataFrame որոնման աշխատանքի վրա:

Հոսքային կատարում

Քանի որ հոսքային հատվածի չափը նվազում է, պանդաներում կից սյունակային DataFrame-ի վերակառուցման արժեքը մեծանում է քեշի մուտքի անարդյունավետ ձևերի պատճառով: Գոյություն ունի նաև C++ տվյալների կառուցվածքների և զանգվածների և դրանց հիշողության բուֆերների հետ աշխատելը:

1 ՄԲ-ի համար, ինչպես վերևում, իմ նոութբուքի վրա (Quad-core Xeon E3-1505M) ստացվում է.

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Պարզվում է, որ արդյունավետ թողունակությունը 7.75 Գբ/վ է 1 1024ՄԲ հատվածից 1 ԳԲ DataFrame-ը վերականգնելու համար: Ի՞նչ կլինի, եթե մենք օգտագործենք ավելի մեծ կամ փոքր կտորներ: Սրանք արդյունքներն են.

Սյունակի տվյալների հոսք Apache Arrow-ով

Արդյունավետությունը զգալիորեն նվազում է 256K-ից մինչև 64K կտոր: Ես զարմացա, որ 1 ՄԲ կտորներն ավելի արագ են մշակվում, քան 16 ՄԲ կտորները: Արժե ավելի մանրակրկիտ ուսումնասիրություն կատարել և հասկանալ՝ սա նորմալ բաշխում է, թե այլ բան կա:

Ձևաչափի ընթացիկ իրականացման դեպքում տվյալները սկզբունքորեն չեն սեղմվում, ուստի հիշողության մեջ և «լարերի մեջ» չափը մոտավորապես նույնն է: Ապագայում սեղմումը կարող է լրացուցիչ տարբերակ դառնալ։

Լրիվ

Սյունակային տվյալների հոսքը կարող է արդյունավետ միջոց լինել տվյալների մեծ հավաքածուները սյունակային վերլուծական գործիքներին սնելու համար, ինչպիսիք են պանդաները փոքր կտորներով: Տվյալների ծառայությունները, որոնք օգտագործում են շարքի վրա հիմնված պահեստավորում, կարող են փոխանցել և փոխադրել տվյալների փոքր կտորներ, որոնք ավելի հարմար են ձեր պրոցեսորի L2 և L3 քեշերի համար:

Ամբողջական կոդը

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: www.habr.com

Добавить комментарий