Analizando 25 TB usando AWK y R

Analizando 25 TB usando AWK y R
Cómo leer este artículo: Pido disculpas por que el texto sea tan largo y caótico. Para ahorrarle tiempo, comienzo cada capítulo con una introducción de “Lo que aprendí”, que resume la esencia del capítulo en una o dos oraciones.

"¡Sólo muéstrame la solución!" Si sólo quieres ver de dónde vengo, salta al capítulo “Volverte más inventivo”, pero creo que es más interesante y útil leer sobre el fracaso.

Recientemente me encargaron la tarea de configurar un proceso para procesar un gran volumen de secuencias de ADN sin procesar (técnicamente, un chip SNP). La necesidad era obtener rápidamente datos sobre una ubicación genética determinada (llamada SNP) para el modelado posterior y otras tareas. Usando R y AWK, pude limpiar y organizar datos de forma natural, acelerando enormemente el procesamiento de consultas. Esto no fue fácil para mí y requirió numerosas iteraciones. Este artículo te ayudará a evitar algunos de mis errores y te mostrará con qué terminé.

Primero, algunas explicaciones introductorias.

Datos

El centro de procesamiento de información genética de nuestra universidad nos proporcionó datos en forma de TSV de 25 TB. Los recibí divididos en 5 paquetes, comprimidos con Gzip, cada uno de los cuales contenía unos 240 archivos de cuatro gigabytes. Cada fila contenía datos para un SNP de un individuo. En total, se transmitieron datos sobre ~2,5 millones de SNP y ~60 mil personas. Además de la información de SNP, los archivos contenían numerosas columnas con números que reflejaban diversas características, como intensidad de lectura, frecuencia de diferentes alelos, etc. En total había unas 30 columnas con valores únicos.

objetivo

Como ocurre con cualquier proyecto de gestión de datos, lo más importante era determinar cómo se utilizarían los datos. En este caso Principalmente seleccionaremos modelos y flujos de trabajo para SNP basados ​​en SNP.. Es decir, sólo necesitaremos datos de un SNP a la vez. Tuve que aprender a recuperar todos los registros asociados con uno de los 2,5 millones de SNP de la manera más fácil, rápida y económica posible.

Como no hacer esto

Para citar un cliché adecuado:

No fallé mil veces, simplemente descubrí mil formas de evitar analizar una gran cantidad de datos en un formato fácil de consultar.

Primer intento

¿Qué he aprendido?: No existe una forma económica de analizar 25 TB a la vez.

Después de haber tomado el curso “Métodos avanzados para el procesamiento de Big Data” en la Universidad de Vanderbilt, estaba seguro de que el truco estaba en la bolsa. Probablemente llevará una o dos horas configurar el servidor Hive para que ejecute todos los datos e informe el resultado. Como nuestros datos están almacenados en AWS S3, utilicé el servicio Athena, que le permite aplicar consultas SQL de Hive a datos de S3. No necesita configurar/crear un clúster de Hive y también paga solo por los datos que está buscando.

Después de mostrarle a Athena mis datos y su formato, realicé algunas pruebas con consultas como esta:

select * from intensityData limit 10;

Y rápidamente recibió resultados bien estructurados. Listo.

Hasta que intentamos utilizar los datos en nuestro trabajo...

Me pidieron que sacara toda la información del SNP para probar el modelo. Ejecuté la consulta:


select * from intensityData 
where snp = 'rs123456';

...y empezó a esperar. Después de ocho minutos y más de 4 TB de datos solicitados, recibí el resultado. Athena cobra por el volumen de datos encontrados, 5 dólares por terabyte. Entonces esta única solicitud costó $20 y ocho minutos de espera. Para ejecutar el modelo con todos los datos, tuvimos que esperar 38 años y pagar 50 millones de dólares, lo que obviamente no era adecuado para nosotros.

Era necesario utilizar parquet...

¿Qué he aprendido?: Tenga cuidado con el tamaño de sus archivos Parquet y su organización.

Primero intenté solucionar la situación convirtiendo todos los TSV a Limas de parquet. Son convenientes para trabajar con grandes conjuntos de datos porque la información que contienen se almacena en forma de columnas: cada columna se encuentra en su propio segmento de memoria/disco, a diferencia de los archivos de texto, en los que las filas contienen elementos de cada columna. Y si necesita encontrar algo, simplemente lea la columna requerida. Además, cada archivo almacena un rango de valores en una columna, por lo que si el valor que busca no está en el rango de la columna, Spark no perderá tiempo escaneando todo el archivo.

Ejecuté una tarea simple Pegamento AWS para convertir nuestros TSV a Parquet y colocar los nuevos archivos en Athena. Tardaron unas 5 horas. Pero cuando ejecuté la solicitud, me llevó aproximadamente la misma cantidad de tiempo y un poco menos de dinero completarla. El hecho es que Spark, al intentar optimizar la tarea, simplemente descomprimió un fragmento de TSV y lo colocó en su propio fragmento de Parquet. Y como cada fragmento era lo suficientemente grande como para contener los registros completos de muchas personas, cada archivo contenía todos los SNP, por lo que Spark tuvo que abrir todos los archivos para extraer la información que necesitaba.

Curiosamente, el tipo de compresión predeterminado (y recomendado) de Parquet, rápido, no se puede dividir. Por lo tanto, cada ejecutor se vio atascado en la tarea de descomprimir y descargar el conjunto de datos completo de 3,5 GB.

Analizando 25 TB usando AWK y R

Entendamos el problema

¿Qué he aprendido?: La clasificación es difícil, especialmente si los datos están distribuidos.

Me pareció que ahora entendía la esencia del problema. Sólo necesitaba ordenar los datos por columna SNP, no por personas. Luego, se almacenarán varios SNP en un fragmento de datos separado y luego la función "inteligente" de Parquet "abrir sólo si el valor está dentro del rango" se mostrará en todo su esplendor. Desafortunadamente, clasificar miles de millones de filas dispersas en un grupo resultó ser una tarea difícil.

AWS definitivamente no quiere emitir un reembolso debido al motivo "Soy un estudiante distraído". Después de ejecutar la clasificación en Amazon Glue, funcionó durante 2 días y falló.

¿Qué pasa con la partición?

¿Qué he aprendido?: Las particiones en Spark deben estar equilibradas.

Luego se me ocurrió la idea de dividir datos en cromosomas. Hay 23 de ellos (y varios más si se tiene en cuenta el ADN mitocondrial y las regiones no cartografiadas).
Esto le permitirá dividir los datos en partes más pequeñas. Si agrega solo una línea a la función de exportación de Spark en el script Glue partition_by = "chr", entonces los datos deben dividirse en depósitos.

Analizando 25 TB usando AWK y R
El genoma está formado por numerosos fragmentos llamados cromosomas.

Desafortunadamente, no funcionó. Los cromosomas tienen diferentes tamaños, lo que significa diferentes cantidades de información. Esto significa que las tareas que Spark envió a los trabajadores no estaban equilibradas y se completaron lentamente porque algunos nodos terminaron antes y estaban inactivos. Sin embargo, las tareas se completaron. Pero al pedir un SNP, el desequilibrio volvió a causar problemas. El costo de procesar SNP en cromosomas más grandes (es decir, donde queremos obtener datos) solo ha disminuido aproximadamente en un factor de 10. Mucho, pero no suficiente.

¿Y si lo dividimos en partes aún más pequeñas?

¿Qué he aprendido?: Nunca intentes hacer 2,5 millones de particiones.

Decidí hacer todo lo posible y dividir cada SNP. Esto aseguró que las particiones fueran del mismo tamaño. FUE UNA MALA IDEA. Usé Glue y agregué una línea inocente. partition_by = 'snp'. La tarea comenzó y comenzó a ejecutarse. Un día después verifiqué y vi que todavía no había nada escrito en S3, así que eliminé la tarea. Parece que Glue estaba escribiendo archivos intermedios en una ubicación oculta en S3, muchos archivos, quizás un par de millones. Como resultado, mi error costó más de mil dólares y no agradó a mi mentor.

Particionar + ordenar

¿Qué he aprendido?: La clasificación sigue siendo difícil, al igual que el ajuste de Spark.

Mi último intento de partición implicó dividir los cromosomas y luego clasificar cada partición. En teoría, esto aceleraría cada consulta porque los datos SNP deseados tenían que estar dentro de unos pocos fragmentos de Parquet dentro de un rango determinado. Desafortunadamente, clasificar incluso los datos particionados resultó ser una tarea difícil. Como resultado, cambié a EMR para un clúster personalizado y utilicé ocho instancias potentes (C5.4xl) y Sparklyr para crear un flujo de trabajo más flexible...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...sin embargo, la tarea aún no se había completado. Lo configuré de diferentes maneras: aumenté la asignación de memoria para cada ejecutor de consultas, usé nodos con una gran cantidad de memoria, usé variables de transmisión (variables de transmisión), pero cada vez resultaron ser medidas a medias, y gradualmente los ejecutores comenzaron fracasar hasta que todo se detuviera.

Me estoy volviendo más creativo

¿Qué he aprendido?: A veces, los datos especiales requieren soluciones especiales.

Cada SNP tiene un valor de posición. Este es un número correspondiente al número de bases a lo largo de su cromosoma. Esta es una forma agradable y natural de organizar nuestros datos. Al principio quería dividir por regiones de cada cromosoma. Por ejemplo, posiciones 1 - 2000, 2001 - 4000, etc. Pero el problema es que los SNP no están distribuidos uniformemente entre los cromosomas, por lo que el tamaño de los grupos variará mucho.

Analizando 25 TB usando AWK y R

Como resultado, llegué a un desglose de las posiciones en categorías (rango). Utilizando los datos ya descargados, ejecuté una solicitud para obtener una lista de SNP únicos, sus posiciones y cromosomas. Luego clasifiqué los datos dentro de cada cromosoma y recopilé SNP en grupos (bin) de un tamaño determinado. Digamos 1000 SNP cada uno. Esto me dio la relación SNP-grupo-por-cromosoma.

Al final, hice grupos (bin) de 75 SNP, el motivo se explicará a continuación.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Primer intento con Spark

¿Qué he aprendido?: La agregación Spark es rápida, pero la partición sigue siendo costosa.

Quería leer este pequeño marco de datos (2,5 millones de filas) en Spark, combinarlo con los datos sin procesar y luego particionarlo según la columna recién agregada. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

solía sdf_broadcast(), entonces Spark sabe que debe enviar el marco de datos a todos los nodos. Esto es útil si los datos son de tamaño pequeño y necesarios para todas las tareas. De lo contrario, Spark intenta ser inteligente y distribuye los datos según sea necesario, lo que puede provocar ralentizaciones.

Y nuevamente mi idea no funcionó: las tareas funcionaron durante algún tiempo, completaron la unión y luego, como los ejecutores lanzados por la partición, comenzaron a fallar.

Agregar AWK

¿Qué he aprendido?: No duermas cuando te estén enseñando lo básico. Seguramente alguien ya solucionó tu problema allá por los años 1980.

Hasta este punto, la razón de todos mis fracasos con Spark fue la confusión de datos en el clúster. Quizás la situación pueda mejorar con un tratamiento previo. Decidí intentar dividir los datos de texto sin formato en columnas de cromosomas, por lo que esperaba proporcionarle a Spark datos "preparticionados".

Busqué en StackOverflow cómo dividir por valores de columna y encontré que gran respuesta. Con AWK puede dividir un archivo de texto por valores de columna escribiéndolo en un script en lugar de enviar los resultados a stdout.

Escribí un script Bash para probarlo. Descargué uno de los TSV empaquetados y luego lo descomprimí usando gzip y enviado a awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

¡Funcionó!

Llenando los núcleos

¿Qué he aprendido?: gnu parallel - Es algo mágico, todo el mundo debería usarlo.

La separación fue bastante lenta y cuando comencé htopAl comprobar el uso de una potente (y costosa) instancia EC2, resultó que estaba usando solo un núcleo y unos 200 MB de memoria. Para solucionar el problema y no perder mucho dinero, tuvimos que descubrir cómo paralelizar el trabajo. Afortunadamente, en un libro absolutamente asombroso. Ciencia de datos en la línea de comandos Encontré un capítulo de Jeron Janssens sobre paralelización. De ahí aprendí sobre gnu parallel, un método muy flexible para implementar subprocesos múltiples en Unix.

Analizando 25 TB usando AWK y R
Cuando comencé la partición usando el nuevo proceso, todo estaba bien, pero todavía había un cuello de botella: la descarga de objetos S3 al disco no fue muy rápida y no estaba completamente paralelizada. Para solucionar esto, hice esto:

  1. Descubrí que es posible implementar la etapa de descarga de S3 directamente en la tubería, eliminando por completo el almacenamiento intermedio en el disco. Esto significa que puedo evitar escribir datos sin procesar en el disco y utilizar un almacenamiento aún más pequeño y, por lo tanto, más económico en AWS.
  2. equipo aws configure set default.s3.max_concurrent_requests 50 Aumentó considerablemente la cantidad de subprocesos que utiliza AWS CLI (de forma predeterminada, hay 10).
  3. Cambié a una instancia EC2 optimizada para la velocidad de la red, con la letra n en el nombre. He descubierto que la pérdida de potencia de procesamiento cuando se utilizan n instancias se compensa con creces con el aumento en la velocidad de carga. Para la mayoría de las tareas utilicé c5n.4xl.
  4. Cambió gzip en pigz, esta es una herramienta gzip que puede hacer cosas interesantes para paralelizar la tarea inicialmente no paralelizada de descomprimir archivos (esto fue lo que menos ayudó).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Estos pasos se combinan entre sí para que todo funcione muy rápidamente. Al aumentar las velocidades de descarga y eliminar las escrituras en el disco, ahora podía procesar un paquete de 5 terabytes en tan solo unas horas.

Este tweet debería haber mencionado 'TSV'. Pobre de mí.

Usando datos recién analizados

¿Qué he aprendido?: A Spark le gustan los datos sin comprimir y no le gusta combinar particiones.

Ahora los datos estaban en S3 en un formato desempaquetado (léase: compartido) y semiordenado, y podía volver a Spark nuevamente. Me esperaba una sorpresa: ¡nuevamente no logré lo que quería! Fue muy difícil decirle a Spark exactamente cómo se dividieron los datos. E incluso cuando hice esto, resultó que había demasiadas particiones (95 mil), y cuando usé coalesce Reduje su número a límites razonables, esto destruyó mi partición. Estoy seguro de que esto se puede solucionar, pero después de un par de días de búsqueda no pude encontrar una solución. Finalmente terminé todas las tareas en Spark, aunque me tomó un tiempo y mis archivos de Parquet divididos no eran muy pequeños (~200 KB). Sin embargo, los datos estaban donde se necesitaban.

Analizando 25 TB usando AWK y R
Demasiado pequeño y desigual, ¡maravilloso!

Probando consultas Spark locales

¿Qué he aprendido?: Spark tiene demasiada sobrecarga al resolver problemas simples.

Al descargar los datos en un formato inteligente, pude probar la velocidad. Configure un script R para ejecutar un servidor Spark local y luego cargue un marco de datos Spark desde el almacenamiento del grupo Parquet (bin) especificado. Intenté cargar todos los datos pero no pude lograr que Sparklyr reconociera la partición.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

La ejecución tardó 29,415 segundos. Mucho mejor, pero no demasiado bueno para realizar pruebas masivas de cualquier cosa. Además, no podía acelerar las cosas con el almacenamiento en caché porque cuando intentaba almacenar en caché un marco de datos en la memoria, Spark siempre fallaba, incluso cuando asignaba más de 50 GB de memoria a un conjunto de datos que pesaba menos de 15.

Volver a AWK

¿Qué he aprendido?: Las matrices asociativas en AWK son muy eficientes.

Me di cuenta de que podía alcanzar velocidades más altas. Lo recordé de una manera maravillosa. Tutorial de AWK de Bruce Barnett Leí sobre una característica interesante llamada "matrices asociativas" Básicamente, estos son pares clave-valor, que por alguna razón se llamaban de manera diferente en AWK y, por lo tanto, de alguna manera no pensé mucho en ellos. Cheplyaka romano Recordó que el término “matrices asociativas” es mucho más antiguo que el término “par clave-valor”. Incluso si tú busque el valor-clave en Google Ngram, no verá este término allí, ¡pero encontrará matrices asociativas! Además, el "par clave-valor" se asocia con mayor frecuencia con bases de datos, por lo que tiene mucho más sentido compararlo con un mapa hash. Me di cuenta de que podía usar estas matrices asociativas para asociar mis SNP con una tabla bin y datos sin procesar sin usar Spark.

Para hacer esto, en el script AWK usé el bloque BEGIN. Este es un fragmento de código que se ejecuta antes de que la primera línea de datos pase al cuerpo principal del script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Equipo while(getline...) cargó todas las filas del grupo CSV (bin), estableció la primera columna (nombre SNP) como clave para la matriz asociativa bin y el segundo valor (grupo) como valor. Luego en el bloque { }, que se ejecuta en todas las líneas del archivo principal, cada línea se envía al archivo de salida, que recibe un nombre único dependiendo de su grupo (bin): ..._bin_"bin[$1]"_....

Variables batch_num и chunk_id coincidió con los datos proporcionados por la canalización, evitando una condición de carrera, y cada hilo de ejecución en ejecución parallel, escribió en su propio archivo único.

Dado que dispersé todos los datos sin procesar en carpetas de los cromosomas que quedaron de mi experimento anterior con AWK, ahora podría escribir otro script Bash para procesar un cromosoma a la vez y enviar datos particionados más profundos a S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

El guión tiene dos secciones. parallel.

En la primera sección, se leen datos de todos los archivos que contienen información sobre el cromosoma deseado, luego estos datos se distribuyen entre subprocesos, que distribuyen los archivos en los grupos apropiados (bin). Para evitar condiciones de carrera cuando varios subprocesos escriben en el mismo archivo, AWK pasa los nombres de los archivos para escribir datos en diferentes lugares, p. chr_10_bin_52_batch_2_aa.csv. Como resultado, se crean muchos archivos pequeños en el disco (para esto utilicé volúmenes EBS de terabytes).

Transportador de la segunda sección. parallel recorre los grupos (bin) y combina sus archivos individuales en un CSV c común caty luego los envía para su exportación.

¿Transmitiendo en R?

¿Qué he aprendido?: Puedes contactar stdin и stdout desde un script R y, por lo tanto, utilizarlo en el proceso.

Es posible que hayas notado esta línea en tu script Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Traduce todos los archivos de grupo concatenados (bin) al script R a continuación. {} es una técnica especial parallel, que inserta cualquier dato que envíe a la secuencia especificada directamente en el propio comando. Opción {#} proporciona una ID de hilo única, y {%} representa el número de puesto de trabajo (repetido, pero nunca simultáneamente). Puede encontrar una lista de todas las opciones en documentación.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Cuando una variable file("stdin") transmitido a readr::read_csv, los datos traducidos al script R se cargan en un marco, que luego tiene el formato .rds-archivo usando aws.s3 escrito directamente en S3.

RDS es algo así como una versión junior de Parquet, sin los lujos del almacenamiento de altavoces.

Después de terminar el script Bash obtuve un paquete .rds-archivos ubicados en S3, lo que me permitió usar compresión eficiente y tipos integrados.

A pesar del uso del freno R, todo funcionó muy rápidamente. No es sorprendente que las partes de R que leen y escriben datos estén altamente optimizadas. Después de realizar pruebas en un cromosoma de tamaño mediano, el trabajo se completó en una instancia C5n.4xl en aproximadamente dos horas.

Limitaciones de S3

¿Qué he aprendido?: Gracias a la implementación de rutas inteligentes, S3 puede manejar muchos archivos.

Me preocupaba si S3 sería capaz de manejar la gran cantidad de archivos que se le transfirieron. Podría hacer que los nombres de los archivos tuvieran sentido, pero ¿cómo los buscaría S3?

Analizando 25 TB usando AWK y R
Las carpetas en S3 son sólo para mostrar, de hecho al sistema no le interesa el símbolo /. Desde la página de preguntas frecuentes de S3.

Parece que S3 representa la ruta a un archivo en particular como una clave simple en una especie de tabla hash o base de datos basada en documentos. Un depósito puede considerarse como una tabla y los archivos pueden considerarse registros en esa tabla.

Dado que la velocidad y la eficiencia son importantes para obtener ganancias en Amazon, no sorprende que este sistema de clave como ruta de archivo esté increíblemente optimizado. Intenté encontrar un equilibrio: no tener que realizar muchas solicitudes de obtención, pero que las solicitudes se ejecutaran rápidamente. Resultó que lo mejor es crear unos 20 mil archivos bin. Creo que si continuamos optimizando, podemos lograr un aumento en la velocidad (por ejemplo, creando un depósito especial solo para datos, reduciendo así el tamaño de la tabla de búsqueda). Pero no hubo tiempo ni dinero para realizar más experimentos.

¿Qué pasa con la compatibilidad cruzada?

Lo que aprendí: la causa número uno de pérdida de tiempo es optimizar prematuramente su método de almacenamiento.

Llegados a este punto es muy importante preguntarse: “¿Por qué utilizar un formato de archivo propietario?” La razón radica en la velocidad de carga (los archivos CSV comprimidos con gzip tardaron 7 veces más en cargarse) y la compatibilidad con nuestros flujos de trabajo. Puedo reconsiderar si R puede cargar fácilmente archivos Parquet (o Arrow) sin la carga Spark. Todos en nuestro laboratorio usan R, y si necesito convertir los datos a otro formato, todavía tengo los datos de texto originales, por lo que puedo ejecutar la canalización nuevamente.

Division de trabajo

¿Qué he aprendido?: No intente optimizar los trabajos manualmente, deje que la computadora lo haga.

He depurado el flujo de trabajo en un cromosoma, ahora necesito procesar todos los demás datos.
Quería generar varias instancias EC2 para la conversión, pero al mismo tiempo tenía miedo de obtener una carga muy desequilibrada en diferentes trabajos de procesamiento (al igual que Spark sufría particiones desequilibradas). Además, no me interesaba generar una instancia por cromosoma, porque para las cuentas de AWS hay un límite predeterminado de 10 instancias.

Luego decidí escribir un script en R para optimizar los trabajos de procesamiento.

Primero, le pedí a S3 que calculara cuánto espacio de almacenamiento ocupaba cada cromosoma.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Luego escribí una función que toma el tamaño total, baraja el orden de los cromosomas y los divide en grupos. num_jobs y le indica cuán diferentes son los tamaños de todos los trabajos de procesamiento.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Luego realicé mil combinaciones aleatorias usando ronroneo y elegí la mejor.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Así que terminé con un conjunto de tareas que eran muy similares en tamaño. Luego todo lo que quedaba era envolver mi script Bash anterior en un gran bucle. for. Esta optimización tardó unos 10 minutos en escribirse. Y esto es mucho menos de lo que gastaría en crear tareas manualmente si estuvieran desequilibradas. Por tanto, creo que acerté con esta optimización preliminar.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Al final agrego el comando de apagado:

sudo shutdown -h now

... ¡y todo salió bien! Usando la CLI de AWS, generé instancias usando la opción user_data les dio scripts Bash de sus tareas para su procesamiento. Se ejecutaron y cerraron automáticamente, por lo que no tuve que pagar por potencia de procesamiento adicional.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

¡Empaquemos!

¿Qué he aprendido?: La API debe ser simple para facilitar y flexibilidad de uso.

Finalmente obtuve los datos en el lugar y forma correctos. Todo lo que quedaba era simplificar el proceso de uso de datos tanto como fuera posible para que fuera más fácil para mis colegas. Quería crear una API sencilla para crear solicitudes. Si en el futuro decido cambiar de .rds a archivos Parquet, entonces esto debería ser un problema para mí, no para mis colegas. Para ello decidí hacer un paquete interno de R.

Cree y documente un paquete muy simple que contenga solo algunas funciones de acceso a datos organizadas en torno a una función. get_snp. También hice un sitio web para mis colegas. paquete abajo, para que puedan ver fácilmente ejemplos y documentación.

Analizando 25 TB usando AWK y R

Almacenamiento en caché inteligente

¿Qué he aprendido?: ¡Si sus datos están bien preparados, el almacenamiento en caché será fácil!

Dado que uno de los flujos de trabajo principales aplicó el mismo modelo de análisis al paquete SNP, decidí utilizar la agrupación a mi favor. Al transmitir datos a través de SNP, toda la información del grupo (bin) se adjunta al objeto devuelto. Es decir, las consultas antiguas pueden (en teoría) acelerar el procesamiento de consultas nuevas.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Al crear el paquete, realicé muchas pruebas comparativas para comparar la velocidad al utilizar diferentes métodos. Recomiendo no descuidar esto, porque a veces los resultados son inesperados. Por ejemplo, dplyr::filter fue mucho más rápido que capturar filas usando filtrado basado en indexación, y recuperar una sola columna de un marco de datos filtrado fue mucho más rápido que usar sintaxis de indexación.

Tenga en cuenta que el objeto prev_snp_results contiene la clave snps_in_bin. Se trata de una matriz de todos los SNP únicos en un grupo (bin), lo que le permite comprobar rápidamente si ya tiene datos de una consulta anterior. También facilita el recorrido por todos los SNP de un grupo (bin) con este código:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

resultados

Ahora podemos (y hemos comenzado a hacerlo seriamente) ejecutar modelos y escenarios que antes nos eran inaccesibles. Lo mejor es que mis compañeros de laboratorio no tienen que pensar en complicaciones. Simplemente tienen una función que funciona.

Y aunque el paquete les ahorra detalles, traté de hacer que el formato de los datos fuera lo suficientemente simple como para que pudieran descifrarlo si desapareciera repentinamente mañana...

La velocidad ha aumentado notablemente. Normalmente escaneamos fragmentos del genoma funcionalmente significativos. Anteriormente, no podíamos hacer esto (resultó ser demasiado costoso), pero ahora, gracias a la estructura del grupo (bin) y al almacenamiento en caché, una solicitud de un SNP demora en promedio menos de 0,1 segundos y el uso de datos es tan tan bajo que los costos de S3 son insignificantes.

Conclusión

Este artículo no es una guía en absoluto. La solución resultó ser individual y casi con seguridad no óptima. Más bien, es un diario de viaje. Quiero que los demás comprendan que este tipo de decisiones no aparecen completamente formadas en la cabeza, son el resultado de prueba y error. Además, si está buscando un científico de datos, tenga en cuenta que el uso eficaz de estas herramientas requiere experiencia, y la experiencia cuesta dinero. Estoy feliz de haber tenido los medios para pagar, pero muchos otros que pueden hacer el mismo trabajo mejor que yo nunca tendrán la oportunidad debido a la falta de dinero para siquiera intentarlo.

Las herramientas de big data son versátiles. Si tiene tiempo, es casi seguro que podrá escribir una solución más rápida utilizando técnicas inteligentes de limpieza, almacenamiento y extracción de datos. En última instancia, todo se reduce a un análisis de costo-beneficio.

Que aprendí:

  • no existe una forma económica de analizar 25 TB a la vez;
  • tenga cuidado con el tamaño de sus archivos Parquet y su organización;
  • Las particiones en Spark deben estar equilibradas;
  • En general, nunca intente crear 2,5 millones de particiones;
  • La clasificación sigue siendo difícil, al igual que configurar Spark;
  • a veces, los datos especiales requieren soluciones especiales;
  • La agregación Spark es rápida, pero la partición sigue siendo costosa;
  • no duermas cuando te enseñen lo básico, probablemente alguien ya resolvió tu problema allá por los años 1980;
  • gnu parallel - esto es algo mágico, todos deberían usarlo;
  • A Spark le gustan los datos sin comprimir y no le gusta combinar particiones;
  • Spark tiene demasiados gastos generales al resolver problemas simples;
  • Los arreglos asociativos de AWK son muy eficientes;
  • puedes contactar stdin и stdout desde un script R y, por lo tanto, utilizarlo en el proceso;
  • Gracias a la implementación de rutas inteligentes, S3 puede procesar muchos archivos;
  • La principal razón para perder el tiempo es optimizar prematuramente su método de almacenamiento;
  • no intentes optimizar las tareas manualmente, deja que la computadora lo haga;
  • La API debe ser simple para facilitar y flexibilidad de uso;
  • Si sus datos están bien preparados, ¡el almacenamiento en caché será fácil!

Fuente: habr.com

Añadir un comentario