Analyse de 25 To à l'aide d'AWK et R

Analyse de 25 To à l'aide d'AWK et R
Comment lire cet article: Je m'excuse pour le texte si long et chaotique. Pour vous faire gagner du temps, je commence chaque chapitre par une introduction « Ce que j'ai appris », qui résume l'essence du chapitre en une ou deux phrases.

"Montre-moi simplement la solution!" Si vous voulez simplement voir d'où je viens, passez au chapitre « Devenir plus inventif », mais je pense qu'il est plus intéressant et utile de lire sur l'échec.

J'ai récemment été chargé de mettre en place un procédé permettant de traiter un grand volume de séquences d'ADN brutes (techniquement une puce SNP). Le besoin était d’obtenir rapidement des données sur un emplacement génétique donné (appelé SNP) pour une modélisation ultérieure et d’autres tâches. Grâce à R et AWK, j'ai pu nettoyer et organiser les données de manière naturelle, accélérant considérablement le traitement des requêtes. Cela n’a pas été facile pour moi et a nécessité de nombreuses itérations. Cet article vous aidera à éviter certaines de mes erreurs et vous montrera ce avec quoi je me suis retrouvé.

Tout d’abord, quelques explications introductives.

Données

Notre centre universitaire de traitement de l’information génétique nous a fourni des données sous la forme d’un TSV de 25 To. Je les ai reçus divisés en 5 paquets, compressés par Gzip, contenant chacun environ 240 fichiers de quatre Go. Chaque ligne contenait des données pour un SNP d'un individu. Au total, des données sur environ 2,5 millions de SNP et environ 60 30 personnes ont été transmises. En plus des informations SNP, les fichiers contenaient de nombreuses colonnes avec des nombres reflétant diverses caractéristiques, telles que l'intensité de lecture, la fréquence des différents allèles, etc. Au total, il y avait environ XNUMX colonnes avec des valeurs uniques.

Objectif

Comme pour tout projet de gestion de données, le plus important était de déterminer comment les données seraient utilisées. Dans ce cas nous sélectionnerons principalement des modèles et des flux de travail pour SNP basés sur SNP. Autrement dit, nous n’aurons besoin que de données sur un seul SNP à la fois. J'ai dû apprendre à récupérer tous les enregistrements associés à l'un des 2,5 millions de SNP aussi facilement, rapidement et à moindre coût que possible.

Comment ne pas faire ça

Pour citer un cliché approprié :

Je n'ai pas échoué mille fois, j'ai juste découvert mille façons d'éviter d'analyser un tas de données dans un format convivial pour les requêtes.

Première tentative

Qu'ai-je appris: Il n'existe pas de moyen bon marché d'analyser 25 To à la fois.

Après avoir suivi le cours « Méthodes avancées pour le traitement du Big Data » à l'Université Vanderbilt, j'étais sûr que l'astuce était dans le sac. Il faudra probablement une heure ou deux pour configurer le serveur Hive afin qu'il parcoure toutes les données et rapporte le résultat. Puisque nos données sont stockées dans AWS S3, j'ai utilisé le service Athena, qui vous permet d'appliquer des requêtes Hive SQL aux données S3. Vous n'avez pas besoin de configurer/développer un cluster Hive et vous ne payez également que pour les données que vous recherchez.

Après avoir montré à Athena mes données et leur format, j'ai effectué quelques tests avec des requêtes comme celle-ci :

select * from intensityData limit 10;

Et j’ai rapidement obtenu des résultats bien structurés. Prêt.

Jusqu'à ce que nous essayions d'utiliser les données dans notre travail...

On m'a demandé d'extraire toutes les informations SNP pour tester le modèle. J'ai exécuté la requête :


select * from intensityData 
where snp = 'rs123456';

...et j'ai commencé à attendre. Après huit minutes et plus de 4 To de données demandées, j'ai reçu le résultat. Athena facture en fonction du volume de données trouvées, soit 5 $ par téraoctet. Cette seule demande coûte donc 20 $ et huit minutes d’attente. Pour exécuter le modèle sur toutes les données, nous avons dû attendre 38 ans et payer 50 millions de dollars, ce qui ne nous convenait évidemment pas.

Il fallait utiliser du Parquet...

Qu'ai-je appris: Soyez attentif à la taille de vos dossiers Parquet et à leur organisation.

J'ai d'abord essayé de résoudre le problème en convertissant tous les TSV en Limes pour parquet. Ils sont pratiques pour travailler avec de grands ensembles de données car les informations qu'ils contiennent sont stockées sous forme de colonnes : chaque colonne se trouve dans son propre segment de mémoire/disque, contrairement aux fichiers texte, dans lesquels les lignes contiennent des éléments de chaque colonne. Et si vous avez besoin de trouver quelque chose, lisez simplement la colonne requise. De plus, chaque fichier stocke une plage de valeurs dans une colonne, donc si la valeur que vous recherchez ne se trouve pas dans la plage de la colonne, Spark ne perdra pas de temps à analyser l'intégralité du fichier.

J'ai exécuté une tâche simple Colle AWS pour convertir nos TSV en Parquet et déposé les nouveaux fichiers dans Athena. Cela a pris environ 5 heures. Mais lorsque j’ai exécuté la demande, cela a pris à peu près le même temps et un peu moins d’argent. Le fait est que Spark, essayant d'optimiser la tâche, a simplement décompressé un morceau TSV et l'a placé dans son propre morceau Parquet. Et comme chaque morceau était suffisamment grand pour contenir l'intégralité des enregistrements de nombreuses personnes, chaque fichier contenait tous les SNP, donc Spark a dû ouvrir tous les fichiers pour extraire les informations dont il avait besoin.

Il est intéressant de noter que le type de compression par défaut (et recommandé) de Parquet, Snappy, n'est pas divisible. Par conséquent, chaque exécuteur testamentaire était bloqué sur la tâche de décompresser et de télécharger l’ensemble de données complet de 3,5 Go.

Analyse de 25 To à l'aide d'AWK et R

Comprenons le problème

Qu'ai-je appris: Le tri est difficile, surtout si les données sont distribuées.

Il me semblait que je comprenais désormais l'essence du problème. J'avais seulement besoin de trier les données par colonne SNP, pas par personnes. Ensuite, plusieurs SNP seront stockés dans un bloc de données séparé, puis la fonction « intelligente » de Parquet « ouverte uniquement si la valeur est dans la plage » se montrera dans toute sa splendeur. Malheureusement, trier des milliards de lignes dispersées dans un cluster s’est avéré une tâche difficile.

AWS ne souhaite certainement pas émettre de remboursement pour la raison « Je suis un étudiant distrait ». Après avoir exécuté le tri sur Amazon Glue, celui-ci a fonctionné pendant 2 jours et s'est écrasé.

Et le partitionnement ?

Qu'ai-je appris: Les partitions dans Spark doivent être équilibrées.

Puis j'ai eu l'idée de partitionner les données en chromosomes. Il y en a 23 (et plusieurs autres si l’on prend en compte l’ADN mitochondrial et les régions non cartographiées).
Cela vous permettra de diviser les données en morceaux plus petits. Si vous ajoutez une seule ligne à la fonction d'exportation Spark dans le script Glue partition_by = "chr", les données doivent alors être divisées en compartiments.

Analyse de 25 To à l'aide d'AWK et R
Le génome est constitué de nombreux fragments appelés chromosomes.

Malheureusement, cela n'a pas fonctionné. Les chromosomes ont des tailles différentes, ce qui signifie différentes quantités d'informations. Cela signifie que les tâches envoyées par Spark aux travailleurs n'étaient pas équilibrées et se terminaient lentement car certains nœuds se terminaient plus tôt et étaient inactifs. Cependant, les tâches ont été accomplies. Mais lorsqu’on a demandé un SNP, le déséquilibre a de nouveau posé des problèmes. Le coût du traitement des SNP sur des chromosomes plus gros (c'est-à-dire là où nous voulons obtenir des données) n'a diminué que d'un facteur 10 environ. Beaucoup, mais pas assez.

Et si nous le divisions en parties encore plus petites ?

Qu'ai-je appris: N'essayez jamais de créer 2,5 millions de partitions.

J'ai décidé de tout mettre en œuvre et de partitionner chaque SNP. Cela garantissait que les partitions étaient de taille égale. C'ÉTAIT UNE MAUVAISE IDÉE. J'ai utilisé Glue et ajouté une ligne innocente partition_by = 'snp'. La tâche a démarré et a commencé à s'exécuter. Un jour plus tard, j'ai vérifié et j'ai vu qu'il n'y avait toujours rien d'écrit dans S3, j'ai donc arrêté la tâche. Il semble que Glue écrivait des fichiers intermédiaires dans un emplacement caché dans S3, de nombreux fichiers, peut-être quelques millions. En conséquence, mon erreur a coûté plus de mille dollars et n'a pas plu à mon mentor.

Partitionnement + tri

Qu'ai-je appris: Le tri est encore difficile, tout comme le réglage de Spark.

Ma dernière tentative de partitionnement m'a impliqué de partitionner les chromosomes, puis de trier chaque partition. En théorie, cela accélérerait chaque requête car les données SNP souhaitées devaient se trouver dans quelques morceaux Parquet dans une plage donnée. Malheureusement, trier même des données partitionnées s’est avéré être une tâche difficile. En conséquence, je suis passé à EMR pour un cluster personnalisé et j'ai utilisé huit instances puissantes (C5.4xl) et Sparklyr pour créer un flux de travail plus flexible...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...cependant, la tâche n'était toujours pas terminée. Je l'ai configuré de différentes manières : augmenté l'allocation de mémoire pour chaque exécuteur de requêtes, utilisé des nœuds avec une grande quantité de mémoire, utilisé des variables de diffusion (variables de diffusion), mais à chaque fois celles-ci se sont avérées être des demi-mesures, et progressivement les exécuteurs ont commencé échouer jusqu'à ce que tout s'arrête.

Je deviens plus créatif

Qu'ai-je appris: Parfois, des données spéciales nécessitent des solutions spéciales.

Chaque SNP a une valeur de position. C'est un nombre correspondant au nombre de bases le long de son chromosome. C’est une façon agréable et naturelle d’organiser nos données. Au début, je voulais diviser par régions de chaque chromosome. Par exemple, positions 1 à 2000, 2001 à 4000, etc. Mais le problème est que les SNP ne sont pas répartis uniformément sur les chromosomes, de sorte que la taille des groupes varie considérablement.

Analyse de 25 To à l'aide d'AWK et R

En conséquence, je suis arrivé à une répartition des postes en catégories (rang). En utilisant les données déjà téléchargées, j'ai lancé une requête pour obtenir une liste de SNP uniques, leurs positions et leurs chromosomes. Ensuite, j'ai trié les données au sein de chaque chromosome et collecté les SNP en groupes (bin) d'une taille donnée. Disons 1000 XNUMX SNP chacun. Cela m'a donné la relation SNP-groupe-par-chromosome.

Au final, j'ai fait des groupes (bin) de 75 SNP, la raison sera expliquée ci-dessous.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Essayez d'abord avec Spark

Qu'ai-je appris: L'agrégation Spark est rapide, mais le partitionnement reste coûteux.

Je voulais lire ce petit bloc de données (2,5 millions de lignes) dans Spark, le combiner avec les données brutes, puis le partitionner en fonction de la colonne nouvellement ajoutée. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

J'ai utilisé sdf_broadcast(), donc Spark sait qu'il doit envoyer la trame de données à tous les nœuds. Ceci est utile si les données sont de petite taille et nécessaires à toutes les tâches. Sinon, Spark essaie d'être intelligent et distribue les données selon les besoins, ce qui peut entraîner des ralentissements.

Et encore une fois, mon idée n'a pas fonctionné : les tâches ont fonctionné pendant un certain temps, ont complété l'union, puis, comme les exécuteurs testamentaires lancés par le partitionnement, elles ont commencé à échouer.

Ajout d'AWK

Qu'ai-je appris: Ne dormez pas quand on vous enseigne les bases. Quelqu’un a sûrement déjà résolu votre problème dans les années 1980.

Jusqu'à présent, la raison de tous mes échecs avec Spark était le fouillis de données dans le cluster. Peut-être que la situation peut être améliorée grâce à un prétraitement. J'ai décidé d'essayer de diviser les données textuelles brutes en colonnes de chromosomes, j'espérais donc fournir à Spark des données « pré-partitionnées ».

J'ai cherché sur StackOverflow comment diviser par valeurs de colonne et j'ai trouvé une si bonne réponse. Avec AWK, vous pouvez diviser un fichier texte par valeurs de colonne en l'écrivant dans un script plutôt qu'en envoyant les résultats à stdout.

J'ai écrit un script Bash pour l'essayer. J'ai téléchargé l'un des TSV packagés, puis je l'ai décompressé à l'aide de gzip et envoyé à awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Ça a marché!

Remplissage des noyaux

Qu'ai-je appris: gnu parallel - c'est une chose magique, tout le monde devrait l'utiliser.

La séparation a été assez lente et quand j'ai commencé htoppour vérifier l'utilisation d'une instance EC2 puissante (et coûteuse), il s'est avéré que je n'utilisais qu'un seul cœur et environ 200 Mo de mémoire. Pour résoudre le problème et ne pas perdre beaucoup d’argent, nous avons dû trouver un moyen de paralléliser le travail. Heureusement, dans un livre absolument étonnant Science des données en ligne de commande J'ai trouvé un chapitre de Jeron Janssens sur la parallélisation. De là, j'ai appris gnu parallel, une méthode très flexible pour implémenter le multithreading sous Unix.

Analyse de 25 To à l'aide d'AWK et R
Lorsque j'ai commencé le partitionnement à l'aide du nouveau processus, tout allait bien, mais il y avait toujours un goulot d'étranglement : le téléchargement des objets S3 sur le disque n'était pas très rapide et n'était pas entièrement parallélisé. Pour résoudre ce problème, j'ai fait ceci :

  1. J'ai découvert qu'il est possible d'implémenter l'étape de téléchargement S3 directement dans le pipeline, éliminant ainsi complètement le stockage intermédiaire sur disque. Cela signifie que je peux éviter d'écrire des données brutes sur le disque et utiliser un stockage encore plus petit, et donc moins cher, sur AWS.
  2. Équipe aws configure set default.s3.max_concurrent_requests 50 a considérablement augmenté le nombre de threads utilisés par AWS CLI (par défaut, il y en a 10).
  3. Je suis passé à une instance EC2 optimisée pour la vitesse du réseau, avec la lettre n dans le nom. J'ai constaté que la perte de puissance de traitement lors de l'utilisation de n-instances est plus que compensée par l'augmentation de la vitesse de chargement. Pour la plupart des tâches, j'ai utilisé c5n.4xl.
  4. Modifié gzip sur pigz, il s'agit d'un outil gzip qui peut faire des choses intéressantes pour paralléliser la tâche initialement non parallélisée de décompression de fichiers (c'est ce qui a le moins aidé).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Ces étapes sont combinées entre elles pour que tout fonctionne très rapidement. En augmentant les vitesses de téléchargement et en éliminant les écritures sur disque, je pouvais désormais traiter un package de 5 téraoctets en quelques heures seulement.

Ce tweet aurait dû mentionner « TSV ». Hélas.

Utiliser des données nouvellement analysées

Qu'ai-je appris: Spark aime les données non compressées et n'aime pas combiner des partitions.

Désormais, les données étaient dans S3 dans un format décompressé (lire : partagé) et semi-ordonné, et je pouvais à nouveau revenir à Spark. Une surprise m'attendait : je n'ai encore pas réussi à réaliser ce que je voulais ! Il était très difficile de dire à Spark exactement comment les données étaient partitionnées. Et même quand j'ai fait cela, il s'est avéré qu'il y avait trop de partitions (95 XNUMX), et quand j'ai utilisé coalesce réduit leur nombre à des limites raisonnables, cela détruisit mon cloisonnement. Je suis sûr que cela peut être résolu, mais après quelques jours de recherche, je n'ai pas trouvé de solution. J'ai finalement terminé toutes les tâches dans Spark, même si cela a pris un certain temps et que mes fichiers Parquet divisés n'étaient pas très petits (~ 200 Ko). Cependant, les données étaient là où elles étaient nécessaires.

Analyse de 25 To à l'aide d'AWK et R
Trop petit et inégal, merveilleux !

Tester les requêtes Spark locales

Qu'ai-je appris: Spark a trop de temps système pour résoudre des problèmes simples.

En téléchargeant les données dans un format astucieux, j'ai pu tester la vitesse. Configurez un script R pour exécuter un serveur Spark local, puis chargez une trame de données Spark à partir du stockage de groupe Parquet spécifié (bac). J'ai essayé de charger toutes les données mais je n'ai pas réussi à faire reconnaître le partitionnement par Sparklyr.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

L'exécution a duré 29,415 secondes. Beaucoup mieux, mais pas trop bon pour tester en masse quoi que ce soit. De plus, je ne pouvais pas accélérer les choses avec la mise en cache, car lorsque j'essayais de mettre en cache une trame de données en mémoire, Spark plantait toujours, même lorsque j'allouais plus de 50 Go de mémoire à un ensemble de données pesant moins de 15.

Retour à AWK

Qu'ai-je appris: Les tableaux associatifs dans AWK sont très efficaces.

J'ai réalisé que je pouvais atteindre des vitesses plus élevées. Je m'en suis souvenu d'une manière merveilleuse Tutoriel AWK par Bruce Barnett J'ai entendu parler d'une fonctionnalité intéressante appelée "tableaux associatifs" Essentiellement, ce sont des paires clé-valeur qui, pour une raison quelconque, étaient appelées différemment dans AWK, et donc je n'y ai pas beaucoup réfléchi. Romain Cheplyaka a rappelé que le terme « tableaux associatifs » est beaucoup plus ancien que le terme « paire clé-valeur ». Même si vous recherchez la valeur-clé dans Google Ngram, vous n’y verrez pas ce terme, mais vous trouverez des tableaux associatifs ! De plus, la « paire clé-valeur » est le plus souvent associée aux bases de données, il est donc beaucoup plus logique de la comparer avec une hashmap. J'ai réalisé que je pouvais utiliser ces tableaux associatifs pour associer mes SNP à une table bin et à des données brutes sans utiliser Spark.

Pour ce faire, dans le script AWK j'ai utilisé le bloc BEGIN. Il s'agit d'un morceau de code exécuté avant que la première ligne de données ne soit transmise au corps principal du script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Équipe while(getline...) chargé toutes les lignes du groupe CSV (bin), définit la première colonne (nom SNP) comme clé du tableau associatif bin et la deuxième valeur (groupe) comme valeur. Puis dans le bloc { }, qui est exécuté sur toutes les lignes du fichier principal, chaque ligne est envoyée au fichier de sortie, qui reçoit un nom unique en fonction de son groupe (bin) : ..._bin_"bin[$1]"_....

Variables batch_num и chunk_id correspondait aux données fournies par le pipeline, évitant une condition de concurrence critique, et chaque thread d'exécution en cours d'exécution parallel, a écrit dans son propre fichier unique.

Depuis que j'ai dispersé toutes les données brutes dans des dossiers sur les chromosomes laissés par ma précédente expérience avec AWK, je peux désormais écrire un autre script Bash pour traiter un chromosome à la fois et envoyer des données partitionnées plus profondément à S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Le script comporte deux sections parallel.

Dans la première section, les données sont lues à partir de tous les fichiers contenant des informations sur le chromosome souhaité, puis ces données sont distribuées entre les threads, qui distribuent les fichiers dans les groupes appropriés (bac). Pour éviter les conditions de concurrence lorsque plusieurs threads écrivent dans le même fichier, AWK transmet les noms de fichiers pour écrire les données à différents endroits, par exemple. chr_10_bin_52_batch_2_aa.csv. En conséquence, de nombreux petits fichiers sont créés sur le disque (pour cela, j'ai utilisé des volumes EBS de téraoctets).

Convoyeur de la deuxième section parallel parcourt les groupes (bin) et combine leurs fichiers individuels dans un fichier CSV commun catpuis les envoie à l'exportation.

Diffusion en R ?

Qu'ai-je appris: Vous pouvez contacter stdin и stdout à partir d'un script R, et donc l'utiliser dans le pipeline.

Vous avez peut-être remarqué cette ligne dans votre script Bash : ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Il traduit tous les fichiers de groupe concaténés (bin) dans le script R ci-dessous. {} est une technique spéciale parallel, qui insère toutes les données qu'il envoie au flux spécifié directement dans la commande elle-même. Option {#} fournit un ID de thread unique, et {%} représente le numéro du créneau du travail (répété, mais jamais simultanément). Une liste de toutes les options peut être trouvée dans Documentation.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Lorsqu'une variable file("stdin") transmis à readr::read_csv, les données traduites dans le script R sont chargées dans une frame, qui se présente alors sous la forme .rds-fichier utilisant aws.s3 écrit directement sur S3.

RDS est quelque chose comme une version junior de Parquet, sans les fioritures du stockage des enceintes.

Après avoir terminé le script Bash, j'ai reçu un bundle .rds-fichiers situés dans S3, ce qui m'a permis d'utiliser une compression efficace et des types intégrés.

Malgré l'utilisation du frein R, tout a fonctionné très rapidement. Sans surprise, les parties de R qui lisent et écrivent des données sont hautement optimisées. Après des tests sur un chromosome de taille moyenne, le travail s'est terminé sur une instance C5n.4xl en deux heures environ.

Limites S3

Qu'ai-je appris: Grâce à l'implémentation de Smart Path, S3 peut gérer de nombreux fichiers.

Je m'inquiétais de savoir si S3 serait capable de gérer les nombreux fichiers qui y étaient transférés. Je pourrais donner un sens aux noms de fichiers, mais comment S3 les rechercherait-il ?

Analyse de 25 To à l'aide d'AWK et R
Les dossiers dans S3 sont juste pour le spectacle, en fait le système ne s'intéresse pas au symbole /. À partir de la page FAQ S3.

Il semble que S3 représente le chemin d'accès à un fichier particulier sous la forme d'une simple clé dans une sorte de table de hachage ou de base de données basée sur des documents. Un compartiment peut être considéré comme une table et les fichiers peuvent être considérés comme des enregistrements dans cette table.

Étant donné que la vitesse et l'efficacité sont importantes pour réaliser des bénéfices chez Amazon, il n'est pas surprenant que ce système de clé en tant que chemin de fichier soit optimisé. J'ai essayé de trouver un équilibre : pour ne pas avoir à faire beaucoup de requêtes get, mais pour que les requêtes soient exécutées rapidement. Il s'est avéré qu'il est préférable de créer environ 20 XNUMX fichiers bin. Je pense que si nous continuons à optimiser, nous pouvons atteindre une augmentation de la vitesse (par exemple, créer un compartiment spécial uniquement pour les données, réduisant ainsi la taille de la table de recherche). Mais il n’y avait ni temps ni argent pour d’autres expériences.

Qu’en est-il de la compatibilité croisée ?

Ce que j'ai appris : La première cause de perte de temps est l'optimisation prématurée de votre méthode de stockage.

À ce stade, il est très important de se demander : « Pourquoi utiliser un format de fichier propriétaire ? La raison réside dans la vitesse de chargement (les fichiers CSV gzippés prenaient 7 fois plus de temps à charger) et la compatibilité avec nos workflows. Je peux reconsidérer si R peut facilement charger des fichiers Parquet (ou Arrow) sans la charge Spark. Tout le monde dans notre laboratoire utilise R, et si je dois convertir les données dans un autre format, j'ai toujours les données texte d'origine, je peux donc simplement réexécuter le pipeline.

Division du travail

Qu'ai-je appris: N'essayez pas d'optimiser les tâches manuellement, laissez l'ordinateur le faire.

J'ai débogué le flux de travail sur un chromosome, je dois maintenant traiter toutes les autres données.
Je voulais créer plusieurs instances EC2 pour la conversion, mais en même temps, j'avais peur d'avoir une charge très déséquilibrée entre différentes tâches de traitement (tout comme Spark souffrait de partitions déséquilibrées). De plus, je n'étais pas intéressé à créer une instance par chromosome, car pour les comptes AWS, il existe une limite par défaut de 10 instances.

J'ai ensuite décidé d'écrire un script en R pour optimiser les tâches de traitement.

Tout d’abord, j’ai demandé à S3 de calculer la quantité d’espace de stockage occupée par chaque chromosome.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Ensuite j'ai écrit une fonction qui prend la taille totale, mélange l'ordre des chromosomes, les divise en groupes num_jobs et vous indique à quel point les tailles de tous les travaux de traitement sont différentes.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Ensuite, j'ai effectué mille mélanges en utilisant Purrr et j'ai choisi le meilleur.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Je me suis donc retrouvé avec un ensemble de tâches de taille très similaire. Ensuite, il ne restait plus qu'à envelopper mon précédent script Bash dans une grande boucle for. Cette optimisation a pris environ 10 minutes à écrire. Et c'est bien moins que ce que je dépenserais pour créer manuellement des tâches si elles étaient déséquilibrées. Par conséquent, je pense que j'avais raison avec cette optimisation préliminaire.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

A la fin j'ajoute la commande shutdown :

sudo shutdown -h now

... et tout s'est bien passé ! À l'aide de l'AWS CLI, j'ai généré des instances en utilisant l'option user_data leur a donné des scripts Bash de leurs tâches à traiter. Ils fonctionnaient et s'arrêtaient automatiquement, je ne payais donc pas pour une puissance de traitement supplémentaire.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Faisons nos valises !

Qu'ai-je appris: L'API doit être simple dans un souci de facilité et de flexibilité d'utilisation.

Finalement, j'ai obtenu les données au bon endroit et sous la bonne forme. Il ne restait plus qu’à simplifier au maximum le processus d’utilisation des données pour faciliter la tâche de mes collègues. Je voulais créer une API simple pour créer des requêtes. Si à l'avenir je décide de passer de .rds aux dossiers Parquet, cela devrait être un problème pour moi, pas pour mes collègues. Pour cela, j'ai décidé de créer un package R interne.

Construire et documenter un package très simple contenant seulement quelques fonctions d'accès aux données organisées autour d'une fonction get_snp. J'ai aussi créé un site internet pour mes collègues pkgdown, afin qu'ils puissent facilement voir des exemples et de la documentation.

Analyse de 25 To à l'aide d'AWK et R

Mise en cache intelligente

Qu'ai-je appris: Si vos données sont bien préparées, la mise en cache sera facile !

Étant donné que l'un des principaux flux de travail appliquait le même modèle d'analyse au package SNP, j'ai décidé d'utiliser le regroupement à mon avantage. Lors de la transmission de données via SNP, toutes les informations du groupe (bin) sont attachées à l'objet renvoyé. Autrement dit, les anciennes requêtes peuvent (en théorie) accélérer le traitement des nouvelles requêtes.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Lors de la création du package, j'ai exécuté de nombreux tests de performance pour comparer la vitesse lors de l'utilisation de différentes méthodes. Je recommande de ne pas négliger cela, car parfois les résultats sont inattendus. Par exemple, dplyr::filter était beaucoup plus rapide que la capture de lignes à l'aide d'un filtrage basé sur l'indexation, et la récupération d'une seule colonne à partir d'un bloc de données filtré était beaucoup plus rapide que l'utilisation de la syntaxe d'indexation.

Veuillez noter que l'objet prev_snp_results contient la clé snps_in_bin. Il s'agit d'un tableau de tous les SNP uniques dans un groupe (bin), vous permettant de vérifier rapidement si vous disposez déjà des données d'une requête précédente. Cela permet également de parcourir facilement tous les SNP d'un groupe (bin) avec ce code :

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

résultats

Nous pouvons désormais (et avons commencé à le faire sérieusement) exécuter des modèles et des scénarios qui nous étaient auparavant inaccessibles. Le mieux, c'est que mes collègues de laboratoire n'ont pas à penser à des complications. Ils ont juste une fonction qui fonctionne.

Et bien que le package leur épargne les détails, j'ai essayé de rendre le format des données suffisamment simple pour qu'ils puissent le comprendre si je disparaissais soudainement demain...

La vitesse a sensiblement augmenté. Nous analysons généralement des fragments de génome fonctionnellement significatifs. Auparavant, nous ne pouvions pas le faire (cela s'est avéré trop coûteux), mais maintenant, grâce à la structure de groupe (bin) et à la mise en cache, une demande pour un SNP prend en moyenne moins de 0,1 seconde, et l'utilisation des données est telle bas que les coûts pour S3 sont des cacahuètes.

Conclusion

Cet article n'est pas du tout un guide. La solution s’est avérée individuelle et certainement pas optimale. Il s'agit plutôt d'un récit de voyage. Je veux que les autres comprennent que de telles décisions ne semblent pas entièrement formées dans la tête, elles sont le résultat d’essais et d’erreurs. Aussi, si vous recherchez un data scientist, gardez à l’esprit que l’utilisation efficace de ces outils nécessite de l’expérience, et que l’expérience coûte de l’argent. Je suis heureux d'avoir eu les moyens de payer, mais beaucoup d'autres qui peuvent faire le même travail mieux que moi n'auront jamais l'opportunité, faute d'argent, d'essayer.

Les outils Big Data sont polyvalents. Si vous avez le temps, vous pouvez certainement écrire une solution plus rapide en utilisant des techniques intelligentes de nettoyage, de stockage et d’extraction des données. En fin de compte, cela se résume à une analyse coûts-avantages.

Ce que j'ai appris:

  • il n’existe pas de moyen bon marché d’analyser 25 To à la fois ;
  • soyez attentif à la taille de vos dossiers Parquet et à leur organisation ;
  • Les partitions dans Spark doivent être équilibrées ;
  • En général, n'essayez jamais de créer 2,5 millions de partitions ;
  • Le tri est encore difficile, tout comme la configuration de Spark ;
  • parfois, des données spéciales nécessitent des solutions spéciales ;
  • L'agrégation Spark est rapide, mais le partitionnement reste coûteux ;
  • ne dormez pas quand ils vous enseignent les bases, quelqu'un a probablement déjà résolu votre problème dans les années 1980 ;
  • gnu parallel - c'est une chose magique, tout le monde devrait l'utiliser ;
  • Spark aime les données non compressées et n'aime pas combiner des partitions ;
  • Spark a trop de temps système pour résoudre des problèmes simples ;
  • Les tableaux associatifs d'AWK sont très efficaces ;
  • vous pouvez contacter stdin и stdout à partir d'un script R, et donc l'utiliser dans le pipeline ;
  • Grâce à l'implémentation de Smart Path, S3 peut traiter de nombreux fichiers ;
  • La principale raison de perdre du temps est l’optimisation prématurée de votre méthode de stockage ;
  • n'essayez pas d'optimiser les tâches manuellement, laissez l'ordinateur le faire ;
  • L'API doit être simple dans un souci de facilité et de flexibilité d'utilisation ;
  • Si vos données sont bien préparées, la mise en cache sera facile !

Source: habr.com

Ajouter un commentaire