Ροή δεδομένων στηλών με το Apache Arrow

Η μετάφραση του άρθρου ετοιμάστηκε ειδικά για τους φοιτητές του μαθήματος Μηχανικός Δεδομένων.

Ροή δεδομένων στηλών με το Apache Arrow

Τις τελευταίες εβδομάδες, εμείς Νονγκ Λι προστέθηκε Βέλος Apache μορφή δυαδικής ροής, που συμπληρώνει την ήδη υπάρχουσα μορφή αρχείου τυχαίας πρόσβασης/IPC. Έχουμε υλοποιήσεις Java και C++ και δεσμεύσεις Python. Σε αυτό το άρθρο, θα εξηγήσω πώς λειτουργεί η μορφή και θα δείξω πώς μπορείτε να επιτύχετε πολύ υψηλή απόδοση δεδομένων για ένα pandas DataFrame.

Ροή δεδομένων στήλης

Μια κοινή ερώτηση που λαμβάνω από τους χρήστες του Arrow είναι το υψηλό κόστος της μετεγκατάστασης μεγάλων συνόλων δεδομένων σε πίνακα από μια μορφή προσανατολισμένη σε γραμμές ή εγγραφές σε μια μορφή στήλης. Για σύνολα δεδομένων πολλών gigabyte, η μεταφορά στη μνήμη ή στο δίσκο μπορεί να είναι συντριπτική.

Για τα δεδομένα ροής, είτε τα δεδομένα προέλευσης είναι γραμμή είτε στήλη, μια επιλογή είναι να στείλετε μικρές παρτίδες σειρών, καθεμία από τις οποίες περιέχει μια διάταξη στήλης εσωτερικά.

Στο Apache Arrow, μια συλλογή από στηλώδεις πίνακες στη μνήμη που αντιπροσωπεύουν ένα κομμάτι πίνακα ονομάζεται παρτίδα εγγραφών. Για να αναπαραστήσετε μια ενιαία δομή δεδομένων ενός λογικού πίνακα, μπορείτε να συλλέξετε πολλά πακέτα εγγραφών.

Στην υπάρχουσα μορφή αρχείου "τυχαίας πρόσβασης", γράφουμε μεταδεδομένα που περιέχουν το σχήμα πίνακα και τη διάταξη μπλοκ στο τέλος του αρχείου, που σας επιτρέπει να επιλέξετε οποιαδήποτε παρτίδα εγγραφών ή οποιαδήποτε στήλη από το σύνολο δεδομένων πολύ φθηνά. Σε μορφή ροής, στέλνουμε μια σειρά από μηνύματα: ένα σχήμα και, στη συνέχεια, μία ή περισσότερες παρτίδες εγγραφών.

Οι διαφορετικές μορφές μοιάζουν με αυτήν την εικόνα:

Ροή δεδομένων στηλών με το Apache Arrow

Ροή δεδομένων στο PyArrow: εφαρμογή

Για να σας δείξω πώς λειτουργεί, θα δημιουργήσω ένα παράδειγμα συνόλου δεδομένων που αντιπροσωπεύει ένα μεμονωμένο κομμάτι ροής:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Τώρα, ας υποθέσουμε ότι θέλουμε να γράψουμε 1 GB δεδομένων, που αποτελούνται από κομμάτια 1 MB το καθένα, για συνολικά 1024 κομμάτια. Αρχικά, ας δημιουργήσουμε το πρώτο πλαίσιο δεδομένων 1 MB με 16 στήλες:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Μετά τα μετατρέπω σε pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Τώρα θα δημιουργήσω μια ροή εξόδου που θα γράφει στη μνήμη RAM και θα δημιουργεί StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Στη συνέχεια θα γράψουμε 1024 κομμάτια, τα οποία τελικά θα αποτελέσουν ένα σύνολο δεδομένων 1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Εφόσον γράψαμε στη μνήμη RAM, μπορούμε να λάβουμε ολόκληρη τη ροή σε ένα buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Επειδή αυτά τα δεδομένα βρίσκονται στη μνήμη, η ανάγνωση παρτίδων εγγραφών Arrow λαμβάνεται με μια λειτουργία μηδενικής αντιγραφής. Ανοίγω το StreamReader, διαβάζω δεδομένα σε pyarrow.Tableκαι στη συνέχεια να τα μετατρέψετε σε DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Όλα αυτά, φυσικά, είναι καλά, αλλά μπορεί να έχετε ερωτήσεις. Πόσο γρήγορα συμβαίνει; Πώς επηρεάζει το μέγεθος του κομματιού την απόδοση ανάκτησης του DataFrame των panda;

Απόδοση ροής

Καθώς το μέγεθος του κομματιού ροής μειώνεται, το κόστος ανακατασκευής ενός συνεχόμενου στηλών DataFrame στα panda αυξάνεται λόγω αναποτελεσματικών σχημάτων πρόσβασης στην κρυφή μνήμη. Υπάρχει επίσης κάποια επιβάρυνση από την εργασία με δομές και πίνακες δεδομένων C++ και με τις προσωρινές αποθήκες μνήμης τους.

Για 1 MB όπως παραπάνω, στον φορητό υπολογιστή μου (Quad-core Xeon E3-1505M) αποδεικνύεται:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Αποδεικνύεται ότι η αποτελεσματική απόδοση είναι 7.75 Gb / s για την επαναφορά ενός DataFrame 1 GB από 1024 κομμάτια του 1 MB. Τι θα συμβεί αν χρησιμοποιήσουμε μεγαλύτερα ή μικρότερα κομμάτια; Εδώ είναι τα αποτελέσματα που λαμβάνετε:

Ροή δεδομένων στηλών με το Apache Arrow

Η απόδοση μειώνεται σημαντικά από 256K σε 64K κομμάτια. Με εξέπληξε το γεγονός ότι τα κομμάτια του 1MB επεξεργάζονταν πιο γρήγορα από τα κομμάτια των 16MB. Αξίζει να κάνετε μια πιο ενδελεχή μελέτη και να καταλάβετε αν πρόκειται για κανονική κατανομή ή για κάτι άλλο.

Στην τρέχουσα εφαρμογή της μορφής, τα δεδομένα δεν συμπιέζονται κατ 'αρχήν, επομένως το μέγεθος στη μνήμη και "στο καλώδιο" είναι περίπου το ίδιο. Η συμπίεση μπορεί να γίνει μια επιλογή στο μέλλον.

Σύνολο

Η ροή δεδομένων στηλών μπορεί να είναι ένας αποτελεσματικός τρόπος μεταφοράς μεγάλων συνόλων δεδομένων σε εργαλεία ανάλυσης στηλών, όπως τα πάντα σε μικρά κομμάτια. Οι υπηρεσίες δεδομένων που χρησιμοποιούν αποθηκευτικό χώρο προσανατολισμένο στη σειρά μπορούν να μεταφέρουν και να μεταφέρουν μικρά κομμάτια δεδομένων που είναι πιο βολικά για τη μνήμη cache L2 και L3 του επεξεργαστή σας.

Πλήρης κωδικός

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο