สตรีมข้อมูลคอลัมน์ด้วย Apache Arrow

การแปลบทความจัดทำขึ้นเฉพาะสำหรับนักศึกษาของหลักสูตร วิศวกรข้อมูล.

สตรีมข้อมูลคอลัมน์ด้วย Apache Arrow

ในช่วงสองสามสัปดาห์ที่ผ่านมา เรา น้องลี่ เพิ่มไปยัง อาปาเช่ ลูกศร รูปแบบไบนารีสตรีม เสริมรูปแบบไฟล์การเข้าถึงโดยสุ่ม/IPC ที่มีอยู่แล้ว เรามีการใช้งาน Java และ C++ และการโยง Python ในบทความนี้ ฉันจะอธิบายวิธีการทำงานของรูปแบบและแสดงวิธีที่คุณจะได้รับปริมาณงานข้อมูลที่สูงมากสำหรับ DataFrame แพนด้า

ข้อมูลคอลัมน์สตรีมมิ่ง

คำถามทั่วไปที่ฉันได้รับจากผู้ใช้ Arrow คือค่าใช้จ่ายสูงในการย้ายชุดข้อมูลแบบตารางขนาดใหญ่จากรูปแบบแถวหรือระเบียนไปยังรูปแบบคอลัมน์ สำหรับชุดข้อมูลหลายกิกะไบต์ การย้ายข้อมูลในหน่วยความจำหรือบนดิสก์อาจเป็นเรื่องยุ่งยาก

สำหรับการสตรีมข้อมูล ไม่ว่าข้อมูลต้นทางจะเป็นแถวหรือคอลัมน์ ทางเลือกหนึ่งคือการส่งแถวชุดเล็กๆ โดยแต่ละชุดจะมีเค้าโครงคอลัมน์อยู่ภายใน

ใน Apache Arrow คอลเลกชันของอาร์เรย์แบบคอลัมน์ในหน่วยความจำที่เป็นตัวแทนของตารางเรียกว่าชุดบันทึก หากต้องการแสดงโครงสร้างข้อมูลเดียวของตารางแบบลอจิคัล คุณสามารถรวบรวมชุดของเรกคอร์ดได้หลายชุด

ในรูปแบบไฟล์ "การเข้าถึงแบบสุ่ม" ที่มีอยู่ เราเขียนข้อมูลเมตาที่มีสคีมาของตารางและเค้าโครงบล็อกที่ส่วนท้ายของไฟล์ ซึ่งทำให้คุณสามารถเลือกกลุ่มของระเบียนหรือคอลัมน์ใดๆ จากชุดข้อมูลได้ในราคาถูกมาก ในรูปแบบการส่งกระแสข้อมูล เราส่งชุดของข้อความ: แบบแผน และเรกคอร์ดหนึ่งชุดขึ้นไป

รูปแบบต่างๆ มีลักษณะดังนี้:

สตรีมข้อมูลคอลัมน์ด้วย Apache Arrow

สตรีมข้อมูลใน PyArrow: แอปพลิเคชัน

เพื่อแสดงให้คุณเห็นว่ามันทำงานอย่างไร ฉันจะสร้างชุดข้อมูลตัวอย่างที่แสดงสตรีมอันเดียว:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

ตอนนี้ สมมติว่าเราต้องการเขียนข้อมูล 1 GB ประกอบด้วยชิ้นละ 1 MB รวมเป็น 1024 ชิ้น ก่อนอื่นมาสร้าง data frame ขนาด 1MB แรกที่มี 16 คอลัมน์:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

จากนั้นฉันจะแปลงให้เป็น pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

ตอนนี้ฉันจะสร้างเอาต์พุตสตรีมที่จะเขียนไปยัง RAM และสร้าง StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

จากนั้นเราจะเขียน 1024 ชิ้น ซึ่งในที่สุดจะประกอบกันเป็นชุดข้อมูล 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

เนื่องจากเราเขียนใน RAM เราจึงได้รับสตรีมทั้งหมดในบัฟเฟอร์เดียว:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

เนื่องจากข้อมูลนี้อยู่ในหน่วยความจำ การอ่านชุดบันทึก Arrow จึงมีการดำเนินการคัดลอกเป็นศูนย์ ฉันเปิด StreamReader อ่านข้อมูลเข้าไป pyarrow.Tableแล้วแปลงให้เป็น DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

แน่นอนว่าทั้งหมดนี้เป็นสิ่งที่ดี แต่คุณอาจมีคำถาม มันเกิดขึ้นเร็วแค่ไหน? ขนาดก้อนส่งผลต่อประสิทธิภาพการดึงข้อมูลของ Pandas DataFrame อย่างไร

ประสิทธิภาพการสตรีม

เมื่อขนาดสตรีมมิงลดลง ค่าใช้จ่ายในการสร้าง DataFrame แบบคอลัมน์ที่ต่อเนื่องกันในแพนด้าจะเพิ่มขึ้นเนื่องจากรูปแบบการเข้าถึงแคชที่ไม่มีประสิทธิภาพ นอกจากนี้ยังมีค่าใช้จ่ายบางส่วนจากการทำงานกับโครงสร้างข้อมูล C++ และอาร์เรย์และบัฟเฟอร์หน่วยความจำ

สำหรับ 1 MB ข้างต้นบนแล็ปท็อปของฉัน (Quad-core Xeon E3-1505M) ปรากฎว่า:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

ปรากฎว่าปริมาณงานที่มีประสิทธิภาพคือ 7.75 Gb / s สำหรับการกู้คืน DataFrame 1 GB จาก 1024 ชิ้น 1 MB จะเกิดอะไรขึ้นถ้าเราใช้ชิ้นที่ใหญ่ขึ้นหรือเล็กลง นี่คือผลลัพธ์ที่คุณได้รับ:

สตรีมข้อมูลคอลัมน์ด้วย Apache Arrow

ประสิทธิภาพลดลงอย่างมากจาก 256K เป็น 64K ชิ้น ฉันรู้สึกประหลาดใจที่ชิ้นขนาด 1MB ได้รับการประมวลผลเร็วกว่าชิ้นขนาด 16MB ควรทำการศึกษาอย่างละเอียดมากขึ้นและทำความเข้าใจว่านี่เป็นการแจกแจงแบบปกติหรืออย่างอื่นที่เกี่ยวข้อง

ในการใช้งานรูปแบบปัจจุบัน ข้อมูลไม่ได้ถูกบีบอัดโดยหลักการ ดังนั้นขนาดในหน่วยความจำและ "บนสาย" จึงใกล้เคียงกัน การบีบอัดอาจกลายเป็นตัวเลือกในอนาคต

ทั้งหมด

การสตรีมข้อมูลคอลัมน์อาจเป็นวิธีที่มีประสิทธิภาพในการถ่ายโอนชุดข้อมูลขนาดใหญ่ไปยังเครื่องมือวิเคราะห์คอลัมน์ เช่น pandas ในก้อนเล็กๆ บริการข้อมูลที่ใช้พื้นที่จัดเก็บแบบเรียงตามแถวสามารถถ่ายโอนและย้ายข้อมูลกลุ่มเล็กๆ ซึ่งสะดวกกว่าสำหรับแคช L2 และ L3 ของโปรเซสเซอร์ของคุณ

รหัสเต็ม

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

ที่มา: will.com

เพิ่มความคิดเห็น