Maqolaning tarjimasi kurs talabalari uchun maxsus tayyorlangan
So'nggi bir necha hafta ichida bizda bor
Ustun ma'lumotlarini oqimlash
Arrow foydalanuvchilaridan men oladigan keng tarqalgan savol - bu katta jadval ma'lumotlar to'plamini satr yoki yozuvga yo'naltirilgan formatdan ustunga yo'naltirilgan formatga ko'chirishning yuqori narxi. Ko'p gigabaytli ma'lumotlar to'plamlari uchun xotirada yoki diskda ko'chirish juda qiyin vazifa bo'lishi mumkin.
Ma'lumotlarni uzatish uchun, manba ma'lumotlar satr yoki ustun bo'ladimi, bitta variant - har birida ustunli tartib mavjud bo'lgan kichik qatorlar to'plamini yuborish.
Apache Arrow-da jadval bo'lagini ifodalovchi xotiradagi ustunlar massivlari to'plami rekord to'plam deb ataladi. Mantiqiy jadvalning yagona ma'lumotlar strukturasini ifodalash uchun bir nechta yozuvlar to'plamini to'plash mumkin.
Mavjud "tasodifiy kirish" fayl formatida biz jadval sxemasini o'z ichiga olgan metama'lumotlarni va fayl oxiridagi bloklangan joylarni yozib olamiz, bu sizga ma'lumotlar to'plamidan istalgan yozuvlar to'plamini yoki istalgan ustunni juda arzonga tanlash imkonini beradi. Oqim formatida biz bir qator xabarlarni yuboramiz: kontur, keyin esa bir yoki bir nechta yozuvlar to'plami.
Turli formatlar quyidagicha ko'rinadi:
PyArrow-da ma'lumotlarni uzatish: Ilova
Bu qanday ishlashini ko'rsatish uchun men bitta oqim bo'lagini ifodalovchi misol ma'lumotlar to'plamini yarataman:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Keling, deylik, har biri 1 MB bo'lgan qismlardan iborat bo'lgan 1 Gb ma'lumotni jami 1024 bo'lak uchun yozmoqchimiz. Boshlash uchun 1 ta ustunli birinchi 16 MB ma'lumotlar ramkasini yaratamiz:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Keyin ularni aylantiraman pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Endi men RAMga yozadigan va yaratadigan chiqish oqimini yarataman StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Keyin biz 1024 ta bo'lakni yozamiz, bu oxir-oqibat 1 Gb ma'lumotlar to'plamini tashkil qiladi:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
Biz RAMga yozganimiz sababli, biz butun oqimni bitta buferda olishimiz mumkin:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Ushbu ma'lumotlar xotirada bo'lganligi sababli, Arrow yozuvlarining to'plamlarini o'qish nol nusxadagi operatsiya hisoblanadi. Men StreamReader-ni ochaman, ma'lumotlarni o'qiyman pyarrow.Table
, va keyin ularni aylantiring DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Bularning barchasi, albatta, yaxshi, lekin sizda savollar bo'lishi mumkin. Bu qanchalik tez sodir bo'ladi? Bo'lak o'lchami pandalarning DataFrame-ni qidirish samaradorligiga qanday ta'sir qiladi?
Striming ishlashi
Oqimli boβlak hajmi kamaygani sari, pandalarda qoβshni ustunli DataFrame-ni rekonstruksiya qilish xarajatlari keshga kirishning samarasiz shakllari tufayli oshadi. Bundan tashqari, C++ ma'lumotlar tuzilmalari va massivlari va ularning xotira buferlari bilan ishlashda biroz qo'shimcha xarajatlar mavjud.
1 MB uchun, yuqoridagidek, mening noutbukimda (to'rt yadroli Xeon E3-1505M) shunday bo'ladi:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
Ma'lum bo'lishicha, 7.75 Gb DataFrameni 1 1024MB bo'lakdan tiklash uchun samarali o'tkazish qobiliyati 1 Gb/s. Kattaroq yoki kichikroq bo'laklardan foydalansak nima bo'ladi? Bu natijalar:
Ishlash 256K dan 64K bo'laklarga sezilarli darajada pasayadi. 1 MB bo'laklar 16 MB bo'laklarga qaraganda tezroq qayta ishlanganiga hayron bo'ldim. Bu normal taqsimotmi yoki boshqa narsa bormi, chuqurroq o'rganish va tushunishga arziydi.
Formatni joriy qilishda ma'lumotlar printsipial jihatdan siqilmaydi, shuning uchun xotiradagi va "simlarda" hajmi taxminan bir xil. Kelajakda siqish qo'shimcha variantga aylanishi mumkin.
Xulosa
Ustunli ma'lumotlarni oqimlash katta ma'lumotlar to'plamini kichik bo'laklardagi pandalar kabi ustunli tahlil vositalariga kiritishning samarali usuli bo'lishi mumkin. Qatorga yo'naltirilgan saqlashdan foydalanadigan ma'lumotlar xizmatlari protsessoringizning L2 va L3 keshlari uchun qulayroq bo'lgan kichik ma'lumotlar qismlarini uzatishi va o'zgartirishi mumkin.
To'liq kod
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Manba: www.habr.com