Parsen von 25 TB mit AWK und R

Parsen von 25 TB mit AWK und R
So lesen Sie diesen Artikel: Es tut mir leid, dass der Text so lang und chaotisch ist. Um Ihnen Zeit zu sparen, beginne ich jedes Kapitel mit einer Einleitung „Was ich gelernt habe“, die das Wesentliche des Kapitels in ein oder zwei Sätzen zusammenfasst.

„Zeigen Sie mir einfach die Lösung!“ Wenn Sie nur sehen möchten, woher ich komme, dann fahren Sie mit dem Kapitel „Erfinderischer werden“ fort, aber ich denke, es ist interessanter und nützlicher, etwas über Scheitern zu lesen.

Ich wurde kürzlich damit beauftragt, einen Prozess zur Verarbeitung einer großen Menge roher DNA-Sequenzen (technisch gesehen ein SNP-Chip) einzurichten. Der Bedarf bestand darin, schnell Daten über einen bestimmten genetischen Ort (SNP genannt) für nachfolgende Modellierungs- und andere Aufgaben zu erhalten. Mit R und AWK konnte ich Daten auf natürliche Weise bereinigen und organisieren und so die Abfrageverarbeitung erheblich beschleunigen. Das war für mich nicht einfach und erforderte zahlreiche Iterationen. Dieser Artikel wird Ihnen helfen, einige meiner Fehler zu vermeiden und Ihnen zeigen, was dabei herausgekommen ist.

Zunächst einige einleitende Erläuterungen.

Daten

Unser genetisches Informationsverarbeitungszentrum der Universität stellte uns Daten in Form eines 25-TB-TSV zur Verfügung. Ich habe sie in 5 mit Gzip komprimierte Pakete aufgeteilt erhalten, die jeweils etwa 240 2,5-Gigabyte-Dateien enthielten. Jede Zeile enthielt Daten für einen SNP von einer Person. Insgesamt wurden Daten zu ca. 60 Millionen SNPs und ca. 30 Personen übermittelt. Zusätzlich zu den SNP-Informationen enthielten die Dateien zahlreiche Spalten mit Zahlen, die verschiedene Merkmale widerspiegelten, wie z. B. Leseintensität, Häufigkeit verschiedener Allele usw. Insgesamt gab es etwa XNUMX Spalten mit eindeutigen Werten.

Ziel

Wie bei jedem Datenmanagementprojekt bestand das Wichtigste darin, festzulegen, wie die Daten verwendet werden sollten. In diesem Fall Wir werden hauptsächlich Modelle und Workflows für SNP basierend auf SNP auswählen. Das heißt, wir benötigen jeweils nur Daten zu einem SNP. Ich musste lernen, alle mit einem der 2,5 Millionen SNPs verbundenen Datensätze so einfach, schnell und kostengünstig wie möglich abzurufen.

Wie man das nicht macht

Um ein passendes Klischee zu zitieren:

Ich bin nicht tausendmal gescheitert, ich habe nur tausend Möglichkeiten entdeckt, um das Parsen einer Menge Daten in einem abfragefreundlichen Format zu vermeiden.

Versuchen Sie es zuerst

Was habe ich gelernt: Es gibt keine kostengünstige Möglichkeit, 25 TB gleichzeitig zu analysieren.

Nachdem ich an der Vanderbilt University den Kurs „Advanced Methods for Big Data Processing“ belegt hatte, war ich mir sicher, dass der Trick drin war. Es wird wahrscheinlich ein oder zwei Stunden dauern, den Hive-Server so einzurichten, dass er alle Daten durchläuft und das Ergebnis meldet. Da unsere Daten in AWS S3 gespeichert sind, habe ich den Dienst genutzt Athena, mit dem Sie Hive SQL-Abfragen auf S3-Daten anwenden können. Sie müssen keinen Hive-Cluster einrichten/erhöhen und zahlen auch nur für die Daten, die Sie suchen.

Nachdem ich Athena meine Daten und ihr Format gezeigt hatte, führte ich einige Tests mit Abfragen wie dieser durch:

select * from intensityData limit 10;

Und erhielt schnell gut strukturierte Ergebnisse. Bereit.

Bis wir versuchten, die Daten in unserer Arbeit zu nutzen ...

Ich wurde gebeten, alle SNP-Informationen herauszuholen, um das Modell zu testen. Ich habe die Abfrage ausgeführt:


select * from intensityData 
where snp = 'rs123456';

...und begann zu warten. Nach acht Minuten und mehr als 4 TB angeforderter Daten erhielt ich das Ergebnis. Athena berechnet je nach gefundenem Datenvolumen 5 US-Dollar pro Terabyte. Diese einzelne Anfrage kostete also 20 $ und acht Minuten Wartezeit. Um das Modell mit allen Daten laufen zu lassen, mussten wir 38 Jahre warten und 50 Millionen US-Dollar zahlen. Offensichtlich war das für uns nicht geeignet.

Es war notwendig, Parkett zu verwenden...

Was habe ich gelernt: Seien Sie vorsichtig mit der Größe Ihrer Parquet-Dateien und ihrer Organisation.

Ich habe zunächst versucht, die Situation zu beheben, indem ich alle TSVs umgestellt habe Parkettfeilen. Sie eignen sich gut für die Arbeit mit großen Datensätzen, da die darin enthaltenen Informationen in Spaltenform gespeichert sind: Jede Spalte liegt in einem eigenen Speicher-/Festplattensegment, im Gegensatz zu Textdateien, in denen Zeilen Elemente jeder Spalte enthalten. Und wenn Sie etwas finden müssen, dann lesen Sie einfach die gewünschte Spalte. Darüber hinaus speichert jede Datei einen Wertebereich in einer Spalte. Wenn der gesuchte Wert also nicht im Bereich der Spalte liegt, verschwendet Spark keine Zeit mit dem Scannen der gesamten Datei.

Ich habe eine einfache Aufgabe ausgeführt AWS-Kleber um unsere TSVs in Parquet zu konvertieren und die neuen Dateien in Athena abzulegen. Es dauerte etwa 5 Stunden. Aber als ich die Anfrage ausführte, dauerte es ungefähr genauso lange und etwas weniger Geld, bis sie abgeschlossen war. Tatsache ist, dass Spark bei dem Versuch, die Aufgabe zu optimieren, einfach einen TSV-Block entpackt und ihn in seinen eigenen Parquet-Block eingefügt hat. Und da jeder Block groß genug war, um die gesamten Datensätze vieler Personen zu enthalten, enthielt jede Datei alle SNPs, sodass Spark alle Dateien öffnen musste, um die benötigten Informationen zu extrahieren.

Interessanterweise ist der standardmäßige (und empfohlene) Komprimierungstyp von Parquet, Snappy, nicht teilbar. Daher war jeder Ausführende mit der Aufgabe beschäftigt, den vollständigen 3,5-GB-Datensatz zu entpacken und herunterzuladen.

Parsen von 25 TB mit AWK und R

Lassen Sie uns das Problem verstehen

Was habe ich gelernt: Das Sortieren ist schwierig, insbesondere wenn die Daten verteilt sind.

Es schien mir, dass ich jetzt den Kern des Problems verstanden hatte. Ich musste die Daten nur nach SNP-Spalte sortieren, nicht nach Personen. Dann werden mehrere SNPs in einem separaten Datenblock gespeichert, und dann zeigt sich die „intelligente“ Funktion von Parquet „nur öffnen, wenn der Wert im Bereich liegt“ in ihrer ganzen Pracht. Leider erwies es sich als schwierige Aufgabe, Milliarden von Zeilen zu durchsuchen, die über einen Cluster verstreut waren.

AWS möchte aufgrund des Grundes „Ich bin ein abgelenkter Student“ definitiv keine Rückerstattung gewähren. Nachdem ich die Sortierung auf Amazon Glue ausgeführt hatte, lief es zwei Tage lang und stürzte ab.

Was ist mit der Partitionierung?

Was habe ich gelernt: Partitionen in Spark müssen ausgeglichen sein.

Dann kam mir die Idee, Daten in Chromosomen aufzuteilen. Es gibt 23 davon (und noch einige mehr, wenn man mitochondriale DNA und nicht kartierte Regionen berücksichtigt).
Dadurch können Sie die Daten in kleinere Teile aufteilen. Wenn Sie der Spark-Exportfunktion im Glue-Skript nur eine Zeile hinzufügen partition_by = "chr", dann sollten die Daten in Buckets unterteilt werden.

Parsen von 25 TB mit AWK und R
Das Genom besteht aus zahlreichen Fragmenten, den sogenannten Chromosomen.

Leider hat es nicht funktioniert. Chromosomen haben unterschiedliche Größen, was unterschiedliche Mengen an Informationen bedeutet. Das bedeutet, dass die Aufgaben, die Spark an die Worker schickte, nicht ausgeglichen waren und langsam abgeschlossen wurden, da einige Knoten früher fertig wurden und sich im Leerlauf befanden. Die Aufgaben wurden jedoch erledigt. Aber als man nach einem SNP fragte, verursachte das Ungleichgewicht erneut Probleme. Die Kosten für die Verarbeitung von SNPs auf größeren Chromosomen (d. h. dort, wo wir Daten erhalten möchten) sind nur um etwa den Faktor 10 gesunken. Viel, aber nicht genug.

Was wäre, wenn wir es in noch kleinere Teile aufteilen?

Was habe ich gelernt: Versuchen Sie niemals, 2,5 Millionen Partitionen zu erstellen.

Ich beschloss, alles zu geben und jeden SNP zu partitionieren. Dadurch wurde sichergestellt, dass die Partitionen gleich groß waren. Es war eine schlechte Idee. Ich habe Kleber verwendet und eine unschuldige Linie hinzugefügt partition_by = 'snp'. Die Aufgabe wurde gestartet und ausgeführt. Einen Tag später habe ich nachgesehen und festgestellt, dass immer noch nichts in S3 geschrieben wurde, also habe ich die Aufgabe abgebrochen. Es sieht so aus, als würde Glue Zwischendateien an einen versteckten Speicherort in S3 schreiben, viele Dateien, vielleicht ein paar Millionen. Infolgedessen kostete mein Fehler mehr als tausend Dollar und gefiel meinem Mentor nicht.

Partitionieren + Sortieren

Was habe ich gelernt: Das Sortieren ist immer noch schwierig, ebenso wie das Tuning von Spark.

Bei meinem letzten Partitionierungsversuch habe ich die Chromosomen partitioniert und dann jede Partition sortiert. Theoretisch würde dies jede Abfrage beschleunigen, da die gewünschten SNP-Daten innerhalb weniger Parquet-Blöcke innerhalb eines bestimmten Bereichs liegen müssten. Leider erwies sich das Sortieren selbst partitionierter Daten als schwierige Aufgabe. Infolgedessen wechselte ich für einen benutzerdefinierten Cluster zu EMR und nutzte acht leistungsstarke Instanzen (C5.4xl) und Sparklyr, um einen flexibleren Workflow zu erstellen ...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...allerdings war die Aufgabe immer noch nicht abgeschlossen. Ich habe es auf unterschiedliche Weise konfiguriert: Ich habe die Speicherzuweisung für jeden Abfrage-Executor erhöht, Knoten mit großer Speichermenge verwendet, Broadcast-Variablen (Broadcasting-Variablen) verwendet, aber jedes Mal stellte sich heraus, dass dies nur halbe Sachen waren, und nach und nach begannen die Executoren damit scheitern, bis alles aufhört.

Ich werde kreativer

Was habe ich gelernt: Manchmal erfordern spezielle Daten spezielle Lösungen.

Jeder SNP hat einen Positionswert. Dies ist eine Zahl, die der Anzahl der Basen entlang seines Chromosoms entspricht. Dies ist eine schöne und natürliche Art, unsere Daten zu organisieren. Zuerst wollte ich jedes Chromosom nach Regionen aufteilen. Zum Beispiel die Positionen 1 – 2000, 2001 – 4000 usw. Das Problem besteht jedoch darin, dass SNPs nicht gleichmäßig über die Chromosomen verteilt sind und die Gruppengrößen daher stark variieren.

Parsen von 25 TB mit AWK und R

Als Ergebnis kam ich zu einer Aufteilung der Positionen in Kategorien (Rang). Anhand der bereits heruntergeladenen Daten habe ich eine Anfrage gestellt, um eine Liste eindeutiger SNPs, ihrer Positionen und Chromosomen zu erhalten. Dann habe ich die Daten innerhalb jedes Chromosoms sortiert und SNPs in Gruppen (Bin) einer bestimmten Größe gesammelt. Nehmen wir an, jeweils 1000 SNPs. Dies gab mir die SNP-zu-Gruppe-pro-Chromosom-Beziehung.

Am Ende habe ich Gruppen (bin) von 75 SNPs gebildet, der Grund wird weiter unten erklärt.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Versuchen Sie es zunächst mit Spark

Was habe ich gelernt: Die Spark-Aggregation ist schnell, aber die Partitionierung ist immer noch teuer.

Ich wollte diesen kleinen Datenrahmen (2,5 Millionen Zeilen) in Spark einlesen, ihn mit den Rohdaten kombinieren und ihn dann nach der neu hinzugefügten Spalte partitionieren bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

ich benutzte sdf_broadcast(), sodass Spark weiß, dass es den Datenrahmen an alle Knoten senden soll. Dies ist nützlich, wenn die Daten klein sind und für alle Aufgaben benötigt werden. Andernfalls versucht Spark, intelligent zu sein und Daten nach Bedarf zu verteilen, was zu Verlangsamungen führen kann.

Und wieder funktionierte meine Idee nicht: Die Aufgaben funktionierten einige Zeit, vervollständigten die Vereinigung und begannen dann, wie die durch Partitionierung gestarteten Testamentsvollstrecker, zu scheitern.

AWK hinzufügen

Was habe ich gelernt: Schlafen Sie nicht, wenn Ihnen die Grundlagen beigebracht werden. Sicherlich hat jemand Ihr Problem bereits in den 1980er Jahren gelöst.

Bis zu diesem Zeitpunkt war der Grund für alle meine Misserfolge mit Spark das Datenwirrwarr im Cluster. Vielleicht lässt sich die Situation durch eine Vorbehandlung verbessern. Ich beschloss, die Rohtextdaten in Chromosomenspalten aufzuteilen, und hoffte, Spark mit „vorpartitionierten“ Daten versorgen zu können.

Ich habe auf StackOverflow nach einer Aufteilung nach Spaltenwerten gesucht und Folgendes gefunden so eine tolle Antwort. Mit AWK können Sie eine Textdatei nach Spaltenwerten aufteilen, indem Sie sie in ein Skript schreiben, anstatt die Ergebnisse an zu senden stdout.

Ich habe ein Bash-Skript geschrieben, um es auszuprobieren. Eines der gepackten TSVs heruntergeladen und dann mit entpackt gzip und an gesendet awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Es funktionierte!

Füllen der Kerne

Was habe ich gelernt: gnu parallel - Es ist eine magische Sache, jeder sollte es nutzen.

Die Trennung verlief recht langsam und als ich anfing htopUm die Nutzung einer leistungsstarken (und teuren) EC2-Instanz zu überprüfen, stellte sich heraus, dass ich nur einen Kern und etwa 200 MB Speicher nutzte. Um das Problem zu lösen und nicht viel Geld zu verlieren, mussten wir herausfinden, wie wir die Arbeit parallelisieren können. Zum Glück in einem absolut erstaunlichen Buch Data Science an der Kommandozeile Ich habe ein Kapitel von Jeron Janssens über Parallelisierung gefunden. Daraus habe ich etwas gelernt gnu parallel, eine sehr flexible Methode zur Implementierung von Multithreading in Unix.

Parsen von 25 TB mit AWK und R
Als ich die Partitionierung mit dem neuen Prozess startete, war alles in Ordnung, aber es gab immer noch einen Engpass – das Herunterladen von S3-Objekten auf die Festplatte war nicht sehr schnell und nicht vollständig parallelisiert. Um das zu beheben, habe ich Folgendes getan:

  1. Ich habe herausgefunden, dass es möglich ist, die S3-Download-Phase direkt in der Pipeline zu implementieren, wodurch die Zwischenspeicherung auf der Festplatte vollständig entfällt. Das bedeutet, dass ich das Schreiben von Rohdaten auf die Festplatte vermeiden und noch kleineren und damit günstigeren Speicher auf AWS nutzen kann.
  2. Team aws configure set default.s3.max_concurrent_requests 50 Die Anzahl der Threads, die AWS CLI verwendet, wurde erheblich erhöht (standardmäßig sind es 10).
  3. Ich bin zu einer für Netzwerkgeschwindigkeit optimierten EC2-Instanz mit dem Buchstaben n im Namen gewechselt. Ich habe festgestellt, dass der Verlust an Rechenleistung bei der Verwendung von n-Instanzen durch die Erhöhung der Ladegeschwindigkeit mehr als ausgeglichen wird. Für die meisten Aufgaben habe ich c5n.4xl verwendet.
  4. Geändert gzip auf pigz, das ist ein gzip-Tool, das coole Dinge tun kann, um die anfänglich nicht parallelisierte Aufgabe des Dekomprimierens von Dateien zu parallelisieren (das hat am wenigsten geholfen).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Diese Schritte werden miteinander kombiniert, damit alles sehr schnell funktioniert. Durch die Erhöhung der Download-Geschwindigkeit und den Wegfall von Festplattenschreibvorgängen konnte ich nun ein 5-Terabyte-Paket in nur wenigen Stunden verarbeiten.

Dieser Tweet hätte „TSV“ erwähnen sollen. Ach.

Verwendung neu analysierter Daten

Was habe ich gelernt: Spark mag unkomprimierte Daten und kombiniert keine Partitionen.

Jetzt lagen die Daten in S3 in einem entpackten (sprich: geteilten) und halbgeordneten Format vor, und ich konnte wieder zu Spark zurückkehren. Eine Überraschung erwartete mich: Ich habe wieder einmal nicht das erreicht, was ich wollte! Es war sehr schwierig, Spark genau mitzuteilen, wie die Daten partitioniert waren. Und selbst als ich das tat, stellte sich heraus, dass es zu viele Partitionen gab (95), und als ich es benutzte coalesce Reduzierte ich ihre Anzahl auf ein vernünftiges Maß, zerstörte dies meine Partitionierung. Ich bin mir sicher, dass das Problem behoben werden kann, aber nach ein paar Tagen der Suche konnte ich keine Lösung finden. Letztendlich habe ich alle Aufgaben in Spark erledigt, obwohl es eine Weile gedauert hat und meine geteilten Parquet-Dateien nicht sehr klein waren (~200 KB). Die Daten waren jedoch dort, wo sie benötigt wurden.

Parsen von 25 TB mit AWK und R
Zu klein und uneben, wunderbar!

Testen lokaler Spark-Abfragen

Was habe ich gelernt: Spark hat beim Lösen einfacher Probleme zu viel Overhead.

Durch das Herunterladen der Daten in einem cleveren Format konnte ich die Geschwindigkeit testen. Richten Sie ein R-Skript ein, um einen lokalen Spark-Server auszuführen, und laden Sie dann einen Spark-Datenrahmen aus dem angegebenen Parquet-Gruppenspeicher (bin). Ich habe versucht, alle Daten zu laden, konnte Sparklyr jedoch nicht dazu bringen, die Partitionierung zu erkennen.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Die Ausführung dauerte 29,415 Sekunden. Viel besser, aber nicht zu gut für Massentests. Außerdem konnte ich das Caching nicht beschleunigen, denn wenn ich versuchte, einen Datenrahmen im Speicher zwischenzuspeichern, stürzte Spark immer ab, selbst wenn ich einem Datensatz, der weniger als 50 wog, mehr als 15 GB Speicher zuwies.

Zurück zu AWK

Was habe ich gelernt: Assoziative Arrays in AWK sind sehr effizient.

Mir wurde klar, dass ich höhere Geschwindigkeiten erreichen konnte. Daran habe ich mich wunderbar erinnert AWK-Tutorial von Bruce Barnett Ich habe von einer coolen Funktion namens „Assoziative Arrays" Im Wesentlichen handelt es sich dabei um Schlüssel-Wert-Paare, die in AWK aus irgendeinem Grund anders genannt wurden, und deshalb habe ich irgendwie nicht viel darüber nachgedacht. Roman Cheplyaka erinnerte daran, dass der Begriff „assoziative Arrays“ viel älter ist als der Begriff „Schlüssel-Wert-Paar“. Auch wenn Du Suchen Sie den Schlüsselwert in Google Ngram, Sie werden diesen Begriff dort nicht finden, aber Sie werden assoziative Arrays finden! Darüber hinaus wird das „Schlüssel-Wert-Paar“ am häufigsten mit Datenbanken in Verbindung gebracht, sodass es viel sinnvoller ist, es mit einer Hashmap zu vergleichen. Mir wurde klar, dass ich diese assoziativen Arrays verwenden konnte, um meine SNPs mit einer Bin-Tabelle und Rohdaten zu verknüpfen, ohne Spark zu verwenden.

Dazu habe ich im AWK-Skript den Block verwendet BEGIN. Dabei handelt es sich um einen Codeabschnitt, der ausgeführt wird, bevor die erste Datenzeile an den Hauptteil des Skripts übergeben wird.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Team while(getline...) Alle Zeilen aus der CSV-Gruppe (bin) geladen und die erste Spalte (SNP-Name) als Schlüssel für das assoziative Array festgelegt bin und der zweite Wert (Gruppe) als Wert. Dann im Block { }, das auf allen Zeilen der Hauptdatei ausgeführt wird, wird jede Zeile an die Ausgabedatei gesendet, die abhängig von ihrer Gruppe (bin) einen eindeutigen Namen erhält: ..._bin_"bin[$1]"_....

Variablen batch_num и chunk_id stimmte mit den von der Pipeline bereitgestellten Daten überein, vermied eine Race-Bedingung und führte jeden Ausführungsthread aus parallel, schrieb in eine eigene eindeutige Datei.

Da ich alle Rohdaten in Ordnern auf Chromosomen verteilt habe, die von meinem vorherigen Experiment mit AWK übrig geblieben sind, könnte ich jetzt ein weiteres Bash-Skript schreiben, um jeweils ein Chromosom zu verarbeiten und tiefer partitionierte Daten an S3 zu senden.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Das Skript besteht aus zwei Abschnitten parallel.

Im ersten Abschnitt werden Daten aus allen Dateien gelesen, die Informationen zum gewünschten Chromosom enthalten. Anschließend werden diese Daten über Threads verteilt, die die Dateien in die entsprechenden Gruppen (Bin) verteilen. Um Race-Bedingungen zu vermeiden, wenn mehrere Threads in dieselbe Datei schreiben, übergibt AWK die Dateinamen, um Daten an verschiedene Orte zu schreiben, z. B. chr_10_bin_52_batch_2_aa.csv. Dadurch entstehen viele kleine Dateien auf der Festplatte (dafür habe ich Terabyte-EBS-Volumes verwendet).

Förderer aus dem zweiten Abschnitt parallel geht die Gruppen (bin) durch und kombiniert ihre einzelnen Dateien in einer gemeinsamen CSV c catund sendet sie dann zum Export.

Ausstrahlung in R?

Was habe ich gelernt: Du kannst kontaktieren stdin и stdout aus einem R-Skript und verwenden Sie es daher in der Pipeline.

Möglicherweise ist Ihnen diese Zeile in Ihrem Bash-Skript aufgefallen: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Es übersetzt alle verketteten Gruppendateien (bin) in das untenstehende R-Skript. {} ist eine besondere Technik parallel, wodurch alle Daten, die an den angegebenen Stream gesendet werden, direkt in den Befehl selbst eingefügt werden. Möglichkeit {#} stellt eine eindeutige Thread-ID bereit und {%} stellt die Job-Slot-Nummer dar (wiederholt, aber nie gleichzeitig). Eine Liste aller Optionen finden Sie in Dokumentation.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Wenn eine Variable file("stdin") übermittelt an readr::read_csv, werden die in das R-Skript übersetzten Daten in einen Frame geladen, der dann im Formular vorliegt .rds-Datei mit aws.s3 direkt in S3 geschrieben.

RDS ist so etwas wie eine Junior-Version von Parquet, ohne den Schnickschnack der Lautsprecherspeicherung.

Nachdem ich das Bash-Skript fertiggestellt hatte, bekam ich ein Paket .rds-Dateien in S3, wodurch ich effiziente Komprimierung und integrierte Typen verwenden konnte.

Trotz Einsatz der Bremse R klappte alles sehr schnell. Es überrascht nicht, dass die Teile von R, die Daten lesen und schreiben, stark optimiert sind. Nach dem Testen auf einem mittelgroßen Chromosom wurde der Job auf einer C5n.4xl-Instanz in etwa zwei Stunden abgeschlossen.

S3-Einschränkungen

Was habe ich gelernt: Dank der Smart-Path-Implementierung kann S3 viele Dateien verarbeiten.

Ich war besorgt, ob S3 mit den vielen Dateien, die darauf übertragen wurden, zurechtkommen würde. Ich könnte dafür sorgen, dass die Dateinamen einen Sinn ergeben, aber wie würde S3 danach suchen?

Parsen von 25 TB mit AWK und R
Ordner in S3 dienen nur zur Veranschaulichung, tatsächlich ist das System nicht an dem Symbol interessiert /. Von der S3-FAQ-Seite.

Es scheint, dass S3 den Pfad zu einer bestimmten Datei als einfachen Schlüssel in einer Art Hash-Tabelle oder einer dokumentbasierten Datenbank darstellt. Ein Bucket kann als Tabelle betrachtet werden und Dateien können als Datensätze in dieser Tabelle betrachtet werden.

Da Geschwindigkeit und Effizienz wichtig sind, um bei Amazon Gewinne zu erzielen, ist es keine Überraschung, dass dieses Schlüssel-als-Dateipfad-System verdammt optimiert ist. Ich habe versucht, ein Gleichgewicht zu finden: so dass ich nicht viele Get-Anfragen stellen musste, die Anfragen aber schnell ausgeführt wurden. Es stellte sich heraus, dass es am besten ist, etwa 20 Bin-Dateien zu erstellen. Ich denke, wenn wir weiter optimieren, können wir die Geschwindigkeit steigern (z. B. indem wir einen speziellen Bucket nur für Daten erstellen und so die Größe der Nachschlagetabelle verringern). Für weitere Experimente fehlte jedoch die Zeit und das Geld.

Wie sieht es mit der Kreuzkompatibilität aus?

Was ich gelernt habe: Die häufigste Ursache für Zeitverschwendung ist die vorzeitige Optimierung Ihrer Speichermethode.

An dieser Stelle ist es sehr wichtig, sich zu fragen: „Warum ein proprietäres Dateiformat verwenden?“ Der Grund liegt in der Ladegeschwindigkeit (das Laden gzippter CSV-Dateien dauerte siebenmal länger) und der Kompatibilität mit unseren Arbeitsabläufen. Ich überlege vielleicht noch einmal, ob R problemlos Parquet- (oder Arrow-)Dateien laden kann, ohne dass Spark geladen werden muss. Jeder in unserem Labor verwendet R, und wenn ich die Daten in ein anderes Format konvertieren muss, habe ich immer noch die ursprünglichen Textdaten, sodass ich die Pipeline einfach erneut ausführen kann.

Arbeitsteilung

Was habe ich gelernt: Versuchen Sie nicht, Jobs manuell zu optimieren, sondern überlassen Sie es dem Computer.

Ich habe den Workflow für ein Chromosom debuggt, jetzt muss ich alle anderen Daten verarbeiten.
Ich wollte mehrere EC2-Instanzen zur Konvertierung hochfahren, hatte aber gleichzeitig Angst vor einer sehr unausgeglichenen Last über verschiedene Verarbeitungsjobs hinweg (so wie Spark unter unausgeglichenen Partitionen litt). Darüber hinaus hatte ich kein Interesse daran, eine Instanz pro Chromosom zu erhöhen, da für AWS-Konten ein Standardlimit von 10 Instanzen gilt.

Dann habe ich beschlossen, ein Skript in R zu schreiben, um Verarbeitungsaufgaben zu optimieren.

Zuerst habe ich S3 gebeten, zu berechnen, wie viel Speicherplatz jedes Chromosom belegt.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Dann habe ich eine Funktion geschrieben, die die Gesamtgröße ermittelt, die Reihenfolge der Chromosomen mischt und sie in Gruppen aufteilt num_jobs und verrät Ihnen, wie unterschiedlich die Größen aller Verarbeitungsaufträge sind.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Dann habe ich mit purrr tausend Mischvorgänge durchgespielt und das Beste ausgewählt.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Am Ende hatte ich eine Reihe von Aufgaben, die vom Umfang her sehr ähnlich waren. Dann musste ich nur noch mein vorheriges Bash-Skript in eine große Schleife packen for. Das Schreiben dieser Optimierung dauerte etwa 10 Minuten. Und das ist viel weniger, als ich für die manuelle Erstellung von Aufgaben ausgeben würde, wenn sie unausgewogen wären. Daher denke ich, dass ich mit dieser vorläufigen Optimierung richtig gelegen habe.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Am Ende füge ich den Shutdown-Befehl hinzu:

sudo shutdown -h now

... und alles hat geklappt! Mit der AWS CLI habe ich Instanzen mithilfe der Option ausgelöst user_data gab ihnen Bash-Skripte ihrer Aufgaben zur Verarbeitung. Sie liefen und schalteten sich automatisch ab, sodass ich nicht für zusätzliche Rechenleistung zahlen musste.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Lasst uns packen!

Was habe ich gelernt: Die API sollte aus Gründen der Benutzerfreundlichkeit und Flexibilität einfach sein.

Endlich habe ich die Daten an der richtigen Stelle und in der richtigen Form. Es blieb nur noch, den Prozess der Datennutzung so weit wie möglich zu vereinfachen, um es meinen Kollegen einfacher zu machen. Ich wollte eine einfache API zum Erstellen von Anfragen erstellen. Wenn ich mich in Zukunft für einen Wechsel entscheide .rds zu Parquet-Dateien, dann sollte das für mich ein Problem sein, nicht für meine Kollegen. Dafür habe ich beschlossen, ein internes R-Paket zu erstellen.

Erstellen und dokumentieren Sie ein sehr einfaches Paket, das nur wenige Datenzugriffsfunktionen enthält, die um eine Funktion herum organisiert sind get_snp. Ich habe auch eine Website für meine Kollegen erstellt pkgdown, damit sie leicht Beispiele und Dokumentation sehen können.

Parsen von 25 TB mit AWK und R

Intelligentes Caching

Was habe ich gelernt: Wenn Ihre Daten gut vorbereitet sind, wird das Caching einfach!

Da einer der Hauptworkflows dasselbe Analysemodell auf das SNP-Paket anwendete, entschied ich mich, Binning zu meinem Vorteil zu nutzen. Bei der Datenübertragung per SNP werden alle Informationen aus der Gruppe (bin) an das zurückgegebene Objekt angehängt. Das heißt, alte Abfragen können (theoretisch) die Verarbeitung neuer Abfragen beschleunigen.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Beim Erstellen des Pakets habe ich viele Benchmarks durchgeführt, um die Geschwindigkeit bei Verwendung verschiedener Methoden zu vergleichen. Ich empfehle, dies nicht zu vernachlässigen, da die Ergebnisse manchmal unerwartet sind. Zum Beispiel, dplyr::filter war viel schneller als das Erfassen von Zeilen mit indizierungsbasierter Filterung, und das Abrufen einer einzelnen Spalte aus einem gefilterten Datenrahmen war viel schneller als mit der Indizierungssyntax.

Bitte beachten Sie, dass das Objekt prev_snp_results enthält den Schlüssel snps_in_bin. Dabei handelt es sich um ein Array aller eindeutigen SNPs in einer Gruppe (Bin), sodass Sie schnell überprüfen können, ob Sie bereits Daten aus einer vorherigen Abfrage haben. Mit diesem Code ist es auch einfach, alle SNPs in einer Gruppe (Bin) zu durchlaufen:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Ergebnisse

Jetzt können wir (und haben begonnen, dies ernsthaft zu tun) Modelle und Szenarien ausführen, die für uns zuvor unzugänglich waren. Das Beste ist, dass meine Laborkollegen über keine Komplikationen nachdenken müssen. Sie haben einfach eine Funktion, die funktioniert.

Und obwohl das Paket ihnen die Details erspart, habe ich versucht, das Datenformat so einfach zu gestalten, dass sie es herausfinden könnten, wenn ich morgen plötzlich verschwinden würde ...

Die Geschwindigkeit hat spürbar zugenommen. Wir scannen in der Regel funktionell bedeutsame Genomfragmente. Früher war dies nicht möglich (es stellte sich als zu teuer heraus), aber dank der Gruppenstruktur (Bin) und dem Caching dauert eine Anfrage für einen SNP jetzt im Durchschnitt weniger als 0,1 Sekunden, und die Datennutzung ist entsprechend niedrig, dass die Kosten für S3 Peanuts sind.

Abschluss

Dieser Artikel ist überhaupt kein Leitfaden. Die Lösung erwies sich als individuell und mit ziemlicher Sicherheit nicht optimal. Vielmehr handelt es sich um einen Reisebericht. Ich möchte, dass andere verstehen, dass solche Entscheidungen nicht vollständig im Kopf getroffen werden, sondern das Ergebnis von Versuch und Irrtum sind. Wenn Sie auf der Suche nach einem Datenwissenschaftler sind, bedenken Sie außerdem, dass der effektive Einsatz dieser Tools Erfahrung erfordert und Erfahrung Geld kostet. Ich bin froh, dass ich die Mittel hatte, um zu bezahlen, aber viele andere, die den gleichen Job besser machen können als ich, werden aus Geldmangel nie die Möglichkeit haben, es überhaupt zu versuchen.

Big-Data-Tools sind vielseitig. Wenn Sie Zeit haben, können Sie mithilfe intelligenter Techniken zur Datenbereinigung, -speicherung und -extraktion mit ziemlicher Sicherheit eine schnellere Lösung schreiben. Letztlich kommt es auf eine Kosten-Nutzen-Analyse an.

Was ich gelernt habe:

  • Es gibt keine kostengünstige Möglichkeit, 25 TB gleichzeitig zu analysieren.
  • Seien Sie vorsichtig mit der Größe Ihrer Parquet-Dateien und deren Organisation.
  • Partitionen in Spark müssen ausgeglichen sein;
  • Versuchen Sie im Allgemeinen niemals, 2,5 Millionen Partitionen zu erstellen;
  • Das Sortieren ist immer noch schwierig, ebenso wie das Einrichten von Spark.
  • Manchmal erfordern spezielle Daten spezielle Lösungen.
  • Die Spark-Aggregation ist schnell, aber die Partitionierung ist immer noch teuer.
  • Schlafen Sie nicht, wenn Ihnen die Grundlagen beigebracht werden, wahrscheinlich hat jemand Ihr Problem bereits in den 1980er Jahren gelöst;
  • gnu parallel - Das ist eine magische Sache, jeder sollte sie nutzen;
  • Spark mag unkomprimierte Daten und kombiniert keine Partitionen.
  • Spark hat bei der Lösung einfacher Probleme zu viel Overhead;
  • Die assoziativen Arrays von AWK sind sehr effizient;
  • du kannst kontaktieren stdin и stdout aus einem R-Skript und verwenden Sie es daher in der Pipeline;
  • Dank der Smart-Path-Implementierung kann S3 viele Dateien verarbeiten;
  • Der Hauptgrund für Zeitverschwendung ist die vorzeitige Optimierung Ihrer Speichermethode.
  • Versuchen Sie nicht, Aufgaben manuell zu optimieren, sondern überlassen Sie dies dem Computer.
  • Die API sollte aus Gründen der Benutzerfreundlichkeit und Flexibilität einfach sein;
  • Wenn Ihre Daten gut vorbereitet sind, wird das Caching einfach!

Source: habr.com

Kommentar hinzufügen