Apache Arrow yordamida ustunli ma'lumotlarni uzatish

Maqolaning tarjimasi kurs talabalari uchun maxsus tayyorlangan Ma'lumotlar muhandisi.

Apache Arrow yordamida ustunli ma'lumotlarni uzatish

So'nggi bir necha hafta ichida bizda bor Nong Li ga qo'shildi Apache strelkasi mavjud tasodifiy kirish/IPC fayl formatini to'ldiruvchi ikkilik oqim formati. Bizda Java va C++ ilovalari va Python ulanishlari mavjud. Ushbu maqolada men format qanday ishlashini tushuntirib beraman va pandas DataFrame uchun qanday qilib juda yuqori ma'lumot o'tkazuvchanligiga erishish mumkinligini ko'rsataman.

Ustun ma'lumotlarini oqimlash

Arrow foydalanuvchilaridan men oladigan keng tarqalgan savol - bu katta jadval ma'lumotlar to'plamini satr yoki yozuvga yo'naltirilgan formatdan ustunga yo'naltirilgan formatga ko'chirishning yuqori narxi. Ko'p gigabaytli ma'lumotlar to'plamlari uchun xotirada yoki diskda ko'chirish juda qiyin vazifa bo'lishi mumkin.

Ma'lumotlarni uzatish uchun, manba ma'lumotlar satr yoki ustun bo'ladimi, bitta variant - har birida ustunli tartib mavjud bo'lgan kichik qatorlar to'plamini yuborish.

Apache Arrow-da jadval bo'lagini ifodalovchi xotiradagi ustunlar massivlari to'plami rekord to'plam deb ataladi. Mantiqiy jadvalning yagona ma'lumotlar strukturasini ifodalash uchun bir nechta yozuvlar to'plamini to'plash mumkin.

Mavjud "tasodifiy kirish" fayl formatida biz jadval sxemasini o'z ichiga olgan metama'lumotlarni va fayl oxiridagi bloklangan joylarni yozib olamiz, bu sizga ma'lumotlar to'plamidan istalgan yozuvlar to'plamini yoki istalgan ustunni juda arzonga tanlash imkonini beradi. Oqim formatida biz bir qator xabarlarni yuboramiz: kontur, keyin esa bir yoki bir nechta yozuvlar to'plami.

Turli formatlar quyidagicha ko'rinadi:

Apache Arrow yordamida ustunli ma'lumotlarni uzatish

PyArrow-da ma'lumotlarni uzatish: Ilova

Bu qanday ishlashini ko'rsatish uchun men bitta oqim bo'lagini ifodalovchi misol ma'lumotlar to'plamini yarataman:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Keling, deylik, har biri 1 MB bo'lgan qismlardan iborat bo'lgan 1 Gb ma'lumotni jami 1024 bo'lak uchun yozmoqchimiz. Boshlash uchun 1 ta ustunli birinchi 16 MB ma'lumotlar ramkasini yaratamiz:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Keyin ularni aylantiraman pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Endi men RAMga yozadigan va yaratadigan chiqish oqimini yarataman StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Keyin biz 1024 ta bo'lakni yozamiz, bu oxir-oqibat 1 Gb ma'lumotlar to'plamini tashkil qiladi:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Biz RAMga yozganimiz sababli, biz butun oqimni bitta buferda olishimiz mumkin:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Ushbu ma'lumotlar xotirada bo'lganligi sababli, Arrow yozuvlarining to'plamlarini o'qish nol nusxadagi operatsiya hisoblanadi. Men StreamReader-ni ochaman, ma'lumotlarni o'qiyman pyarrow.Table, va keyin ularni aylantiring DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Bularning barchasi, albatta, yaxshi, lekin sizda savollar bo'lishi mumkin. Bu qanchalik tez sodir bo'ladi? Bo'lak o'lchami pandalarning DataFrame-ni qidirish samaradorligiga qanday ta'sir qiladi?

Striming ishlashi

Oqimli boβ€˜lak hajmi kamaygani sari, pandalarda qoβ€˜shni ustunli DataFrame-ni rekonstruksiya qilish xarajatlari keshga kirishning samarasiz shakllari tufayli oshadi. Bundan tashqari, C++ ma'lumotlar tuzilmalari va massivlari va ularning xotira buferlari bilan ishlashda biroz qo'shimcha xarajatlar mavjud.

1 MB uchun, yuqoridagidek, mening noutbukimda (to'rt yadroli Xeon E3-1505M) shunday bo'ladi:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Ma'lum bo'lishicha, 7.75 Gb DataFrameni 1 1024MB bo'lakdan tiklash uchun samarali o'tkazish qobiliyati 1 Gb/s. Kattaroq yoki kichikroq bo'laklardan foydalansak nima bo'ladi? Bu natijalar:

Apache Arrow yordamida ustunli ma'lumotlarni uzatish

Ishlash 256K dan 64K bo'laklarga sezilarli darajada pasayadi. 1 MB bo'laklar 16 MB bo'laklarga qaraganda tezroq qayta ishlanganiga hayron bo'ldim. Bu normal taqsimotmi yoki boshqa narsa bormi, chuqurroq o'rganish va tushunishga arziydi.

Formatni joriy qilishda ma'lumotlar printsipial jihatdan siqilmaydi, shuning uchun xotiradagi va "simlarda" hajmi taxminan bir xil. Kelajakda siqish qo'shimcha variantga aylanishi mumkin.

Xulosa

Ustunli ma'lumotlarni oqimlash katta ma'lumotlar to'plamini kichik bo'laklardagi pandalar kabi ustunli tahlil vositalariga kiritishning samarali usuli bo'lishi mumkin. Qatorga yo'naltirilgan saqlashdan foydalanadigan ma'lumotlar xizmatlari protsessoringizning L2 va L3 keshlari uchun qulayroq bo'lgan kichik ma'lumotlar qismlarini uzatishi va o'zgartirishi mumkin.

To'liq kod

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Manba: www.habr.com

a Izoh qo'shish