Apache Arrow ilə sütun məlumatlarının ötürülməsi

Məqalənin tərcüməsi kursun tələbələri üçün xüsusi hazırlanmışdır Məlumat Mühəndisi.

Apache Arrow ilə sütun məlumatlarının ötürülməsi

Son bir neçə həftə ərzində bizdə var Nonq Li əlavə etdi Apache Ox mövcud təsadüfi giriş/IPC fayl formatını tamamlayan ikili axın formatı. Bizdə Java və C++ tətbiqləri və Python bağlamaları var. Bu yazıda mən formatın necə işlədiyini izah edəcəyəm və pandas DataFrame üçün çox yüksək məlumat ötürmə qabiliyyətinə necə nail ola biləcəyinizi göstərəcəyəm.

Axın Sütun Datası

Arrow istifadəçilərindən aldığım ümumi sual, böyük cədvəl məlumat dəstlərinin sətir və ya qeyd yönümlü formatdan sütun yönümlü formata köçürülməsinin yüksək qiymətidir. Çox gigabaytlıq verilənlər dəstləri üçün yaddaşa və ya diskə köçürmək çox çətin bir iş ola bilər.

Mənbə verilənlərin cərgə və ya sütun olmasından asılı olmayaraq, verilənlərin ötürülməsi üçün seçimlərdən biri hər birində sütunlu düzülmə olan kiçik sətir partiyalarını göndərməkdir.

Apache Arrow-da cədvəl yığınını təmsil edən yaddaşdaxili sütun massivlərinin toplusu rekord toplu adlanır. Məntiqi cədvəlin vahid məlumat strukturunu təmsil etmək üçün bir neçə qeyd toplusu toplana bilər.

Mövcud "təsadüfi giriş" fayl formatında biz cədvəl sxemini və faylın sonunda blok yerlərini ehtiva edən metaməlumatları qeyd edirik ki, bu da sizə məlumat dəstindən istənilən qeydlər toplusunu və ya hər hansı sütunu olduqca ucuz şəkildə seçməyə imkan verir. Axın formatında biz bir sıra mesajlar göndəririk: kontur, sonra isə bir və ya daha çox qeydlər toplusu.

Fərqli formatlar belə görünür:

Apache Arrow ilə sütun məlumatlarının ötürülməsi

PyArrow-da məlumat axını: Tətbiq

Bunun necə işlədiyini sizə göstərmək üçün mən tək axın yığınını təmsil edən nümunə verilənlər toplusu yaradacağam:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

İndi deyək ki, biz hər biri 1 MB-lıq hissələrdən ibarət 1 GB məlumat yazmaq istəyirik, cəmi 1024 parça. Başlamaq üçün gəlin 1 sütunlu ilk 16 MB məlumat çərçivəsini yaradaq:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Sonra onları çevirirəm pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

İndi mən RAM-a yazıb yaradacaq bir çıxış axını yaradacağam StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Sonra 1024 parça yazacağıq ki, bu da nəticədə 1 GB məlumat dəstinə bərabər olacaq:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

RAM-a yazdığımız üçün bütün axını bir buferdə əldə edə bilərik:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Bu məlumatlar yaddaşda olduğundan, Arrow qeydlərinin partiyalarını oxumaq sıfır nüsxə əməliyyatıdır. StreamReader-i açıram, məlumatları oxuyuram pyarrow.Table, və sonra onları çevirin DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Bütün bunlar, əlbəttə ki, yaxşıdır, amma suallarınız ola bilər. Bu nə qədər tez baş verir? Parça ölçüsü pandaların DataFrame axtarışının performansına necə təsir edir?

Axın Performansı

Axın yığınının ölçüsü azaldıqca, pandalarda bitişik sütunlu DataFrame-in yenidən qurulması dəyəri səmərəsiz keş giriş nümunələri səbəbindən artır. C++ məlumat strukturları və massivləri və onların yaddaş buferləri ilə işləmək üçün bəzi əlavə xərclər də var.

1 MB üçün, yuxarıdakı kimi, laptopumda (Dörd nüvəli Xeon E3-1505M) belə çıxır:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Məlum oldu ki, 7.75 1MB hissədən 1024GB DataFrame-i bərpa etmək üçün effektiv ötürmə qabiliyyəti 1 GB/s təşkil edir. Daha böyük və ya daha kiçik parçalardan istifadə etsək nə olar? Nəticələr bunlardır:

Apache Arrow ilə sütun məlumatlarının ötürülməsi

Performans 256K-dan 64K parçaya qədər əhəmiyyətli dərəcədə azalır. 1 MB-lıq parçaların 16 MB-lıq hissələrdən daha sürətli işlənməsi məni təəccübləndirdi. Bunun normal bir paylama olub-olmadığını və ya oyunda başqa bir şeyin olub olmadığını daha ətraflı araşdırmağa və anlamaq lazımdır.

Formatın hazırkı tətbiqində məlumatlar prinsipcə sıxılmır, buna görə yaddaşdakı və "tellərdəki" ölçü təxminən eynidır. Gələcəkdə sıxılma əlavə bir seçim ola bilər.

Ümumi

Sütunlu məlumatların ötürülməsi böyük məlumat dəstlərini kiçik hissələrdə pandalar kimi sütunlu analitik alətlərə ötürməyin effektiv yolu ola bilər. Sətir yönümlü yaddaşdan istifadə edən məlumat xidmətləri prosessorunuzun L2 və L3 keşləri üçün daha əlverişli olan məlumatların kiçik hissələrini ötürə və köçürə bilər.

Tam kod

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Mənbə: www.habr.com

Добавить комментарий