Analizando 25 TB usando AWK e R

Analizando 25 TB usando AWK e R
Como ler este artigo: Pido desculpas polo texto tan longo e caótico. Para aforrarche tempo, comezo cada capítulo cunha introdución "O que aprendín", que resume a esencia do capítulo nunha ou dúas frases.

"Só móstrame a solución!" Se só queres ver de onde veño, pasa ao capítulo "Vendo máis inventivo", pero creo que é máis interesante e útil ler sobre o fracaso.

Recentemente encargáronme a creación dun proceso para procesar un gran volume de secuencias de ADN en bruto (tecnicamente un chip SNP). A necesidade era obter rapidamente datos sobre unha determinada localización xenética (chamada SNP) para o modelado posterior e outras tarefas. Usando R e AWK, puiden limpar e organizar os datos dun xeito natural, acelerando moito o procesamento de consultas. Isto non foi doado para min e requiriu numerosas iteracións. Este artigo axudarache a evitar algúns dos meus erros e amosarache o que acabei.

En primeiro lugar, algunhas explicacións introdutorias.

Datos

O noso centro de procesamento de información xenética universitaria proporcionounos datos en forma de TSV de 25 TB. Recibínos divididos en 5 paquetes, comprimidos por Gzip, cada un dos cales contiña uns 240 ficheiros de catro gigabytes. Cada fila contiña datos para un SNP dun individuo. En total, transmitíronse datos sobre ~ 2,5 millóns de SNP e ~ 60 mil persoas. Ademais da información SNP, os ficheiros contiñan numerosas columnas con números que reflicten varias características, como a intensidade de lectura, a frecuencia de diferentes alelos, etc. En total había unhas 30 columnas con valores únicos.

Meta

Como en calquera proxecto de xestión de datos, o máis importante era determinar como se utilizarían os datos. Neste caso seleccionaremos principalmente modelos e fluxos de traballo para SNP baseados en SNP. É dicir, só necesitaremos datos dun SNP á vez. Tiven que aprender a recuperar todos os rexistros asociados a un dos 2,5 millóns de SNP da forma máis sinxela, rápida e barata posible.

Como non facer isto

Para citar un cliché axeitado:

Non fallei mil veces, só descubrín mil formas de evitar analizar unha morea de datos nun formato de consulta.

Primeiro intento

Que aprendín: Non hai unha forma barata de analizar 25 TB á vez.

Despois de ter feito o curso "Métodos avanzados para o procesamento de grandes datos" na Universidade de Vanderbilt, estaba seguro de que o truco estaba na bolsa. Probablemente tardará unha ou dúas horas en configurar o servidor Hive para executar todos os datos e informar do resultado. Dado que os nosos datos están almacenados en AWS S3, usei o servizo Atena, que lle permite aplicar consultas SQL de Hive aos datos S3. Non necesitas configurar/crear un clúster Hive e tamén pagas só polos datos que buscas.

Despois de mostrarlle a Athena os meus datos e o seu formato, realicei algunhas probas con consultas como esta:

select * from intensityData limit 10;

E rapidamente recibiu resultados ben estruturados. Listo.

Ata que intentamos utilizar os datos no noso traballo...

Pedíronme que sacara toda a información do SNP para probar o modelo. Realicei a consulta:


select * from intensityData 
where snp = 'rs123456';

...e comezou a esperar. Despois de oito minutos e máis de 4 TB de datos solicitados, recibín o resultado. Athena cobra polo volume de datos atopados, 5 dólares por terabyte. Entón, esta única solicitude custou 20 dólares e oito minutos de espera. Para executar o modelo con todos os datos, tivemos que esperar 38 anos e pagar 50 millóns de dólares, obviamente, isto non era axeitado para nós.

Era necesario utilizar parquet...

Que aprendín: Teña coidado co tamaño dos seus ficheiros Parquet e coa súa organización.

Primeiro tentei solucionar a situación convertendo todos os TSV a Fichas de parquet. Son convenientes para traballar con conxuntos de datos grandes porque a información neles almacénase en forma de columnas: cada columna atópase no seu propio segmento de memoria/disco, a diferenza dos ficheiros de texto, nos que as filas conteñen elementos de cada columna. E se precisas atopar algo, só tes que ler a columna necesaria. Ademais, cada ficheiro almacena un intervalo de valores nunha columna, polo que se o valor que busca non está dentro do intervalo da columna, Spark non perderá tempo escaneando todo o ficheiro.

Fixen unha tarefa sinxela AWS Glue para converter os nosos TSV a Parquet e soltou os novos ficheiros en Athena. Levou unhas 5 horas. Pero cando executei a solicitude, levou aproximadamente a mesma cantidade de tempo e un pouco menos de diñeiro en completar. O caso é que Spark, intentando optimizar a tarefa, simplemente desempaquetou un anaco de TSV e púxoo no seu propio anaco de Parquet. E como cada anaco era o suficientemente grande como para conter os rexistros completos de moitas persoas, cada ficheiro contiña todos os SNP, polo que Spark tivo que abrir todos os ficheiros para extraer a información que necesitaba.

Curiosamente, o tipo de compresión predeterminado (e recomendado) de Parquet, rápido, non se pode dividir. Polo tanto, cada executor quedou atrapado na tarefa de desempaquetar e descargar o conxunto de datos completo de 3,5 GB.

Analizando 25 TB usando AWK e R

Imos entender o problema

Que aprendín: Ordenar é difícil, especialmente se os datos están distribuídos.

Pareceume que agora entendía a esencia do problema. Só necesitaba ordenar os datos por columna SNP, non por persoas. Despois, varios SNP almacenaranse nun anaco de datos separado e, a continuación, a función "intelixente" de Parquet "abrir só se o valor está dentro do intervalo" amosarase en todo o seu esplendor. Desafortunadamente, clasificar miles de millóns de filas repartidas por un clúster resultou ser unha tarefa difícil.

AWS definitivamente non quere emitir un reembolso debido ao motivo "son un estudante distraído". Despois de executar a clasificación en Amazon Glue, funcionou durante 2 días e fallou.

Que pasa coa partición?

Que aprendín: As particións en Spark deben estar equilibradas.

Entón ocorréuseme a idea de dividir os datos en cromosomas. Hai 23 deles (e varios máis se se ten en conta o ADN mitocondrial e as rexións non mapeadas).
Isto permitirache dividir os datos en anacos máis pequenos. Se engades só unha liña á función de exportación de Spark no script Glue partition_by = "chr", entón os datos deben dividirse en baldes.

Analizando 25 TB usando AWK e R
O xenoma está formado por numerosos fragmentos chamados cromosomas.

Desafortunadamente, non funcionou. Os cromosomas teñen diferentes tamaños, o que significa diferentes cantidades de información. Isto significa que as tarefas que Spark enviou aos traballadores non se equilibraron e se completaron lentamente porque algúns nodos remataron cedo e estaban inactivos. Non obstante, as tarefas foron completadas. Pero ao pedir un SNP, o desequilibrio volveu causar problemas. O custo de procesar os SNP en cromosomas máis grandes (é dicir, onde queremos obter datos) só diminuíu nun factor 10. Moito, pero non suficiente.

E se o dividimos en partes aínda máis pequenas?

Que aprendín: Nunca intente facer 2,5 millóns de particións.

Decidín facer todo e particionei cada SNP. Isto garantiu que as particións fosen do mesmo tamaño. FOI UNHA MALA IDEA. Usei Glue e engadín unha liña inocente partition_by = 'snp'. A tarefa comezou e comezou a executarse. Un día despois comprobei e vin que aínda non había nada escrito no S3, así que matei a tarefa. Parece que Glue estaba escribindo ficheiros intermedios nunha localización oculta en S3, moitos ficheiros, quizais un par de millóns. Como resultado, o meu erro custou máis de mil dólares e non gustou ao meu mentor.

Partición + clasificación

Que aprendín: Ordenar aínda é difícil, igual que a afinación de Spark.

O meu último intento de partición consistiu en particionar os cromosomas e logo clasificar cada partición. En teoría, isto aceleraría cada consulta porque os datos SNP desexados tiñan que estar dentro duns poucos anacos de Parquet dentro dun intervalo determinado. Desafortunadamente, clasificar incluso os datos particionados resultou ser unha tarefa difícil. Como resultado, cambiei a EMR para un clúster personalizado e usei oito instancias potentes (C5.4xl) e Sparklyr para crear un fluxo de traballo máis flexible...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...porén, a tarefa aínda non estaba rematada. Configureino de diferentes xeitos: aumentou a asignación de memoria para cada executor de consultas, utilicei nodos cunha gran cantidade de memoria, utilicei variables de difusión (variables de difusión), pero cada vez estas resultaron ser medias compases, e aos poucos os executores comezaron. fallar ata que todo se detivo.

Estou facendo máis creativo

Que aprendín: Ás veces, os datos especiais requiren solucións especiais.

Cada SNP ten un valor de posición. Este é un número correspondente ao número de bases ao longo do seu cromosoma. Esta é unha forma agradable e natural de organizar os nosos datos. Nun principio quería dividir por rexións de cada cromosoma. Por exemplo, posicións 1 - 2000, 2001 - 4000, etc. Pero o problema é que os SNP non se distribúen uniformemente entre os cromosomas, polo que os tamaños dos grupos variarán moito.

Analizando 25 TB usando AWK e R

Como resultado, cheguei a un desglose das posicións en categorías (rango). Usando os datos xa descargados, realicei unha solicitude para obter unha lista de SNP únicos, as súas posicións e cromosomas. Despois clasifiquei os datos dentro de cada cromosoma e recollín os SNP en grupos (bin) dun tamaño determinado. Digamos 1000 SNP cada un. Isto deume a relación SNP-a-grupo-por-cromosoma.

Ao final, fixen grupos (bin) de 75 SNP, o motivo explicarase a continuación.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Primeiro proba con Spark

Que aprendín: A agregación de Spark é rápida, pero a partición aínda é cara.

Quería ler este pequeno marco de datos (2,5 millóns de filas) en Spark, combinalo cos datos en bruto e, a continuación, particionalo pola columna recén engadida. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

usei sdf_broadcast(), polo que Spark sabe que debería enviar o marco de datos a todos os nodos. Isto é útil se os datos son de pequeno tamaño e son necesarios para todas as tarefas. En caso contrario, Spark tenta ser intelixente e distribúe os datos segundo sexa necesario, o que pode causar desaceleracións.

E, de novo, a miña idea non funcionou: as tarefas funcionaron durante algún tempo, completaron a unión, e despois, como os executores lanzados por partición, comezaron a fracasar.

Engadindo AWK

Que aprendín: Non durmes cando che ensinan o básico. Seguramente alguén xa resolveu o teu problema alá polos anos 1980.

Ata este punto, o motivo de todos os meus fallos con Spark foi o revolto de datos no clúster. Quizais a situación poida mellorarse cun tratamento previo. Decidín tentar dividir os datos de texto en bruto en columnas de cromosomas, polo que esperaba proporcionar a Spark datos "pre-particionados".

Busquei en StackOverflow como dividir por valores de columna e atopei unha resposta tan xenial. Con AWK pode dividir un ficheiro de texto por valores de columna escribindoo nun script en lugar de enviar os resultados a stdout.

Escribín un script de Bash para probalo. Baixou un dos TSV empaquetados e despois desempaqueteo usando gzip e enviado a awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Funcionou!

Enchendo os núcleos

Que aprendín: gnu parallel - é unha cousa máxica, todos deberían usalo.

A separación foi bastante lenta e cando comecei htoppara comprobar o uso dunha potente (e cara) instancia EC2, resultou que estaba a usar só un núcleo e uns 200 MB de memoria. Para solucionar o problema e non perder moito diñeiro, tivemos que descubrir como paralelizar o traballo. Afortunadamente, nun libro absolutamente incrible Ciencia de datos na liña de comandos Atopei un capítulo de Jeron Janssens sobre a paralelización. Del aprendín gnu parallel, un método moi flexible para implementar multithreading en Unix.

Analizando 25 TB usando AWK e R
Cando comecei a partición usando o novo proceso, todo estaba ben, pero aínda había un pescozo de botella: a descarga de obxectos S3 no disco non era moi rápida e non estaba totalmente paralela. Para solucionar isto, fixen isto:

  1. Descubrín que é posible implementar a fase de descarga de S3 directamente na canalización, eliminando completamente o almacenamento intermedio no disco. Isto significa que podo evitar escribir datos en bruto no disco e usar almacenamento aínda máis pequeno e, polo tanto, máis barato en AWS.
  2. equipo aws configure set default.s3.max_concurrent_requests 50 aumentou moito o número de fíos que usa AWS CLI (por defecto hai 10).
  3. Cambiei a unha instancia EC2 optimizada para a velocidade da rede, coa letra n no nome. Descubrín que a perda de potencia de procesamento cando se usan n instancias está máis que compensada polo aumento da velocidade de carga. Para a maioría das tarefas usei c5n.4xl.
  4. Cambiado gzip en pigz, esta é unha ferramenta gzip que pode facer cousas interesantes para paralelizar a tarefa inicialmente non paralela de descomprimir ficheiros (isto axudou o menos).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Estes pasos combínanse entre si para que todo funcione moi rápido. Ao aumentar a velocidade de descarga e eliminar as escrituras no disco, agora podería procesar un paquete de 5 terabytes en só unhas horas.

Este chío debería mencionar "TSV". Ai.

Usando datos recentemente analizados

Que aprendín: A Spark gústanlle os datos sen comprimir e non lle gusta combinar particións.

Agora os datos estaban en S3 nun formato desempaquetado (léase: compartido) e semiordenado, e podería volver a Spark de novo. Agardábame unha sorpresa: de novo non conseguín o que quería! Foi moi difícil dicir a Spark exactamente como se particionaban os datos. E mesmo cando fixen isto, resultou que había demasiadas particións (95 mil) e cando usei coalesce reduciu o seu número a límites razoables, isto destruíu a miña partición. Estou seguro de que se pode solucionar, pero despois dun par de días de busca non puiden atopar unha solución. Finalmente rematei todas as tarefas en Spark, aínda que levou un tempo e os meus ficheiros de Parquet divididos non eran moi pequenos (~200 KB). Non obstante, os datos estaban onde facían falta.

Analizando 25 TB usando AWK e R
Demasiado pequeno e irregular, marabilloso!

Probando consultas Spark locais

Que aprendín: Spark ten demasiada sobrecarga á hora de resolver problemas sinxelos.

Ao descargar os datos nun formato intelixente, puiden probar a velocidade. Configure un script R para executar un servidor Spark local e, a continuación, cargue un marco de datos Spark desde o almacenamento do grupo Parquet especificado (bineira). Tentei cargar todos os datos pero non puiden facer que Sparklyr recoñecese a partición.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

A execución levou 29,415 segundos. Moito mellor, pero non demasiado bo para probas masivas de nada. Ademais, non puiden acelerar as cousas coa memoria caché porque cando tentaba almacenar na memoria caché un marco de datos, Spark sempre fallaba, mesmo cando asignei máis de 50 GB de memoria a un conxunto de datos que pesaba menos de 15.

Volver a AWK

Que aprendín: As matrices asociativas en AWK son moi eficientes.

Decateime de que podía acadar velocidades máis altas. Lembreime diso dun xeito marabilloso Tutorial AWK de Bruce Barnett Lin sobre unha función interesante chamada "matrices asociativas" Esencialmente, estes son pares clave-valor, que por algún motivo chamáronse de forma diferente en AWK e, polo tanto, non pensei moito neles. Roman Cheplyaka lembrou que o termo "matrices asociativas" é moito máis antigo que o termo "par clave-valor". Aínda que ti busque o valor clave en Google Ngram, non verás este termo alí, pero atoparás matrices asociativas! Ademais, o "par clave-valor" adóitase asociar con bases de datos, polo que ten moito máis sentido comparalo cun mapa hash. Decateime de que podía usar estas matrices asociativas para asociar os meus SNP cunha táboa de papel e datos en bruto sen usar Spark.

Para iso, no script AWK usei o bloque BEGIN. Este é un fragmento de código que se executa antes de pasar a primeira liña de datos ao corpo principal do script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Equipo while(getline...) cargou todas as filas do grupo CSV (bin), estableza a primeira columna (nome SNP) como clave para a matriz asociativa bin e o segundo valor (grupo) como valor. Despois no bloque { }, que se executa en todas as liñas do ficheiro principal, cada liña envíase ao ficheiro de saída, que recibe un nome único dependendo do seu grupo (bin): ..._bin_"bin[$1]"_....

Variables batch_num и chunk_id coincidiu cos datos proporcionados pola canalización, evitando unha condición de carreira e cada fío de execución en execución parallel, escribiu no seu propio ficheiro único.

Xa que espallei todos os datos en bruto en cartafoles dos cromosomas que sobraron do meu experimento anterior con AWK, agora podería escribir outro script Bash para procesar un cromosoma á vez e enviar datos particionados máis profundos a S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

O guión ten dúas seccións parallel.

Na primeira sección, lense os datos de todos os ficheiros que conteñan información sobre o cromosoma desexado, despois estes datos distribúense en fíos, que distribúen os ficheiros nos grupos apropiados (bineira). Para evitar condicións de carreira cando varios fíos escriben no mesmo ficheiro, AWK pasa os nomes dos ficheiros para escribir datos en diferentes lugares, por exemplo. chr_10_bin_52_batch_2_aa.csv. Como resultado, créanse moitos ficheiros pequenos no disco (para iso usei volumes EBS de terabytes).

Transportador do segundo tramo parallel pasa polos grupos (bin) e combina os seus ficheiros individuais nun CSV común c cate despois envíaos para exportación.

Emisión en R?

Que aprendín: Podes contactar stdin и stdout desde un script R e, polo tanto, utilízao no proceso.

Podes ter notado esta liña no teu script Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Traduce todos os ficheiros de grupos concatenados (bin) ao script R a continuación. {} é unha técnica especial parallel, que insire todos os datos que envíe ao fluxo especificado directamente no propio comando. Opción {#} proporciona un ID de fío único e {%} representa o número de praza de traballo (repetido, pero nunca simultaneamente). Pódese atopar unha lista de todas as opcións en documentación.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Cando unha variable file("stdin") transmitido a readr::read_csv, os datos traducidos ao script R cárganse nun cadro, que está entón no formulario .rds-usando arquivo aws.s3 escrito directamente en S3.

RDS é algo así como unha versión junior de Parquet, sen os adornos do almacenamento dos altofalantes.

Despois de rematar o script de Bash conseguín un paquete .rds-arquivos localizados en S3, o que me permitiu usar unha compresión eficiente e tipos incorporados.

A pesar do uso do freo R, todo funcionou moi rápido. Non é sorprendente que as partes de R que len e escriben datos estean altamente optimizadas. Despois de probar nun cromosoma de tamaño medio, o traballo completouse nunha instancia C5n.4xl nunhas dúas horas.

S3 Limitacións

Que aprendín: Grazas á implementación de ruta intelixente, S3 pode xestionar moitos ficheiros.

Preocupábame se S3 sería capaz de xestionar os moitos ficheiros que lle foron transferidos. Podería facer que os nomes dos ficheiros teñan sentido, pero como os buscaría S3?

Analizando 25 TB usando AWK e R
Os cartafoles en S3 son só para mostrar, de feito o sistema non está interesado no símbolo /. Desde a páxina de preguntas frecuentes de S3.

Parece que S3 representa o camiño a un ficheiro en particular como unha clave simple nunha especie de táboa hash ou base de datos baseada en documentos. Un depósito pódese considerar unha táboa e os ficheiros pódense considerar rexistros nesa táboa.

Dado que a velocidade e a eficiencia son importantes para obter beneficios en Amazon, non é de estrañar que este sistema de claves como rutas de ficheiros estea moi optimizado. Intentei atopar un equilibrio: para que non tivera que facer moitas solicitudes de obtención, pero que as solicitudes se executasen rapidamente. Descubriuse que o mellor é facer uns 20 mil ficheiros bin. Creo que se seguimos optimizando, podemos conseguir un aumento da velocidade (por exemplo, facendo un cubo especial só para os datos, reducindo así o tamaño da táboa de busca). Pero non había tempo nin diñeiro para máis experimentos.

Que pasa coa compatibilidade cruzada?

O que aprendín: a principal causa de perda de tempo é a optimización prematura do seu método de almacenamento.

Neste punto, é moi importante preguntarse: "Por que usar un formato de ficheiro propietario?" O motivo reside na velocidade de carga (os ficheiros CSV con gzip tardaron 7 veces máis en cargarse) e na compatibilidade cos nosos fluxos de traballo. Podo reconsiderar se R pode cargar facilmente ficheiros Parquet (ou Arrow) sen a carga de Spark. Todos no noso laboratorio usan R e, se teño que converter os datos a outro formato, aínda teño os datos de texto orixinais, polo que podo executar a canalización de novo.

División do traballo

Que aprendín: Non intentes optimizar os traballos manualmente, deixa que o faga o ordenador.

Depurei o fluxo de traballo nun cromosoma, agora teño que procesar todos os demais datos.
Quería crear varias instancias EC2 para a conversión, pero ao mesmo tempo tiña medo de conseguir unha carga moi desequilibrada en diferentes traballos de procesamento (igual que Spark sufría particións desequilibradas). Ademais, non estaba interesado en crear unha instancia por cromosoma, porque para as contas de AWS hai un límite predeterminado de 10 instancias.

Entón decidín escribir un script en R para optimizar os traballos de procesamento.

En primeiro lugar, pedinlle a S3 que calculase canto espazo de almacenamento ocupaba cada cromosoma.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Despois escribín unha función que toma o tamaño total, baralla a orde dos cromosomas, divídeos en grupos num_jobs e indícalle o diferente que son os tamaños de todos os traballos de procesamento.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Despois percorrín mil shuffles usando ronroneo e escollín o mellor.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Entón acabei cun conxunto de tarefas que eran moi similares en tamaño. Entón todo o que quedaba era envolver o meu guión de Bash anterior nun gran bucle for. Esta optimización tardou uns 10 minutos en escribirse. E isto é moito menos do que gastaría en crear tarefas manualmente se estivesen desequilibradas. Polo tanto, creo que tiña razón con esta optimización preliminar.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Ao final engado o comando de apagado:

sudo shutdown -h now

... e todo funcionou! Usando a AWS CLI, levantei instancias usando a opción user_data deulles scripts Bash das súas tarefas para procesar. Correron e apagáronse automaticamente, polo que non estaba pagando por potencia de procesamento extra.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Imos empacar!

Que aprendín: A API debe ser sinxela por mor da facilidade e flexibilidade de uso.

Finalmente conseguín os datos no lugar e na forma correctas. Só restaba simplificar ao máximo o proceso de uso dos datos para facilitarllo aos meus compañeiros. Quería facer unha API sinxela para crear solicitudes. Se no futuro decido cambiar de .rds aos ficheiros Parquet, entón isto debería ser un problema para min, non para os meus compañeiros. Para iso decidín facer un paquete R interno.

Constrúe e documente un paquete moi sinxelo que conteña só algunhas funcións de acceso a datos organizadas arredor dunha función get_snp. Tamén fixen unha páxina web para os meus compañeiros paquete abaixo, para que poidan ver facilmente exemplos e documentación.

Analizando 25 TB usando AWK e R

Caché intelixente

Que aprendín: Se os teus datos están ben preparados, o almacenamento en caché será doado!

Dado que un dos fluxos de traballo principais aplicou o mesmo modelo de análise ao paquete SNP, decidín usar binning para min. Ao transmitir datos a través de SNP, toda a información do grupo (bin) está anexada ao obxecto devolto. É dicir, as consultas antigas poden (en teoría) acelerar o procesamento de consultas novas.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Ao construír o paquete, executei moitos puntos de referencia para comparar a velocidade ao usar diferentes métodos. Recomendo non descoidar isto, porque ás veces os resultados son inesperados. Por exemplo, dplyr::filter foi moito máis rápido que capturar filas mediante o filtrado baseado na indexación, e recuperar unha única columna dun marco de datos filtrado foi moito máis rápido que usar a sintaxe de indexación.

Teña en conta que o obxecto prev_snp_results contén a chave snps_in_bin. Esta é unha matriz de todos os SNP únicos nun grupo (bineira), o que che permite comprobar rapidamente se xa tes datos dunha consulta anterior. Tamén facilita o recorrido por todos os SNP dun grupo (bineiro) con este código:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Descubrimentos

Agora podemos (e comezamos a executar en serio) modelos e escenarios que antes eran inaccesibles para nós. O mellor é que os meus compañeiros de laboratorio non teñen que pensar en ningunha complicación. Só teñen unha función que funciona.

E aínda que o paquete aforralles os detalles, tentei simplificar o formato de datos o suficiente como para que puidesen averiguarlo se de súpeto desaparecese mañá...

A velocidade aumentou notablemente. Adoitamos escanear fragmentos de xenoma funcionalmente significativos. Anteriormente, non podíamos facelo (resultou demasiado caro), pero agora, grazas á estrutura do grupo (bin) e á memoria caché, a solicitude dun SNP leva de media menos de 0,1 segundos e o uso de datos é tan baixo que os custos para S3 son cacahuetes.

Conclusión

Este artigo non é unha guía en absoluto. A solución resultou individual, e case seguro que non é a óptima. Máis ben, é un caderno de viaxe. Quero que outros entendan que este tipo de decisións non aparecen totalmente formadas na cabeza, son o resultado de ensaio e erro. Ademais, se estás a buscar un científico de datos, ten en conta que usar estas ferramentas de forma efectiva require experiencia e a experiencia custa diñeiro. Estou feliz de ter os medios para pagar, pero moitos outros que poden facer o mesmo traballo mellor ca min nunca terán a oportunidade por falta de cartos de probalo.

As ferramentas de big data son versátiles. Se tes tempo, case seguro que podes escribir unha solución máis rápida usando técnicas intelixentes de limpeza, almacenamento e extracción de datos. En definitiva, trátase dunha análise custo-beneficio.

O que aprendín:

  • non hai unha forma barata de analizar 25 TB á vez;
  • teña coidado co tamaño dos seus ficheiros Parquet e coa súa organización;
  • As particións en Spark deben estar equilibradas;
  • En xeral, nunca intente facer 2,5 millóns de particións;
  • Ordenar aínda é difícil, igual que configurar Spark;
  • ás veces, os datos especiais requiren solucións especiais;
  • A agregación de Spark é rápida, pero a partición aínda é cara;
  • non durmes cando che ensinan o básico, probablemente alguén xa solucionou o teu problema alá polos anos 1980;
  • gnu parallel - isto é unha cousa máxica, todos deberían usalo;
  • A Spark gústanlle os datos sen comprimir e non lle gusta combinar particións;
  • Spark ten demasiada sobrecarga ao resolver problemas sinxelos;
  • As matrices asociativas de AWK son moi eficientes;
  • podes contactar stdin и stdout desde un script R e, polo tanto, utilízao na canalización;
  • Grazas á implementación de camiños intelixentes, S3 pode procesar moitos ficheiros;
  • O principal motivo para perder tempo é optimizar prematuramente o seu método de almacenamento;
  • non intentes optimizar as tarefas manualmente, deixa que o faga o ordenador;
  • A API debe ser sinxela por mor da facilidade e flexibilidade de uso;
  • Se os teus datos están ben preparados, o caché será doado!

Fonte: www.habr.com

Engadir un comentario