ترجمه مقاله به طور اختصاصی برای دانشجویان دوره تهیه شده است
طی چند هفته گذشته، ما
جریان داده های ستون
یک سوال متداول که من از کاربران Arrow دریافت میکنم هزینه بالای انتقال مجموعه دادههای جدولی بزرگ از قالب ردیفی یا رکورد محور به قالب ستونی است. برای مجموعه دادههای چند گیگابایتی، جابهجایی در حافظه یا روی دیسک میتواند بسیار زیاد باشد.
برای جریان دادهها، چه دادههای منبع ردیف یا ستون باشد، یکی از گزینهها ارسال دستههای کوچکی از ردیفها است که هر کدام حاوی طرحبندی ستون داخلی هستند.
در Apache Arrow، مجموعهای از آرایههای ستونی در حافظه که یک تکه جدول را نشان میدهند، دسته رکورد نامیده میشوند. برای نمایش یک ساختار داده واحد از یک جدول منطقی، می توانید چندین مجموعه از رکوردها را جمع آوری کنید.
در قالب فایل "دسترسی تصادفی" موجود، ما متادیتا حاوی طرح جدول و طرح بندی بلوک را در انتهای فایل می نویسیم، که به شما امکان می دهد هر دسته ای از رکوردها یا هر ستونی را از مجموعه داده بسیار ارزان انتخاب کنید. در قالب جریان، یک سری پیام ارسال می کنیم: یک طرحواره، و سپس یک یا چند دسته از رکوردها.
فرمت های مختلف چیزی شبیه به این تصویر هستند:
جریان داده ها در PyArrow: برنامه
برای اینکه به شما نشان دهم چگونه کار می کند، یک مجموعه داده نمونه ایجاد می کنم که یک تکه جریان را نشان می دهد:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
حال، فرض کنید میخواهیم 1 گیگابایت داده بنویسیم که از هر تکه 1 مگابایت تشکیل شده و در مجموع 1024 قطعه میشود. ابتدا بیایید اولین قاب داده 1 مگابایتی را با 16 ستون ایجاد کنیم:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
سپس آنها را به pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
اکنون یک جریان خروجی ایجاد می کنم که در RAM می نویسد و ایجاد می کند StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
سپس 1024 قطعه را می نویسیم که در نهایت یک مجموعه داده 1 گیگابایتی را تشکیل می دهد:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
از آنجایی که ما در RAM نوشتیم، می توانیم کل جریان را در یک بافر دریافت کنیم:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
از آنجایی که این داده ها در حافظه هستند، خواندن دسته های رکوردهای Arrow با یک عملیات کپی صفر به دست می آید. StreamReader را باز می کنم، داده ها را در آن می خوانم pyarrow.Table
و سپس آنها را به DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
همه اینها، البته، خوب است، اما ممکن است سؤالاتی داشته باشید. چقدر سریع اتفاق می افتد؟ اندازه تکه چگونه بر عملکرد بازیابی DataFrame پانداها تأثیر می گذارد؟
عملکرد جریان
با کاهش اندازه قطعه جریان، هزینه بازسازی یک DataFrame ستونی پیوسته در پانداها به دلیل طرحهای دسترسی ناکارآمد به حافظه پنهان افزایش مییابد. همچنین مقداری سربار از کار با ساختارهای داده و آرایه های ++C و بافرهای حافظه آنها وجود دارد.
برای 1 مگابایت مانند بالا، در لپ تاپ من (چهار هسته ای Xeon E3-1505M) معلوم می شود:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
به نظر می رسد که توان عملیاتی موثر 7.75 گیگابیت بر ثانیه برای بازیابی یک دیتا فریم 1 گیگابایتی از 1024 قطعه 1 مگابایتی است. اگر از تکه های بزرگتر یا کوچکتر استفاده کنیم چه اتفاقی می افتد؟ در اینجا نتایجی که به دست می آورید آمده است:
عملکرد به طور قابل توجهی از 256 هزار به 64 هزار قطعه کاهش می یابد. من تعجب کردم که تکه های 1 مگابایتی سریعتر از تکه های 16 مگابایتی پردازش می شوند. ارزش آن را دارد که مطالعه دقیق تری انجام دهید و درک کنید که آیا این یک توزیع نرمال است یا چیز دیگری دخیل است.
در اجرای فعلی قالب، داده ها در اصل فشرده نمی شوند، بنابراین اندازه در حافظه و "روی سیم" تقریباً یکسان است. فشرده سازی ممکن است در آینده به یک گزینه تبدیل شود.
مجموع
پخش جریانی داده های ستون می تواند راهی کارآمد برای انتقال مجموعه داده های بزرگ به ابزارهای تحلیل ستونی مانند پانداها در قطعات کوچک باشد. سرویسهای دادهای که از فضای ذخیرهسازی ردیفگرا استفاده میکنند، میتوانند تکههای کوچکی از دادهها را که برای حافظه نهان L2 و L3 پردازنده شما راحتتر است، انتقال داده و جابهجا کنند.
کد کامل
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
منبع: www.habr.com