Makalenin çevirisi ders öğrencilerine özel olarak hazırlanmıştır.
Geçtiğimiz birkaç hafta boyunca elimizde
Sütun Verilerini Akışlandırma
Arrow kullanıcılarından sık karşılaştığım sorulardan biri, büyük tablo veri kümelerini satır veya kayıt odaklı formattan sütun odaklı formata taşımanın yüksek maliyetidir. Çoklu gigabaytlık veri kümeleri için bellekte veya diskte aktarım yapmak çok zor bir görev olabilir.
Kaynak veri ister satır ister sütun olsun, veri akışı sağlamak için seçeneklerden biri, her biri içinde sütunlu bir düzen içeren küçük satır grupları göndermektir.
Apache Arrow'da, bir tablo yığınını temsil eden bellek içi sütun dizilerinin koleksiyonuna kayıt kümesi adı verilir. Mantıksal bir tablonun tek bir veri yapısını temsil etmek için birkaç kayıt kümesi toplanabilir.
Mevcut "rastgele erişim" dosya formatında, tablo şemasını içeren meta verileri kaydediyoruz ve dosyanın sonundaki blok konumlarını kaydediyoruz, böylece herhangi bir kayıt kümesini veya bir veri kümesinden herhangi bir sütunu son derece ucuz bir şekilde seçmenize olanak sağlıyoruz. Akış formatında bir dizi mesaj göndeririz: bir taslak ve ardından bir veya daha fazla kayıt grubu.
Farklı formatlar şuna benzer:
PyArrow'da Veri Akışı: Uygulama
Bunun nasıl çalıştığını size göstermek için tek bir akış parçasını temsil eden örnek bir veri kümesi oluşturacağım:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
Şimdi diyelim ki her biri 1 MB’lık parçalardan oluşan 1 GB’lık veriyi toplam 1024 parçaya yazmak istiyoruz. Başlamak için 1 sütunlu ilk 16 MB veri çerçevesini oluşturalım:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
Sonra onları dönüştürüyorum pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
Şimdi RAM'e yazacak ve oluşturacak bir çıktı akışı oluşturacağım. StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
Daha sonra 1024 parça yazacağız ve bu da sonuçta 1 GB'lık bir veri kümesine tekabül edecek:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
RAM'e yazdığımız için akışın tamamını tek bir arabellekte alabiliriz:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
Bu veriler bellekte olduğundan, Arrow kayıtlarının toplu olarak okunması sıfır kopyalama işlemidir. StreamReader'ı açıyorum, içindeki verileri okuyorum pyarrow.Table
ve ardından bunları şuna dönüştürün: DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
Bütün bunlar elbette güzel ama sorularınız olabilir. Bu ne kadar çabuk oluyor? Parça boyutu pandaların DataFrame alma performansını nasıl etkiler?
Akış Performansı
Akış öbek boyutu azaldıkça, verimsiz önbellek erişim düzenleri nedeniyle pandalarda bitişik bir sütunlu DataFrame'i yeniden oluşturmanın maliyeti artar. Ayrıca C++ veri yapıları, dizileri ve bunların bellek arabellekleriyle çalışmaktan kaynaklanan bir miktar ek yük vardır.
Yukarıdaki gibi 1 MB için dizüstü bilgisayarımda (Dört çekirdekli Xeon E3-1505M) şu çıkıyor:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
7.75 GB'lik bir DataFrame'i 1 adet 1024 MB'lık parçadan geri yüklemek için etkili verimin 1 GB/sn olduğu ortaya çıktı. Daha büyük veya daha küçük parçalar kullanırsak ne olur? Sonuçlar şunlardır:
Performans 256K'dan 64K'ya önemli ölçüde düşüyor. 1 MB'lık parçaların 16 MB'lık parçalardan daha hızlı işlenmesine şaşırdım. Daha kapsamlı bir çalışma yürütüp bunun normal bir dağılım mı olduğunu yoksa başka bir şeyin mi işin içinde olduğunu anlamakta fayda var.
Formatın mevcut uygulamasında, veriler prensip olarak sıkıştırılmaz, dolayısıyla bellekteki ve "kablolardaki" boyut yaklaşık olarak aynıdır. Gelecekte sıkıştırma ek bir seçenek haline gelebilir.
sonuç
Sütunlu verilerin akışı, büyük veri kümelerini küçük parçalar halinde pandalar gibi sütunlu analiz araçlarına beslemenin etkili bir yolu olabilir. Satır odaklı depolamayı kullanan veri hizmetleri, işlemcinizin L2 ve L3 önbellekleri için daha uygun olan küçük veri parçalarını aktarabilir ve aktarabilir.
Tam kod
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
Kaynak: habr.com