Diffuser des données de colonne avec Apache Arrow

La traduction de l'article a été préparée spécifiquement pour les étudiants du cours Ingénieur de données.

Diffuser des données de colonne avec Apache Arrow

Au cours des dernières semaines, nous Nong Li ajouté à Flèche Apache format de flux binaire, complétant le format de fichier à accès aléatoire/IPC déjà existant. Nous avons des implémentations Java et C++ et des liaisons Python. Dans cet article, j'expliquerai le fonctionnement du format et montrerai comment vous pouvez atteindre un débit de données très élevé pour un pandas DataFrame.

Flux de données de colonne

Une question courante que je reçois des utilisateurs d'Arrow est le coût élevé de la migration de grands ensembles de données tabulaires d'un format orienté ligne ou enregistrement vers un format colonne. Pour les ensembles de données de plusieurs gigaoctets, la transposition en mémoire ou sur disque peut être écrasante.

Pour les flux de données, que les données source soient des lignes ou des colonnes, une option consiste à envoyer de petits lots de lignes, chacune contenant une disposition de colonne en interne.

Dans Apache Arrow, une collection de tableaux en colonnes en mémoire représentant un bloc de table est appelée un lot d'enregistrements. Pour représenter une seule structure de données d'une table logique, vous pouvez collecter plusieurs packages d'enregistrements.

Dans le format de fichier "à accès aléatoire" existant, nous écrivons des métadonnées contenant le schéma de la table et la disposition des blocs à la fin du fichier, ce qui vous permet de sélectionner n'importe quel lot d'enregistrements ou n'importe quelle colonne de l'ensemble de données à très bon marché. Dans un format de flux, nous envoyons une série de messages : un schéma, puis un ou plusieurs lots d'enregistrements.

Les différents formats ressemblent à cette image :

Diffuser des données de colonne avec Apache Arrow

Flux de données dans PyArrow : application

Pour vous montrer comment cela fonctionne, je vais créer un exemple d'ensemble de données représentant un seul bloc de flux :

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Supposons maintenant que nous voulions écrire 1 Go de données, composées de morceaux de 1 Mo chacun, pour un total de 1024 morceaux. Commençons par créer le premier bloc de données de 1 Mo avec 16 colonnes :

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Puis je les convertis en pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Maintenant, je vais créer un flux de sortie qui écrira dans la RAM et créera StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Ensuite, nous écrirons 1024 morceaux, qui finiront par constituer un ensemble de données de 1 Go :

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Puisque nous avons écrit en RAM, nous pouvons obtenir le flux entier dans un seul tampon :

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Étant donné que ces données sont en mémoire, la lecture des lots d'enregistrements Arrow est obtenue par une opération de zéro copie. J'ouvre StreamReader, je lis des données dans pyarrow.Tablepuis les convertir en DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tout cela, bien sûr, est bon, mais vous pouvez avoir des questions. À quelle vitesse cela se produit-il ? Comment la taille des fragments affecte-t-elle les performances de récupération de pandas DataFrame ?

Performances de diffusion

À mesure que la taille du segment de streaming diminue, le coût de reconstruction d'un DataFrame en colonnes contiguës dans les pandas augmente en raison de schémas d'accès au cache inefficaces. L'utilisation de structures de données et de tableaux C++ et de leurs mémoires tampons entraîne également une surcharge.

Pour 1 Mo comme ci-dessus, sur mon portable (Quad-core Xeon E3-1505M) il s'avère :

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Il s'avère que le débit effectif est de 7.75 Gb/s pour restaurer une DataFrame de 1 Go à partir de 1024 chunks de 1 Mo. Que se passe-t-il si nous utilisons des morceaux plus gros ou plus petits ? Voici les résultats que vous obtenez :

Diffuser des données de colonne avec Apache Arrow

Les performances chutent considérablement de 256 64 à 1 16 morceaux. J'ai été surpris que les morceaux de XNUMX Mo soient traités plus rapidement que les morceaux de XNUMX Mo. Cela vaut la peine de faire une étude plus approfondie et de comprendre s'il s'agit d'une distribution normale ou si quelque chose d'autre est impliqué.

Dans l'implémentation actuelle du format, les données ne sont en principe pas compressées, de sorte que la taille en mémoire et "sur le fil" est approximativement la même. La compression pourrait devenir une option à l'avenir.

Total

Les données de colonne en continu peuvent être un moyen efficace de transférer de grands ensembles de données vers des outils d'analyse de colonne comme les pandas en petits morceaux. Les services de données qui utilisent le stockage orienté ligne peuvent transférer et transposer de petits morceaux de données plus pratiques pour les caches L2 et L3 de votre processeur.

Code complet

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Source: habr.com

Ajouter un commentaire