استریم داده های ستون با پیکان آپاچی

ترجمه مقاله به طور اختصاصی برای دانشجویان دوره تهیه شده است مهندس داده.

استریم داده های ستون با پیکان آپاچی

طی چند هفته گذشته، ما نونگ لی اضافه شده به پیکان آپاچی فرمت جریان باینری، تکمیل فرمت فایل دسترسی تصادفی/IPC موجود. ما پیاده سازی های جاوا و سی پلاس پلاس و پیوندهای پایتون داریم. در این مقاله، نحوه کار این فرمت را توضیح می‌دهم و نشان می‌دهم که چگونه می‌توانید به توان داده بسیار بالایی برای DataFrame پانداها دست پیدا کنید.

جریان داده های ستون

یک سوال متداول که من از کاربران Arrow دریافت می‌کنم هزینه بالای انتقال مجموعه داده‌های جدولی بزرگ از قالب ردیفی یا رکورد محور به قالب ستونی است. برای مجموعه داده‌های چند گیگابایتی، جابه‌جایی در حافظه یا روی دیسک می‌تواند بسیار زیاد باشد.

برای جریان داده‌ها، چه داده‌های منبع ردیف یا ستون باشد، یکی از گزینه‌ها ارسال دسته‌های کوچکی از ردیف‌ها است که هر کدام حاوی طرح‌بندی ستون داخلی هستند.

در Apache Arrow، مجموعه‌ای از آرایه‌های ستونی در حافظه که یک تکه جدول را نشان می‌دهند، دسته رکورد نامیده می‌شوند. برای نمایش یک ساختار داده واحد از یک جدول منطقی، می توانید چندین مجموعه از رکوردها را جمع آوری کنید.

در قالب فایل "دسترسی تصادفی" موجود، ما متادیتا حاوی طرح جدول و طرح بندی بلوک را در انتهای فایل می نویسیم، که به شما امکان می دهد هر دسته ای از رکوردها یا هر ستونی را از مجموعه داده بسیار ارزان انتخاب کنید. در قالب جریان، یک سری پیام ارسال می کنیم: یک طرحواره، و سپس یک یا چند دسته از رکوردها.

فرمت های مختلف چیزی شبیه به این تصویر هستند:

استریم داده های ستون با پیکان آپاچی

جریان داده ها در PyArrow: برنامه

برای اینکه به شما نشان دهم چگونه کار می کند، یک مجموعه داده نمونه ایجاد می کنم که یک تکه جریان را نشان می دهد:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

حال، فرض کنید می‌خواهیم 1 گیگابایت داده بنویسیم که از هر تکه 1 مگابایت تشکیل شده و در مجموع 1024 قطعه می‌شود. ابتدا بیایید اولین قاب داده 1 مگابایتی را با 16 ستون ایجاد کنیم:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

سپس آنها را به pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

اکنون یک جریان خروجی ایجاد می کنم که در RAM می نویسد و ایجاد می کند StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

سپس 1024 قطعه را می نویسیم که در نهایت یک مجموعه داده 1 گیگابایتی را تشکیل می دهد:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

از آنجایی که ما در RAM نوشتیم، می توانیم کل جریان را در یک بافر دریافت کنیم:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

از آنجایی که این داده ها در حافظه هستند، خواندن دسته های رکوردهای Arrow با یک عملیات کپی صفر به دست می آید. StreamReader را باز می کنم، داده ها را در آن می خوانم pyarrow.Tableو سپس آنها را به DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

همه اینها، البته، خوب است، اما ممکن است سؤالاتی داشته باشید. چقدر سریع اتفاق می افتد؟ اندازه تکه چگونه بر عملکرد بازیابی DataFrame پانداها تأثیر می گذارد؟

عملکرد جریان

با کاهش اندازه قطعه جریان، هزینه بازسازی یک DataFrame ستونی پیوسته در پانداها به دلیل طرح‌های دسترسی ناکارآمد به حافظه پنهان افزایش می‌یابد. همچنین مقداری سربار از کار با ساختارهای داده و آرایه های ++C و بافرهای حافظه آنها وجود دارد.

برای 1 مگابایت مانند بالا، در لپ تاپ من (چهار هسته ای Xeon E3-1505M) معلوم می شود:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

به نظر می رسد که توان عملیاتی موثر 7.75 گیگابیت بر ثانیه برای بازیابی یک دیتا فریم 1 گیگابایتی از 1024 قطعه 1 مگابایتی است. اگر از تکه های بزرگتر یا کوچکتر استفاده کنیم چه اتفاقی می افتد؟ در اینجا نتایجی که به دست می آورید آمده است:

استریم داده های ستون با پیکان آپاچی

عملکرد به طور قابل توجهی از 256 هزار به 64 هزار قطعه کاهش می یابد. من تعجب کردم که تکه های 1 مگابایتی سریعتر از تکه های 16 مگابایتی پردازش می شوند. ارزش آن را دارد که مطالعه دقیق تری انجام دهید و درک کنید که آیا این یک توزیع نرمال است یا چیز دیگری دخیل است.

در اجرای فعلی قالب، داده ها در اصل فشرده نمی شوند، بنابراین اندازه در حافظه و "روی سیم" تقریباً یکسان است. فشرده سازی ممکن است در آینده به یک گزینه تبدیل شود.

مجموع

پخش جریانی داده های ستون می تواند راهی کارآمد برای انتقال مجموعه داده های بزرگ به ابزارهای تحلیل ستونی مانند پانداها در قطعات کوچک باشد. سرویس‌های داده‌ای که از فضای ذخیره‌سازی ردیف‌گرا استفاده می‌کنند، می‌توانند تکه‌های کوچکی از داده‌ها را که برای حافظه نهان L2 و L3 پردازنده شما راحت‌تر است، انتقال داده و جابه‌جا کنند.

کد کامل

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

منبع: www.habr.com

اضافه کردن نظر