Analisando 25 TB usando AWK e R

Analisando 25 TB usando AWK e R
Como ler este artigo: Peço desculpas pelo texto ser tão longo e caótico. Para economizar seu tempo, começo cada capítulo com uma introdução “O que aprendi”, que resume a essência do capítulo em uma ou duas frases.

“Apenas me mostre a solução!” Se você quiser apenas saber de onde vim, pule para o capítulo “Tornando-se mais inventivo”, mas acho que é mais interessante e útil ler sobre o fracasso.

Recentemente fui encarregado de configurar um processo para processar um grande volume de sequências brutas de DNA (tecnicamente um chip SNP). A necessidade era obter rapidamente dados sobre uma determinada localização genética (chamada SNP) para posterior modelagem e outras tarefas. Utilizando R e AWK, consegui limpar e organizar os dados de forma natural, agilizando bastante o processamento de consultas. Isso não foi fácil para mim e exigiu inúmeras iterações. Este artigo irá ajudá-lo a evitar alguns dos meus erros e mostrar o que acabei fazendo.

Primeiro, algumas explicações introdutórias.

Dados

Nosso centro universitário de processamento de informações genéticas nos forneceu dados na forma de um TSV de 25 TB. Recebi-os divididos em 5 pacotes, compactados por Gzip, cada um contendo cerca de 240 arquivos de quatro gigabytes. Cada linha continha dados para um SNP de um indivíduo. No total, foram transmitidos dados sobre cerca de 2,5 milhões de SNPs e cerca de 60 mil pessoas. Além das informações do SNP, os arquivos continham inúmeras colunas com números refletindo diversas características, como intensidade de leitura, frequência de diferentes alelos, etc. No total, havia cerca de 30 colunas com valores únicos.

Meta

Como acontece com qualquer projeto de gerenciamento de dados, o mais importante era determinar como os dados seriam utilizados. Nesse caso selecionaremos principalmente modelos e fluxos de trabalho para SNP com base em SNP. Ou seja, precisaremos apenas de dados de um SNP por vez. Tive que aprender como recuperar todos os registros associados a um dos 2,5 milhões de SNPs da maneira mais fácil, rápida e barata possível.

Como não fazer isso

Para citar um clichê adequado:

Não falhei mil vezes, apenas descobri mil maneiras de evitar a análise de um monte de dados em um formato amigável para consulta.

A primeira tentativa

O que eu aprendi: Não existe uma maneira barata de analisar 25 TB por vez.

Tendo feito o curso “Métodos Avançados para Processamento de Big Data” na Universidade Vanderbilt, eu tinha certeza de que o truque estava na mão. Provavelmente levará uma ou duas horas para configurar o servidor Hive para analisar todos os dados e relatar o resultado. Como nossos dados estão armazenados no AWS S3, usei o serviço Atena, que permite aplicar consultas SQL do Hive a dados do S3. Você não precisa configurar/criar um cluster Hive e também paga apenas pelos dados que procura.

Depois de mostrar ao Athena meus dados e seu formato, executei alguns testes com consultas como esta:

select * from intensityData limit 10;

E rapidamente recebeu resultados bem estruturados. Preparar.

Até que tentamos usar os dados em nosso trabalho...

Pediram-me para extrair todas as informações do SNP para testar o modelo. Executei a consulta:


select * from intensityData 
where snp = 'rs123456';

...e comecei a esperar. Após oito minutos e mais de 4 TB de dados solicitados, recebi o resultado. Athena cobra pelo volume de dados encontrados, US$ 5 por terabyte. Portanto, essa única solicitação custou US$ 20 e oito minutos de espera. Para executar o modelo com todos os dados, tivemos que esperar 38 anos e pagar US$ 50 milhões. Obviamente, isso não era adequado para nós.

Foi necessário usar Parquet...

O que eu aprendi: Tenha cuidado com o tamanho dos seus arquivos Parquet e sua organização.

Primeiro tentei consertar a situação convertendo todos os TSVs para Arquivos parquet. Eles são convenientes para trabalhar com grandes conjuntos de dados porque as informações neles contidas são armazenadas em formato colunar: cada coluna fica em seu próprio segmento de memória/disco, em contraste com os arquivos de texto, nos quais as linhas contêm elementos de cada coluna. E se precisar encontrar algo, basta ler a coluna obrigatória. Além disso, cada arquivo armazena um intervalo de valores em uma coluna, portanto, se o valor que você procura não estiver no intervalo da coluna, o Spark não perderá tempo verificando o arquivo inteiro.

Executei uma tarefa simples Cola AWS para converter nossos TSVs para Parquet e colocar os novos arquivos no Athena. Demorou cerca de 5 horas. Mas quando executei a solicitação, levei quase o mesmo tempo e um pouco menos de dinheiro para ser concluída. O fato é que o Spark, tentando otimizar a tarefa, simplesmente descompactou um pedaço do TSV e colocou-o em seu próprio pedaço do Parquet. E como cada pedaço era grande o suficiente para conter todos os registros de muitas pessoas, cada arquivo continha todos os SNPs, então o Spark teve que abrir todos os arquivos para extrair as informações necessárias.

Curiosamente, o tipo de compactação padrão (e recomendado) do Parquet, snappy, não pode ser dividido. Portanto, cada executor ficou preso na tarefa de descompactar e baixar o conjunto de dados completo de 3,5 GB.

Analisando 25 TB usando AWK e R

Vamos entender o problema

O que eu aprendi: a classificação é difícil, especialmente se os dados forem distribuídos.

Pareceu-me que agora entendia a essência do problema. Eu só precisei classificar os dados por coluna SNP, não por pessoas. Em seguida, vários SNPs serão armazenados em um bloco de dados separado, e então a função “inteligente” do Parquet “abrir somente se o valor estiver no intervalo” se mostrará em toda a sua glória. Infelizmente, classificar bilhões de linhas espalhadas por um cluster provou ser uma tarefa difícil.

A AWS definitivamente não deseja emitir um reembolso devido ao motivo "Sou um estudante distraído". Depois de executar a classificação no Amazon Glue, ele funcionou por 2 dias e travou.

E quanto ao particionamento?

O que eu aprendi: As partições no Spark devem ser balanceadas.

Então tive a ideia de particionar os dados em cromossomos. Existem 23 deles (e vários mais se levarmos em conta o DNA mitocondrial e as regiões não mapeadas).
Isso permitirá que você divida os dados em partes menores. Se você adicionar apenas uma linha à função de exportação do Spark no script Glue partition_by = "chr", os dados deverão ser divididos em intervalos.

Analisando 25 TB usando AWK e R
O genoma consiste em numerosos fragmentos chamados cromossomos.

Infelizmente, não funcionou. Os cromossomos têm tamanhos diferentes, o que significa diferentes quantidades de informação. Isso significa que as tarefas que o Spark enviou aos trabalhadores não foram balanceadas e concluídas lentamente porque alguns nós terminaram mais cedo e ficaram ociosos. No entanto, as tarefas foram concluídas. Mas ao solicitar um SNP, o desequilíbrio voltou a causar problemas. O custo de processamento de SNPs em cromossomos maiores (isto é, onde queremos obter dados) diminuiu apenas cerca de um fator de 10. Muito, mas não o suficiente.

E se o dividirmos em partes ainda menores?

O que eu aprendi: Nunca tente fazer 2,5 milhões de partições.

Decidi fazer tudo e particionar cada SNP. Isso garantiu que as partições fossem do mesmo tamanho. FOI UMA IDEIA RUIM. Usei cola e adicionei uma linha inocente partition_by = 'snp'. A tarefa foi iniciada e começou a ser executada. Um dia depois verifiquei e vi que ainda não havia nada escrito no S3, então encerrei a tarefa. Parece que o Glue estava gravando arquivos intermediários em um local oculto no S3, muitos arquivos, talvez alguns milhões. Como resultado, meu erro custou mais de mil dólares e não agradou meu mentor.

Particionamento + classificação

O que eu aprendi: A classificação ainda é difícil, assim como o ajuste do Spark.

Minha última tentativa de particionamento envolveu particionar os cromossomos e depois classificar cada partição. Em teoria, isso aceleraria cada consulta porque os dados SNP desejados deveriam estar dentro de alguns blocos Parquet dentro de um determinado intervalo. Infelizmente, classificar até mesmo dados particionados revelou-se uma tarefa difícil. Como resultado, mudei para o EMR para um cluster personalizado e usei oito instâncias poderosas (C5.4xl) e Sparklyr para criar um fluxo de trabalho mais flexível...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...no entanto, a tarefa ainda não foi concluída. Eu configurei de diferentes maneiras: aumentei a alocação de memória para cada executor de consulta, usei nós com grande quantidade de memória, usei variáveis ​​​​de transmissão (variáveis ​​​​de transmissão), mas cada vez acabaram sendo meias medidas e gradualmente os executores começaram falhar até que tudo parasse.

Estou me tornando mais criativo

O que eu aprendi: Às vezes, dados especiais requerem soluções especiais.

Cada SNP possui um valor de posição. Este é um número correspondente ao número de bases ao longo de seu cromossomo. Esta é uma maneira agradável e natural de organizar nossos dados. No começo eu queria particionar por regiões de cada cromossomo. Por exemplo, posições 1 - 2000, 2001 - 4000, etc. Mas o problema é que os SNPs não estão distribuídos uniformemente pelos cromossomos, portanto os tamanhos dos grupos variam muito.

Analisando 25 TB usando AWK e R

Como resultado, cheguei a uma divisão das posições em categorias (classificação). Usando os dados já baixados, executei uma solicitação para obter uma lista de SNPs únicos, suas posições e cromossomos. Em seguida, classifiquei os dados dentro de cada cromossomo e coletei os SNPs em grupos (bin) de um determinado tamanho. Digamos 1000 SNPs cada. Isso me deu a relação SNP-grupo-por-cromossomo.

Ao final fiz grupos (bin) de 75 SNPs, o motivo será explicado a seguir.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Primeira tentativa com Spark

O que eu aprendi: a agregação do Spark é rápida, mas o particionamento ainda é caro.

Eu queria ler esse pequeno quadro de dados (2,5 milhões de linhas) no Spark, combiná-lo com os dados brutos e particioná-lo pela coluna recém-adicionada bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

eu usei sdf_broadcast(), então o Spark sabe que deve enviar o quadro de dados para todos os nós. Isso é útil se os dados forem pequenos e necessários para todas as tarefas. Caso contrário, o Spark tenta ser inteligente e distribui os dados conforme necessário, o que pode causar lentidão.

E novamente minha ideia não funcionou: as tarefas funcionaram por algum tempo, completaram a união e então, como os executores lançados pelo particionamento, começaram a falhar.

Adicionando AWK

O que eu aprendi: Não durma enquanto estiver aprendendo o básico. Certamente alguém já resolveu seu problema na década de 1980.

Até este ponto, o motivo de todas as minhas falhas com o Spark foi a confusão de dados no cluster. Talvez a situação possa ser melhorada com o pré-tratamento. Decidi tentar dividir os dados de texto brutos em colunas de cromossomos, então esperava fornecer ao Spark dados “pré-particionados”.

Pesquisei no StackOverflow como dividir por valores de coluna e encontrei uma ótima resposta. Com o AWK você pode dividir um arquivo de texto por valores de coluna, escrevendo-o em um script em vez de enviar os resultados para stdout.

Eu escrevi um script Bash para testar. Baixei um dos TSVs empacotados e descompactei-o usando gzip e enviado para awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Funcionou!

Preenchendo os núcleos

O que eu aprendi: gnu parallel - é uma coisa mágica, todos deveriam usar.

A separação foi bastante lenta e quando comecei htoppara verificar o uso de uma instância EC2 poderosa (e cara), descobri que eu estava usando apenas um núcleo e cerca de 200 MB de memória. Para resolver o problema e não perder muito dinheiro, tivemos que descobrir como paralelizar o trabalho. Felizmente, em um livro absolutamente incrível Ciência de dados na linha de comando Encontrei um capítulo de Jeron Janssens sobre paralelização. Com isso aprendi sobre gnu parallel, um método muito flexível para implementar multithreading em Unix.

Analisando 25 TB usando AWK e R
Quando iniciei o particionamento usando o novo processo, estava tudo bem, mas ainda havia um gargalo - o download de objetos S3 para o disco não era muito rápido e não era totalmente paralelizado. Para corrigir isso, fiz o seguinte:

  1. Descobri que é possível implementar a etapa de download do S3 diretamente no pipeline, eliminando completamente o armazenamento intermediário em disco. Isso significa que posso evitar gravar dados brutos em disco e usar armazenamento ainda menor e, portanto, mais barato na AWS.
  2. equipe aws configure set default.s3.max_concurrent_requests 50 aumentou bastante o número de threads que a AWS CLI usa (por padrão, são 10).
  3. Mudei para uma instância EC2 otimizada para velocidade de rede, com a letra n no nome. Descobri que a perda de poder de processamento ao usar n instâncias é mais do que compensada pelo aumento na velocidade de carregamento. Para a maioria das tarefas usei c5n.4xl.
  4. Mudado gzip em pigz, esta é uma ferramenta gzip que pode fazer coisas legais para paralelizar a tarefa inicialmente não paralelizada de descompactar arquivos (isso ajudou menos).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Essas etapas são combinadas entre si para que tudo funcione muito rapidamente. Ao aumentar a velocidade de download e eliminar gravações em disco, agora eu poderia processar um pacote de 5 terabytes em apenas algumas horas.

Este tweet deveria ter mencionado ‘TSV’. Infelizmente.

Usando dados recentemente analisados

O que eu aprendi: Spark gosta de dados descompactados e não gosta de combinar partições.

Agora os dados estavam no S3 em formato descompactado (leia-se: compartilhado) e semiordenado, e eu poderia retornar ao Spark novamente. Uma surpresa me esperava: novamente não consegui o que queria! Foi muito difícil dizer ao Spark exatamente como os dados foram particionados. E mesmo quando fiz isso, descobri que havia muitas partições (95 mil), e quando usei coalesce reduzi seu número a limites razoáveis, isso destruiu meu particionamento. Tenho certeza de que isso pode ser corrigido, mas depois de alguns dias de pesquisa não consegui encontrar uma solução. Finalmente terminei todas as tarefas no Spark, embora tenha demorado um pouco e meus arquivos Parquet divididos não fossem muito pequenos (~200 KB). No entanto, os dados estavam onde eram necessários.

Analisando 25 TB usando AWK e R
Muito pequeno e irregular, maravilhoso!

Testando consultas locais do Spark

O que eu aprendi: o Spark tem muita sobrecarga ao resolver problemas simples.

Ao baixar os dados em um formato inteligente, consegui testar a velocidade. Configure um script R para executar um servidor Spark local e, em seguida, carregue um quadro de dados Spark do armazenamento do grupo Parquet especificado (bin). Tentei carregar todos os dados, mas não consegui que o Sparklyr reconhecesse o particionamento.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

A execução durou 29,415 segundos. Muito melhor, mas não muito bom para testes em massa de qualquer coisa. Além disso, não consegui acelerar as coisas com o cache porque quando tentei armazenar em cache um quadro de dados na memória, o Spark sempre travava, mesmo quando eu alocava mais de 50 GB de memória para um conjunto de dados que pesava menos de 15.

Voltar para AWK

O que eu aprendi: Matrizes associativas no AWK são muito eficientes.

Percebi que poderia atingir velocidades mais altas. Lembrei-me disso de uma forma maravilhosa Tutorial AWK de Bruce Barnett Eu li sobre um recurso interessante chamado “matrizes associativas" Essencialmente, esses são pares de valores-chave que, por algum motivo, foram chamados de forma diferente no AWK e, portanto, de alguma forma, não pensei muito sobre eles. Roman Cheplyak lembrou que o termo “matrizes associativas” é muito mais antigo que o termo “par chave-valor”. Mesmo se você procure o valor-chave no Google Ngram, você não verá esse termo lá, mas encontrará matrizes associativas! Além disso, o “par chave-valor” é mais frequentemente associado a bancos de dados, por isso faz muito mais sentido compará-lo com um hashmap. Percebi que poderia usar essas matrizes associativas para associar meus SNPs a uma tabela bin e dados brutos sem usar o Spark.

Para fazer isso, no script AWK usei o bloco BEGIN. Este é um trecho de código executado antes que a primeira linha de dados seja passada para o corpo principal do script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Equipe while(getline...) carregou todas as linhas do grupo CSV (bin), defina a primeira coluna (nome SNP) como a chave para a matriz associativa bin e o segundo valor (grupo) como o valor. Então no bloco { }, que é executado em todas as linhas do arquivo principal, cada linha é enviada para o arquivo de saída, que recebe um nome único dependendo do seu grupo (bin): ..._bin_"bin[$1]"_....

Variáveis batch_num и chunk_id correspondeu aos dados fornecidos pelo pipeline, evitando uma condição de corrida, e cada thread de execução em execução parallel, gravou em seu próprio arquivo exclusivo.

Como espalhei todos os dados brutos em pastas nos cromossomos que sobraram do meu experimento anterior com AWK, agora eu poderia escrever outro script Bash para processar um cromossomo por vez e enviar dados particionados mais profundos para o S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

O script tem duas seções parallel.

Na primeira seção, os dados são lidos de todos os arquivos contendo informações sobre o cromossomo desejado, depois esses dados são distribuídos entre threads, que distribuem os arquivos nos grupos apropriados (bin). Para evitar condições de corrida quando vários threads gravam no mesmo arquivo, o AWK passa os nomes dos arquivos para gravar dados em locais diferentes, por exemplo. chr_10_bin_52_batch_2_aa.csv. Como resultado, muitos arquivos pequenos são criados no disco (para isso usei volumes EBS de terabyte).

Transportador da segunda seção parallel percorre os grupos (bin) e combina seus arquivos individuais em CSV comum c cate depois os envia para exportação.

Transmitindo em R?

O que eu aprendi: Você pode entrar em contato stdin и stdout de um script R e, portanto, usá-lo no pipeline.

Você deve ter notado esta linha em seu script Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Ele traduz todos os arquivos de grupo concatenados (bin) no script R abaixo. {} é uma técnica especial parallel, que insere todos os dados enviados ao fluxo especificado diretamente no próprio comando. Opção {#} fornece um ID de thread exclusivo e {%} representa o número do slot de trabalho (repetido, mas nunca simultaneamente). Uma lista de todas as opções pode ser encontrada em documentação.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Quando uma variável file("stdin") transmitido para readr::read_csv, os dados traduzidos no script R são carregados em um quadro, que fica no formato .rds-arquivo usando aws.s3 gravado diretamente no S3.

O RDS é algo como uma versão júnior do Parquet, sem as frescuras do armazenamento dos alto-falantes.

Depois de terminar o script Bash, recebi um pacote .rds-arquivos localizados no S3, o que me permitiu usar compactação eficiente e tipos integrados.

Apesar do uso do freio R, tudo funcionou muito rápido. Não é de surpreender que as partes do R que leem e gravam dados sejam altamente otimizadas. Depois de testar em um cromossomo de tamanho médio, o trabalho foi concluído em uma instância C5n.4xl em cerca de duas horas.

Limitações S3

O que eu aprendi: Graças à implementação do caminho inteligente, o S3 pode lidar com muitos arquivos.

Eu estava preocupado se o S3 seria capaz de lidar com os muitos arquivos que foram transferidos para ele. Eu poderia fazer com que os nomes dos arquivos fizessem sentido, mas como o S3 os procuraria?

Analisando 25 TB usando AWK e R
As pastas no S3 são apenas para exibição, na verdade o sistema não está interessado no símbolo /. Na página de perguntas frequentes do S3.

Parece que o S3 representa o caminho para um arquivo específico como uma chave simples em uma espécie de tabela hash ou banco de dados baseado em documento. Um bucket pode ser considerado uma tabela e os arquivos podem ser considerados registros nessa tabela.

Como velocidade e eficiência são importantes para obter lucro na Amazon, não é surpresa que esse sistema de chave como caminho de arquivo seja extremamente otimizado. Tentei encontrar um equilíbrio: para não ter que fazer muitas solicitações get, mas para que as solicitações fossem executadas rapidamente. Descobriu-se que é melhor criar cerca de 20 mil arquivos bin. Acho que se continuarmos a otimizar, podemos conseguir um aumento na velocidade (por exemplo, fazendo um balde especial apenas para dados, reduzindo assim o tamanho da tabela de pesquisa). Mas não havia tempo nem dinheiro para novas experiências.

E quanto à compatibilidade cruzada?

O que aprendi: A principal causa de perda de tempo é a otimização prematura do método de armazenamento.

Neste ponto, é muito importante se perguntar: “Por que usar um formato de arquivo proprietário?” O motivo está na velocidade de carregamento (arquivos CSV compactados demoravam 7 vezes mais para carregar) e na compatibilidade com nossos fluxos de trabalho. Posso reconsiderar se R pode carregar facilmente arquivos Parquet (ou Arrow) sem o carregamento do Spark. Todos em nosso laboratório usam R e, se eu precisar converter os dados para outro formato, ainda terei os dados de texto originais, então posso simplesmente executar o pipeline novamente.

Divisão de trabalho

O que eu aprendi: Não tente otimizar os trabalhos manualmente, deixe o computador fazer isso.

Depurei o fluxo de trabalho em um cromossomo, agora preciso processar todos os outros dados.
Eu queria criar várias instâncias do EC2 para conversão, mas ao mesmo tempo tinha medo de obter uma carga muito desequilibrada em diferentes trabalhos de processamento (assim como o Spark sofria com partições desequilibradas). Além disso, não estava interessado em criar uma instância por cromossomo, pois para contas AWS existe um limite padrão de 10 instâncias.

Então decidi escrever um script em R para otimizar os trabalhos de processamento.

Primeiro, pedi ao S3 que calculasse quanto espaço de armazenamento cada cromossomo ocupava.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Então escrevi uma função que pega o tamanho total, embaralha a ordem dos cromossomos e os divide em grupos num_jobs e informa quão diferentes são os tamanhos de todos os trabalhos de processamento.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Então fiz mil embaralhamentos usando ronronar e escolhi o melhor.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Então acabei com um conjunto de tarefas de tamanho muito semelhante. Então tudo o que restou foi envolver meu script Bash anterior em um grande loop for. Essa otimização levou cerca de 10 minutos para ser escrita. E isso é muito menos do que eu gastaria criando tarefas manualmente se elas estivessem desequilibradas. Portanto, acho que acertei nessa otimização preliminar.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

No final adiciono o comando shutdown:

sudo shutdown -h now

... e deu tudo certo! Usando a AWS CLI, criei instâncias usando a opção user_data deu-lhes scripts Bash de suas tarefas para processamento. Eles funcionavam e desligavam automaticamente, então eu não estava pagando por poder de processamento extra.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Vamos fazer as malas!

O que eu aprendi: A API deve ser simples para facilitar e flexibilidade de uso.

Finalmente consegui os dados no lugar e na forma certos. Faltava apenas simplificar ao máximo o processo de utilização dos dados para facilitar aos meus colegas. Eu queria fazer uma API simples para criar solicitações. Se no futuro eu decidir mudar de .rds aos arquivos Parquet, isso deverá ser um problema para mim, não para meus colegas. Para isso decidi fazer um pacote R interno.

Construa e documente um pacote muito simples contendo apenas algumas funções de acesso a dados organizadas em torno de uma função get_snp. Também fiz um site para meus colegas pacote para baixo, para que possam ver facilmente exemplos e documentação.

Analisando 25 TB usando AWK e R

Cache inteligente

O que eu aprendi: Se seus dados estiverem bem preparados, o armazenamento em cache será fácil!

Como um dos principais fluxos de trabalho aplicava o mesmo modelo de análise ao pacote SNP, decidi usar o binning a meu favor. Ao transmitir dados via SNP, todas as informações do grupo (bin) são anexadas ao objeto retornado. Ou seja, consultas antigas podem (em teoria) acelerar o processamento de novas consultas.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Ao construir o pacote, executei vários benchmarks para comparar a velocidade ao usar métodos diferentes. Recomendo não descuidar disso, pois às vezes os resultados são inesperados. Por exemplo, dplyr::filter foi muito mais rápido do que capturar linhas usando filtragem baseada em indexação, e recuperar uma única coluna de um quadro de dados filtrado foi muito mais rápido do que usar sintaxe de indexação.

Observe que o objeto prev_snp_results contém a chave snps_in_bin. Esta é uma matriz de todos os SNPs exclusivos em um grupo (bin), permitindo verificar rapidamente se você já possui dados de uma consulta anterior. Também facilita o loop por todos os SNPs em um grupo (bin) com este código:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Descobertas

Agora podemos (e começamos a fazê-lo seriamente) executar modelos e cenários que antes eram inacessíveis para nós. O melhor é que meus colegas de laboratório não precisam pensar em complicações. Eles apenas têm uma função que funciona.

E embora o pacote os poupe dos detalhes, tentei tornar o formato dos dados simples o suficiente para que eles pudessem descobrir se eu desaparecesse repentinamente amanhã...

A velocidade aumentou visivelmente. Geralmente escaneamos fragmentos do genoma funcionalmente significativos. Anteriormente, não podíamos fazer isso (acabou sendo muito caro), mas agora, graças à estrutura do grupo (bin) e ao cache, uma solicitação para um SNP leva em média menos de 0,1 segundos, e o uso de dados é tão tão baixo que os custos do S3 são insignificantes.

Conclusão

Este artigo não é um guia. A solução acabou sendo individual e quase certamente não ideal. Pelo contrário, é um diário de viagem. Quero que os outros entendam que tais decisões não aparecem totalmente formadas na cabeça, são resultado de tentativa e erro. Além disso, se você estiver procurando um cientista de dados, lembre-se de que o uso eficaz dessas ferramentas requer experiência, e experiência custa dinheiro. Estou feliz por ter tido meios para pagar, mas muitos outros que conseguem fazer o mesmo trabalho melhor do que eu nunca terão a oportunidade por falta de dinheiro para sequer tentar.

As ferramentas de big data são versáteis. Se você tiver tempo, quase certamente poderá escrever uma solução mais rápida usando técnicas inteligentes de limpeza, armazenamento e extração de dados. Em última análise, tudo se resume a uma análise de custo-benefício.

O que eu aprendi:

  • não existe uma maneira barata de analisar 25 TB por vez;
  • tenha cuidado com o tamanho dos seus arquivos Parquet e sua organização;
  • As partições no Spark devem ser balanceadas;
  • Em geral, nunca tente fazer 2,5 milhões de partições;
  • A classificação ainda é difícil, assim como a configuração do Spark;
  • às vezes, dados especiais requerem soluções especiais;
  • A agregação do Spark é rápida, mas o particionamento ainda é caro;
  • não durma quando te ensinarem o básico, provavelmente alguém já resolveu seu problema na década de 1980;
  • gnu parallel - isso é uma coisa mágica, todos deveriam usar;
  • Spark gosta de dados descompactados e não gosta de combinar partições;
  • O Spark tem muita sobrecarga ao resolver problemas simples;
  • Os arrays associativos do AWK são muito eficientes;
  • você pode entrar em contato stdin и stdout de um script R e, portanto, usá-lo no pipeline;
  • Graças à implementação do caminho inteligente, o S3 pode processar muitos arquivos;
  • A principal razão para perder tempo é a otimização prematura do seu método de armazenamento;
  • não tente otimizar tarefas manualmente, deixe o computador fazer isso;
  • A API deve ser simples para facilitar e flexibilidade de uso;
  • Se seus dados estiverem bem preparados, o armazenamento em cache será fácil!

Fonte: habr.com

Adicionar um comentário