اپاچی ایرو کے ساتھ کالم ڈیٹا کو اسٹریم کرنا

مضمون کا ترجمہ خاص طور پر کورس کے طلباء کے لیے تیار کیا گیا تھا۔ ڈیٹا انجینئر.

اپاچی ایرو کے ساتھ کالم ڈیٹا کو اسٹریم کرنا

گزشتہ چند ہفتوں کے دوران، ہم نونگ لی میں اضافہ ہوا اپاچی تیر بائنری سٹریم فارمیٹ، پہلے سے موجود رینڈم رسائی/IPC فائل فارمیٹ کی تکمیل کرتا ہے۔ ہمارے پاس جاوا اور C++ نفاذ اور Python بائنڈنگز ہیں۔ اس آرٹیکل میں، میں وضاحت کروں گا کہ فارمیٹ کیسے کام کرتا ہے اور دکھاؤں گا کہ آپ پانڈاس ڈیٹا فریم کے لیے بہت زیادہ ڈیٹا تھرو پٹ کیسے حاصل کر سکتے ہیں۔

اسٹریمنگ کالم ڈیٹا

ایک عام سوال جو مجھے یرو صارفین سے ملتا ہے وہ ہے بڑے ٹیبلولر ڈیٹاسیٹس کو قطار یا ریکارڈ پر مبنی فارمیٹ سے کالم فارمیٹ میں منتقل کرنے کی زیادہ قیمت۔ ملٹی گیگا بائٹ ڈیٹا سیٹس کے لیے، میموری میں یا ڈسک پر منتقل کرنا بہت زیادہ ہو سکتا ہے۔

سٹریمنگ ڈیٹا کے لیے، چاہے سورس ڈیٹا قطار ہو یا کالم، ایک آپشن یہ ہے کہ قطاروں کے چھوٹے بیچ بھیجے جائیں، ہر ایک کالم لے آؤٹ پر مشتمل ہو۔

اپاچی ایرو میں، ان میموری کالم اریوں کا مجموعہ جو ٹیبل کے حصے کی نمائندگی کرتا ہے اسے ریکارڈ بیچ کہا جاتا ہے۔ منطقی جدول کے واحد ڈیٹا ڈھانچے کی نمائندگی کرنے کے لیے، آپ ریکارڈ کے کئی سیٹ جمع کر سکتے ہیں۔

موجودہ "رینڈم ایکسیس" فائل فارمیٹ میں، ہم فائل کے آخر میں ٹیبل اسکیما اور بلاک لے آؤٹ پر مشتمل میٹا ڈیٹا لکھتے ہیں، جو آپ کو ڈیٹا سیٹ سے کسی بھی ریکارڈ یا کالم کو بہت سستے طریقے سے منتخب کرنے کی اجازت دیتا ہے۔ اسٹریمنگ فارمیٹ میں، ہم پیغامات کی ایک سیریز بھیجتے ہیں: ایک اسکیما، اور پھر ریکارڈ کے ایک یا زیادہ بیچز۔

مختلف شکلیں کچھ اس تصویر کی طرح نظر آتی ہیں:

اپاچی ایرو کے ساتھ کالم ڈیٹا کو اسٹریم کرنا

PyArrow میں سٹریمنگ ڈیٹا: ایپلی کیشن

آپ کو یہ دکھانے کے لیے کہ یہ کیسے کام کرتا ہے، میں ایک مثالی ڈیٹاسیٹ بناؤں گا جو ایک واحد سلسلہ کے حصے کی نمائندگی کرتا ہے:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

اب، فرض کریں کہ ہم کل 1 ٹکڑوں کے لیے 1 MB ٹکڑوں پر مشتمل 1024 GB ڈیٹا لکھنا چاہتے ہیں۔ سب سے پہلے، آئیے 1 کالموں کے ساتھ پہلا 16MB ڈیٹا فریم بنائیں:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

پھر میں ان کو تبدیل کرتا ہوں۔ pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

اب میں ایک آؤٹ پٹ سٹریم بناؤں گا جو رام پر لکھے گا اور تخلیق کرے گا۔ StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

پھر ہم 1024 ٹکڑے لکھیں گے، جو آخر کار 1GB ڈیٹاسیٹ بنائے گا:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

چونکہ ہم نے RAM میں لکھا ہے، ہم ایک بفر میں پوری ندی حاصل کر سکتے ہیں:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

چونکہ یہ ڈیٹا میموری میں ہے، اس لیے ایرو ریکارڈ برسٹ کو پڑھنا صفر کاپی آپریشن کے ذریعے حاصل کیا جاتا ہے۔ میں StreamReader کھولتا ہوں، اس میں ڈیٹا پڑھتا ہوں۔ pyarrow.Tableاور پھر ان میں تبدیل کریں۔ DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

یہ سب کچھ، یقیناً اچھا ہے، لیکن آپ کے سوالات ہو سکتے ہیں۔ یہ کتنی تیزی سے ہوتا ہے؟ حصہ کا سائز پانڈا ڈیٹا فریم کی بازیافت کی کارکردگی کو کیسے متاثر کرتا ہے؟

سلسلہ بندی کی کارکردگی

جیسے جیسے سٹریمنگ حصہ کا سائز کم ہوتا ہے، ناکارہ کیشے تک رسائی کی اسکیموں کی وجہ سے پانڈوں میں ایک مربوط کالمی ڈیٹا فریم کو دوبارہ بنانے کی لاگت بڑھ جاتی ہے۔ C++ ڈیٹا ڈھانچے اور صفوں اور ان کے میموری بفرز کے ساتھ کام کرنے سے کچھ اوور ہیڈ بھی ہے۔

اوپر کی طرح 1 MB کے لیے، میرے لیپ ٹاپ (Quad-core Xeon E3-1505M) پر یہ پتہ چلتا ہے:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

یہ پتہ چلتا ہے کہ 7.75 1 MB حصوں سے 1024 GB ڈیٹا فریم کو بحال کرنے کے لیے موثر تھرو پٹ 1 Gb/s ہے۔ اگر ہم بڑے یا چھوٹے ٹکڑوں کا استعمال کریں تو کیا ہوگا؟ آپ کو ملنے والے نتائج یہ ہیں:

اپاچی ایرو کے ساتھ کالم ڈیٹا کو اسٹریم کرنا

کارکردگی نمایاں طور پر 256K سے 64K حصوں تک گر جاتی ہے۔ میں حیران تھا کہ 1MB ٹکڑوں پر 16MB ٹکڑوں سے زیادہ تیزی سے کارروائی کی گئی۔ مزید گہرائی سے مطالعہ کرنے اور یہ سمجھنے کے قابل ہے کہ آیا یہ ایک عام تقسیم ہے یا اس میں کوئی اور چیز شامل ہے۔

فارمیٹ کے موجودہ نفاذ میں، ڈیٹا کو اصولی طور پر کمپریس نہیں کیا جاتا ہے، اس لیے میموری اور "وائر پر" کا سائز تقریباً ایک جیسا ہے۔ کمپریشن مستقبل میں ایک آپشن بن سکتا ہے۔

کل

کالم ڈیٹا کو سٹریم کرنا بڑے ڈیٹا سیٹس کو کالم اینالیٹکس ٹولز جیسے چھوٹے حصوں میں پانڈوں میں منتقل کرنے کا ایک موثر طریقہ ہو سکتا ہے۔ ڈیٹا سروسز جو قطار پر مبنی اسٹوریج کا استعمال کرتی ہیں وہ ڈیٹا کے چھوٹے ٹکڑوں کو منتقل اور منتقل کر سکتی ہیں جو آپ کے پروسیسر کے L2 اور L3 کیشے کے لیے زیادہ آسان ہیں۔

مکمل کوڈ

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

ماخذ: www.habr.com

نیا تبصرہ شامل کریں