Apache Arrow ile sütun verilerini akışa alma

Makalenin çevirisi ders öğrencilerine özel olarak hazırlanmıştır. Veri Mühendisi.

Apache Arrow ile sütun verilerini akışa alma

Geçtiğimiz birkaç hafta boyunca elimizde Nong Li ilave Apaçi Oku Mevcut rastgele erişim/IPC dosya formatını tamamlayan ikili akış formatı. Java ve C++ uygulamalarımız ve Python bağlamalarımız var. Bu makalede formatın nasıl çalıştığını açıklayacağım ve bir pandas DataFrame için nasıl çok yüksek veri çıkışı elde edebileceğinizi göstereceğim.

Sütun Verilerini Akışlandırma

Arrow kullanıcılarından sık karşılaştığım sorulardan biri, büyük tablo veri kümelerini satır veya kayıt odaklı formattan sütun odaklı formata taşımanın yüksek maliyetidir. Çoklu gigabaytlık veri kümeleri için bellekte veya diskte aktarım yapmak çok zor bir görev olabilir.

Kaynak veri ister satır ister sütun olsun, veri akışı sağlamak için seçeneklerden biri, her biri içinde sütunlu bir düzen içeren küçük satır grupları göndermektir.

Apache Arrow'da, bir tablo yığınını temsil eden bellek içi sütun dizilerinin koleksiyonuna kayıt kümesi adı verilir. Mantıksal bir tablonun tek bir veri yapısını temsil etmek için birkaç kayıt kümesi toplanabilir.

Mevcut "rastgele erişim" dosya formatında, tablo şemasını içeren meta verileri kaydediyoruz ve dosyanın sonundaki blok konumlarını kaydediyoruz, böylece herhangi bir kayıt kümesini veya bir veri kümesinden herhangi bir sütunu son derece ucuz bir şekilde seçmenize olanak sağlıyoruz. Akış formatında bir dizi mesaj göndeririz: bir taslak ve ardından bir veya daha fazla kayıt grubu.

Farklı formatlar şuna benzer:

Apache Arrow ile sütun verilerini akışa alma

PyArrow'da Veri Akışı: Uygulama

Bunun nasıl çalıştığını size göstermek için tek bir akış parçasını temsil eden örnek bir veri kümesi oluşturacağım:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Şimdi diyelim ki her biri 1 MB’lık parçalardan oluşan 1 GB’lık veriyi toplam 1024 parçaya yazmak istiyoruz. Başlamak için 1 sütunlu ilk 16 MB veri çerçevesini oluşturalım:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Sonra onları dönüştürüyorum pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Şimdi RAM'e yazacak ve oluşturacak bir çıktı akışı oluşturacağım. StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Daha sonra 1024 parça yazacağız ve bu da sonuçta 1 GB'lık bir veri kümesine tekabül edecek:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

RAM'e yazdığımız için akışın tamamını tek bir arabellekte alabiliriz:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Bu veriler bellekte olduğundan, Arrow kayıtlarının toplu olarak okunması sıfır kopyalama işlemidir. StreamReader'ı açıyorum, içindeki verileri okuyorum pyarrow.Tableve ardından bunları şuna dönüştürün: DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Bütün bunlar elbette güzel ama sorularınız olabilir. Bu ne kadar çabuk oluyor? Parça boyutu pandaların DataFrame alma performansını nasıl etkiler?

Akış Performansı

Akış öbek boyutu azaldıkça, verimsiz önbellek erişim düzenleri nedeniyle pandalarda bitişik bir sütunlu DataFrame'i yeniden oluşturmanın maliyeti artar. Ayrıca C++ veri yapıları, dizileri ve bunların bellek arabellekleriyle çalışmaktan kaynaklanan bir miktar ek yük vardır.

Yukarıdaki gibi 1 MB için dizüstü bilgisayarımda (Dört çekirdekli Xeon E3-1505M) şu çıkıyor:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

7.75 GB'lik bir DataFrame'i 1 adet 1024 MB'lık parçadan geri yüklemek için etkili verimin 1 GB/sn olduğu ortaya çıktı. Daha büyük veya daha küçük parçalar kullanırsak ne olur? Sonuçlar şunlardır:

Apache Arrow ile sütun verilerini akışa alma

Performans 256K'dan 64K'ya önemli ölçüde düşüyor. 1 MB'lık parçaların 16 MB'lık parçalardan daha hızlı işlenmesine şaşırdım. Daha kapsamlı bir çalışma yürütüp bunun normal bir dağılım mı olduğunu yoksa başka bir şeyin mi işin içinde olduğunu anlamakta fayda var.

Formatın mevcut uygulamasında, veriler prensip olarak sıkıştırılmaz, dolayısıyla bellekteki ve "kablolardaki" boyut yaklaşık olarak aynıdır. Gelecekte sıkıştırma ek bir seçenek haline gelebilir.

sonuç

Sütunlu verilerin akışı, büyük veri kümelerini küçük parçalar halinde pandalar gibi sütunlu analiz araçlarına beslemenin etkili bir yolu olabilir. Satır odaklı depolamayı kullanan veri hizmetleri, işlemcinizin L2 ve L3 önbellekleri için daha uygun olan küçük veri parçalarını aktarabilir ve aktarabilir.

Tam kod

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Kaynak: habr.com

Yorum ekle