Transmitindo dados de coluna com Apache Arrow

A tradução do artigo foi elaborada especificamente para os alunos do curso Engenheiro de dados.

Transmitindo dados de coluna com Apache Arrow

Nas últimas semanas tivemos Nong Li Adicionado a Flecha Apache formato de streaming binário, complementando o formato de arquivo de acesso aleatório/IPC existente. Temos implementações Java e C++ e ligações Python. Neste artigo, explicarei como o formato funciona e mostrarei como você pode obter uma taxa de transferência de dados muito alta para um DataFrame do pandas.

Dados da coluna de streaming

Uma pergunta comum que recebo dos usuários do Arrow é o alto custo da migração de grandes conjuntos de dados tabulares de um formato orientado a linhas ou registros para um formato orientado a colunas. Para conjuntos de dados de vários gigabytes, a transposição na memória ou no disco pode ser uma tarefa árdua.

Para transmitir dados, sejam os dados de origem linha ou coluna, uma opção é enviar pequenos lotes de linhas, cada um contendo um layout colunar.

No Apache Arrow, a coleção de matrizes de colunas na memória que representam um pedaço da tabela é chamada de lote de registros. Para representar uma única estrutura de dados de uma tabela lógica, vários lotes de registros podem ser coletados.

No formato de arquivo de "acesso aleatório" existente, gravamos metadados contendo o esquema da tabela e locais de bloco no final do arquivo, permitindo que você selecione de forma extremamente barata qualquer lote de registros ou qualquer coluna de um conjunto de dados. No formato streaming, enviamos uma série de mensagens: um esboço e depois um ou mais lotes de registros.

Os diferentes formatos são mais ou menos assim:

Transmitindo dados de coluna com Apache Arrow

Streaming de dados no PyArrow: aplicativo

Para mostrar como isso funciona, criarei um conjunto de dados de exemplo representando um único bloco de fluxo:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Agora, digamos que queremos gravar 1 GB de dados, consistindo em pedaços de 1 MB cada, totalizando 1024 pedaços. Para começar, vamos criar o primeiro quadro de dados de 1 MB com 16 colunas:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Então eu os converto para pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Agora vou criar um fluxo de saída que irá gravar na RAM e criar StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Em seguida, escreveremos 1024 pedaços, que no final totalizarão um conjunto de dados de 1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Como escrevemos na RAM, podemos obter todo o fluxo em um buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Como esses dados estão na memória, a leitura de lotes de registros Arrow é uma operação de cópia zero. Abro o StreamReader, leio os dados pyarrow.Tablee, em seguida, converta-os para DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tudo isso é, claro, bom, mas você pode ter dúvidas. Com que rapidez isso acontece? Como o tamanho do pedaço afeta o desempenho de recuperação do DataFrame do pandas?

Desempenho de streaming

À medida que o tamanho do bloco de streaming diminui, o custo de reconstrução de um DataFrame colunar contíguo no pandas aumenta devido a padrões de acesso ao cache ineficientes. Há também alguma sobrecarga no trabalho com estruturas de dados e matrizes C++ e seus buffers de memória.

Para 1 MB, como acima, no meu laptop (Quad-core Xeon E3-1505M) acontece:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Acontece que a taxa de transferência efetiva é de 7.75 GB/s para restaurar um DataFrame de 1 GB a partir de 1024 blocos de 1 MB. O que acontece se usarmos pedaços maiores ou menores? Estes são os resultados:

Transmitindo dados de coluna com Apache Arrow

O desempenho cai significativamente de pedaços de 256K para 64K. Fiquei surpreso ao ver que pedaços de 1 MB foram processados ​​mais rapidamente do que pedaços de 16 MB. Vale a pena fazer um estudo mais aprofundado e entender se se trata de uma distribuição normal ou se há algo mais em jogo.

Na implementação atual do formato, os dados não são compactados em princípio, portanto o tamanho na memória e “nos fios” é aproximadamente o mesmo. No futuro, a compressão poderá se tornar uma opção adicional.

Total

O streaming de dados colunares pode ser uma forma eficaz de alimentar grandes conjuntos de dados em ferramentas de análise colunar, como pandas, em pequenos pedaços. Os serviços de dados que usam armazenamento orientado a linhas podem transferir e transpor pequenos pedaços de dados que são mais convenientes para os caches L2 e L3 do seu processador.

Código completo

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

Fonte: habr.com

Adicionar um comentário