Truyền dữ liệu cột bằng Mũi tên Apache

Bản dịch của bài viết được chuẩn bị riêng cho các học viên của khóa học Kỹ sư dữ liệu.

Truyền dữ liệu cột bằng Mũi tên Apache

Trong vài tuần qua, chúng tôi Nông Lý thêm vào Mũi tên Apache định dạng luồng nhị phân, bổ sung cho định dạng tệp IPC/truy cập ngẫu nhiên hiện có. Chúng tôi có các triển khai Java và C++ và các ràng buộc Python. Trong bài viết này, tôi sẽ giải thích cách thức hoạt động của định dạng này và chỉ ra cách bạn có thể đạt được thông lượng dữ liệu rất cao cho DataFrame gấu trúc.

Truyền dữ liệu cột

Một câu hỏi phổ biến mà tôi nhận được từ những người dùng Arrow là chi phí cao để di chuyển các bộ dữ liệu dạng bảng lớn từ định dạng theo hàng hoặc theo bản ghi sang định dạng theo cột. Đối với bộ dữ liệu nhiều gigabyte, việc chuyển đổi trong bộ nhớ hoặc trên đĩa có thể quá sức.

Để truyền dữ liệu, cho dù dữ liệu nguồn là hàng hay cột, một tùy chọn là gửi các lô hàng nhỏ, mỗi lô chứa bố cục cột bên trong.

Trong Mũi tên Apache, một tập hợp các mảng cột trong bộ nhớ đại diện cho một đoạn bảng được gọi là lô bản ghi. Để biểu diễn một cấu trúc dữ liệu duy nhất của một bảng logic, bạn có thể thu thập một số bộ bản ghi.

Ở định dạng tệp "truy cập ngẫu nhiên" hiện có, chúng tôi viết siêu dữ liệu chứa lược đồ bảng và bố cục khối ở cuối tệp, cho phép bạn chọn bất kỳ lô bản ghi nào hoặc bất kỳ cột nào từ tập dữ liệu với giá rất rẻ. Ở định dạng phát trực tuyến, chúng tôi gửi một loạt thông báo: lược đồ và sau đó là một hoặc nhiều lô bản ghi.

Các định dạng khác nhau trông giống như hình ảnh này:

Truyền dữ liệu cột bằng Mũi tên Apache

Truyền dữ liệu trong PyArrow: ứng dụng

Để cho bạn thấy nó hoạt động như thế nào, tôi sẽ tạo một tập dữ liệu mẫu đại diện cho một đoạn luồng đơn:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Bây giờ, giả sử chúng ta muốn ghi 1 GB dữ liệu, bao gồm mỗi khối 1 MB, với tổng số 1024 khối. Trước tiên, hãy tạo khung dữ liệu 1 MB đầu tiên với 16 cột:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Sau đó, tôi chuyển đổi chúng thành pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Bây giờ tôi sẽ tạo một luồng đầu ra sẽ ghi vào RAM và tạo StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Sau đó, chúng tôi sẽ viết 1024 khối, cuối cùng sẽ tạo thành tập dữ liệu 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Vì chúng tôi đã viết bằng RAM, nên chúng tôi có thể lấy toàn bộ luồng trong một bộ đệm:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Vì dữ liệu này nằm trong bộ nhớ nên việc đọc các lô bản ghi Mũi tên có được bằng thao tác không sao chép. Tôi mở StreamReader, đọc dữ liệu vào pyarrow.Tablevà sau đó chuyển đổi chúng thành DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tất nhiên, tất cả điều này là tốt, nhưng bạn có thể có thắc mắc. Nó xảy ra nhanh như thế nào? Làm thế nào để kích thước chunk ảnh hưởng đến hiệu suất truy xuất DataFrame của gấu trúc?

Hiệu suất phát trực tuyến

Khi kích thước khối phát trực tuyến giảm, chi phí xây dựng lại DataFrame dạng cột liền kề trong gấu trúc tăng lên do các sơ đồ truy cập bộ đệm không hiệu quả. Ngoài ra còn có một số chi phí hoạt động với cấu trúc dữ liệu C++ và mảng và bộ đệm bộ nhớ của chúng.

Đối với 1 MB như trên, trên máy tính xách tay của tôi (Xeon lõi tứ E3-1505M), hóa ra:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Hóa ra thông lượng hiệu quả là 7.75 Gb / giây để khôi phục Khung dữ liệu 1 GB từ 1024 khối 1 MB. Điều gì xảy ra nếu chúng ta sử dụng các khối lớn hơn hoặc nhỏ hơn? Đây là kết quả bạn nhận được:

Truyền dữ liệu cột bằng Mũi tên Apache

Hiệu suất giảm đáng kể từ 256K xuống 64K khối. Tôi rất ngạc nhiên khi khối 1 MB được xử lý nhanh hơn khối 16 MB. Cần thực hiện một nghiên cứu kỹ lưỡng hơn và hiểu liệu đây có phải là phân phối bình thường hay có liên quan gì khác.

Trong cách triển khai định dạng hiện tại, về nguyên tắc, dữ liệu không được nén, do đó kích thước trong bộ nhớ và "trên dây" xấp xỉ nhau. Nén có thể trở thành một tùy chọn trong tương lai.

Tổng

Truyền dữ liệu cột có thể là một cách hiệu quả để chuyển các tập dữ liệu lớn sang các công cụ phân tích cột như gấu trúc theo các khối nhỏ. Các dịch vụ dữ liệu sử dụng lưu trữ theo hàng có thể truyền và hoán đổi các khối dữ liệu nhỏ thuận tiện hơn cho bộ nhớ đệm L2 và L3 của bộ xử lý.

Mã đầy đủ

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Nguồn: www.habr.com

Thêm một lời nhận xét