การแปลบทความจัดทำขึ้นเฉพาะสำหรับนักศึกษาของหลักสูตร
ในช่วงสองสามสัปดาห์ที่ผ่านมา เรา
ข้อมูลคอลัมน์สตรีมมิ่ง
คำถามทั่วไปที่ฉันได้รับจากผู้ใช้ Arrow คือค่าใช้จ่ายสูงในการย้ายชุดข้อมูลแบบตารางขนาดใหญ่จากรูปแบบแถวหรือระเบียนไปยังรูปแบบคอลัมน์ สำหรับชุดข้อมูลหลายกิกะไบต์ การย้ายข้อมูลในหน่วยความจำหรือบนดิสก์อาจเป็นเรื่องยุ่งยาก
สำหรับการสตรีมข้อมูล ไม่ว่าข้อมูลต้นทางจะเป็นแถวหรือคอลัมน์ ทางเลือกหนึ่งคือการส่งแถวชุดเล็กๆ โดยแต่ละชุดจะมีเค้าโครงคอลัมน์อยู่ภายใน
ใน Apache Arrow คอลเลกชันของอาร์เรย์แบบคอลัมน์ในหน่วยความจำที่เป็นตัวแทนของตารางเรียกว่าชุดบันทึก หากต้องการแสดงโครงสร้างข้อมูลเดียวของตารางแบบลอจิคัล คุณสามารถรวบรวมชุดของเรกคอร์ดได้หลายชุด
ในรูปแบบไฟล์ "การเข้าถึงแบบสุ่ม" ที่มีอยู่ เราเขียนข้อมูลเมตาที่มีสคีมาของตารางและเค้าโครงบล็อกที่ส่วนท้ายของไฟล์ ซึ่งทำให้คุณสามารถเลือกกลุ่มของระเบียนหรือคอลัมน์ใดๆ จากชุดข้อมูลได้ในราคาถูกมาก ในรูปแบบการส่งกระแสข้อมูล เราส่งชุดของข้อความ: แบบแผน และเรกคอร์ดหนึ่งชุดขึ้นไป
รูปแบบต่างๆ มีลักษณะดังนี้:
สตรีมข้อมูลใน PyArrow: แอปพลิเคชัน
เพื่อแสดงให้คุณเห็นว่ามันทำงานอย่างไร ฉันจะสร้างชุดข้อมูลตัวอย่างที่แสดงสตรีมอันเดียว:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
ตอนนี้ สมมติว่าเราต้องการเขียนข้อมูล 1 GB ประกอบด้วยชิ้นละ 1 MB รวมเป็น 1024 ชิ้น ก่อนอื่นมาสร้าง data frame ขนาด 1MB แรกที่มี 16 คอลัมน์:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
จากนั้นฉันจะแปลงให้เป็น pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
ตอนนี้ฉันจะสร้างเอาต์พุตสตรีมที่จะเขียนไปยัง RAM และสร้าง StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
จากนั้นเราจะเขียน 1024 ชิ้น ซึ่งในที่สุดจะประกอบกันเป็นชุดข้อมูล 1GB:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
เนื่องจากเราเขียนใน RAM เราจึงได้รับสตรีมทั้งหมดในบัฟเฟอร์เดียว:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
เนื่องจากข้อมูลนี้อยู่ในหน่วยความจำ การอ่านชุดบันทึก Arrow จึงมีการดำเนินการคัดลอกเป็นศูนย์ ฉันเปิด StreamReader อ่านข้อมูลเข้าไป pyarrow.Table
แล้วแปลงให้เป็น DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
แน่นอนว่าทั้งหมดนี้เป็นสิ่งที่ดี แต่คุณอาจมีคำถาม มันเกิดขึ้นเร็วแค่ไหน? ขนาดก้อนส่งผลต่อประสิทธิภาพการดึงข้อมูลของ Pandas DataFrame อย่างไร
ประสิทธิภาพการสตรีม
เมื่อขนาดสตรีมมิงลดลง ค่าใช้จ่ายในการสร้าง DataFrame แบบคอลัมน์ที่ต่อเนื่องกันในแพนด้าจะเพิ่มขึ้นเนื่องจากรูปแบบการเข้าถึงแคชที่ไม่มีประสิทธิภาพ นอกจากนี้ยังมีค่าใช้จ่ายบางส่วนจากการทำงานกับโครงสร้างข้อมูล C++ และอาร์เรย์และบัฟเฟอร์หน่วยความจำ
สำหรับ 1 MB ข้างต้นบนแล็ปท็อปของฉัน (Quad-core Xeon E3-1505M) ปรากฎว่า:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
ปรากฎว่าปริมาณงานที่มีประสิทธิภาพคือ 7.75 Gb / s สำหรับการกู้คืน DataFrame 1 GB จาก 1024 ชิ้น 1 MB จะเกิดอะไรขึ้นถ้าเราใช้ชิ้นที่ใหญ่ขึ้นหรือเล็กลง นี่คือผลลัพธ์ที่คุณได้รับ:
ประสิทธิภาพลดลงอย่างมากจาก 256K เป็น 64K ชิ้น ฉันรู้สึกประหลาดใจที่ชิ้นขนาด 1MB ได้รับการประมวลผลเร็วกว่าชิ้นขนาด 16MB ควรทำการศึกษาอย่างละเอียดมากขึ้นและทำความเข้าใจว่านี่เป็นการแจกแจงแบบปกติหรืออย่างอื่นที่เกี่ยวข้อง
ในการใช้งานรูปแบบปัจจุบัน ข้อมูลไม่ได้ถูกบีบอัดโดยหลักการ ดังนั้นขนาดในหน่วยความจำและ "บนสาย" จึงใกล้เคียงกัน การบีบอัดอาจกลายเป็นตัวเลือกในอนาคต
ทั้งหมด
การสตรีมข้อมูลคอลัมน์อาจเป็นวิธีที่มีประสิทธิภาพในการถ่ายโอนชุดข้อมูลขนาดใหญ่ไปยังเครื่องมือวิเคราะห์คอลัมน์ เช่น pandas ในก้อนเล็กๆ บริการข้อมูลที่ใช้พื้นที่จัดเก็บแบบเรียงตามแถวสามารถถ่ายโอนและย้ายข้อมูลกลุ่มเล็กๆ ซึ่งสะดวกกว่าสำหรับแคช L2 และ L3 ของโปรเซสเซอร์ของคุณ
รหัสเต็ม
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
ที่มา: will.com