د اپاچي تیر سره د کالم ډیټا جریان کول

د مقالې ژباړه په ځانګړي ډول د کورس زده کونکو لپاره چمتو شوې وه د معلوماتو انجنیر.

د اپاچي تیر سره د کالم ډیټا جریان کول

په تیرو څو اونیو کې موږ لرو نونګ لي ته اضافه کړه اپاچی تیر د بائنری سټیمینګ بڼه، د موجوده تصادفي لاسرسي/IPC فایل فارمیټ بشپړوي. موږ جاوا او C++ پلي کول او د پایتون پابندۍ لرو. پدې مقاله کې ، زه به تشریح کړم چې څنګه فارمیټ کار کوي او وښیې چې تاسو څنګه کولی شئ د پانډا ډیټا فریم لپاره خورا لوړ ډیټا انټرپټ ترلاسه کړئ.

د کالم ډاټا جریان کول

یوه عامه پوښتنه چې زه د تیر کاروونکو څخه ترلاسه کوم د قطار یا ریکارډ پلوه ب formatې څخه د کالم پر بنسټ ب formatه ته د جدول ډیټا لوی سیټونو لیږدولو لوړ لګښت دی. د ملټي ګیګابایټ ډیټاسیټونو لپاره ، په حافظه یا ډیسک کې لیږد کول خورا لوی کار کیدی شي.

د ډیټا جریان کولو لپاره ، ایا د سرچینې ډیټا قطار وي یا کالم ، یو اختیار دا دی چې د قطارونو کوچنۍ بستې واستول شي ، هر یو دننه د کالم ترتیب لري.

په اپاچي تیر کې، د حافظې د کالم اریونو ټولګه چې د میز ټوټه استازیتوب کوي د ریکارډ بیچ په نوم یادیږي. د منطقي جدول د واحد ډیټا جوړښت نمایندګي کولو لپاره ، د ریکارډونو څو بستې راټول کیدی شي.

په موجوده "تصادفي لاسرسي" فایل فارمیټ کې ، موږ میټاډاټا ثبت کوو چې د میز سکیما لري او د فایل په پای کې ځایونه بلاک کوي ، تاسو ته اجازه درکوي په خورا ارزانه توګه د ریکارډونو کومه ډله یا د ډیټا سیټ څخه کوم کالم غوره کړئ. په سټرینګ بڼه کې، موږ یو لړ پیغامونه لیږو: یو خاکه، او بیا د ریکارډونو یو یا څو بستې.

مختلف شکلونه داسې ښکاري:

د اپاچي تیر سره د کالم ډیټا جریان کول

په PyArrow کې د معلوماتو جریان کول: غوښتنلیک

تاسو ته د ښودلو لپاره چې دا څنګه کار کوي، زه به یو مثال ډیټاسیټ جوړ کړم چې د یو واحد جریان برخه استازیتوب کوي:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

اوس، راځئ چې ووایو چې موږ غواړو د 1 GB ډیټا ولیکئ، چې د 1 MB هر یو برخه لري، د 1024 ټوټو لپاره. د پیل کولو لپاره، راځئ چې د 1 کالمونو سره لومړی 16 MB ډیټا چوکاټ جوړ کړو:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

بیا زه دوی ته بدل کړم pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

اوس به زه یو تولیدي جریان رامینځته کړم چې RAM ته به لیکي او رامینځته به کړي StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

بیا به موږ 1024 ټوټې ولیکو، کوم چې په نهایت کې به د 1GB ډیټا سیټ ته مقدار ورکړي:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

څرنګه چې موږ رام ته لیکلي، موږ کولی شو ټول جریان په یو بفر کې ترلاسه کړو:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

څرنګه چې دا ډاټا په حافظه کې ده، د تیر ریکارډونو بستې لوستل د صفر کاپي عملیات دي. زه StreamReader خلاصوم، په کې ډاټا ولولئ pyarrow.Table، او بیا یې بدل کړئ DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

دا ټول، البته، ښه دي، مګر تاسو ممکن پوښتنې ولرئ. دا څومره ژر پیښیږي؟ د ټوټې اندازه څنګه د پانډا ډیټا فریم بیرته ترلاسه کولو فعالیت اغیزه کوي؟

د جریان فعالیت

لکه څنګه چې د سټرینګ برخې اندازه کمیږي ، په پانډا کې د متضاد کالمر ډیټا فریم بیارغونې لګښت د ناکافي کیچ لاسرسي نمونو له امله ډیریږي. د C++ ډیټا جوړښتونو او صفونو او د دوی حافظې بفرونو سره کار کولو څخه یو څه سر هم شتون لري.

د 1 MB لپاره، لکه څنګه چې پورته، زما په لیپ ټاپ کې (Quad-core Xeon E3-1505M) دا معلومه شوه:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

دا معلومه شوه چې د 7.75 1MB ټوټو څخه د 1024GB ډیټا فریم بحالولو لپاره مؤثره وسیله 1 GB/s ده. څه پیښیږي که موږ لوی یا کوچني ټوټې وکاروو؟ دا پایلې دي:

د اپاچي تیر سره د کالم ډیټا جریان کول

فعالیت د پام وړ له 256K څخه تر 64K پورې کمیږي. زه حیران وم چې د 1 MB ټوټې د 16 MB ټوټې په پرتله ګړندي پروسس شوي. دا د لا ژورې مطالعې ترسره کولو او پوهیدو ارزښت لري چې ایا دا یو نورمال توزیع دی یا ایا په لوبې کې بل څه شتون لري.

د فارمیټ په اوسني پلي کولو کې، ډاټا په اصولو کې نه کمیږي، نو د حافظې اندازه او "په تارونو کې" نږدې ورته ده. په راتلونکي کې، کمپریشن ممکن یو اضافي اختیار شي.

نتیجه

د کالم ډیټا جریان کول د کالم تحلیلي وسیلو لکه پانډا په کوچنیو برخو کې د لوی ډیټا سیټونو تغذیه کولو لپاره مؤثره لاره کیدی شي. د ډیټا خدمتونه چې د قطار پراساس ذخیره کاروي کولی شي د ډیټا کوچنۍ برخې لیږد او لیږد کړي چې ستاسو د پروسیسر L2 او L3 کیچونو لپاره خورا اسانه دي.

بشپړ کوډ

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

سرچینه: www.habr.com

Add a comment