Analizowanie 25 TB przy użyciu AWK i R

Analizowanie 25 TB przy użyciu AWK i R
Jak przeczytać ten artykuł: Przepraszam, że tekst jest taki długi i chaotyczny. Aby zaoszczędzić czas, rozpoczynam każdy rozdział wstępem „Czego się nauczyłem”, który podsumowuje istotę rozdziału w jednym lub dwóch zdaniach.

„Po prostu pokaż mi rozwiązanie!” Jeśli chcesz po prostu zobaczyć, skąd pochodzę, przejdź do rozdziału „Stawanie się bardziej pomysłowymi”, ale myślę, że bardziej interesująca i przydatna jest lektura o porażce.

Niedawno powierzono mi zadanie przygotowania procesu przetwarzania dużej ilości surowych sekwencji DNA (technicznie rzecz biorąc, chipa SNP). Konieczne było szybkie uzyskanie danych o danej lokalizacji genetycznej (zwanej SNP) do późniejszego modelowania i innych zadań. Używając R i AWK, udało mi się oczyścić i uporządkować dane w naturalny sposób, znacznie przyspieszając przetwarzanie zapytań. Nie było to dla mnie łatwe i wymagało wielu powtórzeń. Ten artykuł pomoże Ci uniknąć niektórych moich błędów i pokaże, z czym skończyłem.

Na początek kilka wyjaśnień wprowadzających.

Te

Nasze uniwersyteckie centrum przetwarzania informacji genetycznej dostarczyło nam dane w postaci pliku TSV o pojemności 25 TB. Otrzymałem je podzielone na 5 pakietów, skompresowanych za pomocą programu Gzip, z których każdy zawierał około 240 czterogigabajtowych plików. Każdy wiersz zawierał dane dla jednego SNP od jednej osoby. W sumie przesłano dane dotyczące ~2,5 mln SNP i ~60 tys. osób. Oprócz informacji o SNP pliki zawierały liczne kolumny z liczbami odzwierciedlającymi różne cechy, takie jak intensywność odczytu, częstotliwość różnych alleli itp. W sumie było około 30 kolumn z unikalnymi wartościami.

cel

Jak w przypadku każdego projektu zarządzania danymi, najważniejsze było określenie, w jaki sposób dane będą wykorzystywane. W tym przypadku będziemy głównie wybierać modele i przepływy pracy dla SNP w oparciu o SNP. Oznacza to, że będziemy potrzebować danych tylko o jednym SNP na raz. Musiałem nauczyć się, jak odzyskać wszystkie rekordy powiązane z jednym z 2,5 miliona SNP tak łatwo, szybko i tanio, jak to możliwe.

Jak tego nie robić

Cytując odpowiedni frazes:

Nie zawiodłem się tysiąc razy, po prostu odkryłem tysiąc sposobów na uniknięcie analizowania dużej ilości danych w formacie przyjaznym dla zapytań.

Pierwsza próba

Czego się nauczyłem?: Nie ma taniego sposobu na jednoczesne przeanalizowanie 25 TB.

Po ukończeniu kursu „Zaawansowane metody przetwarzania dużych zbiorów danych” na Uniwersytecie Vanderbilt byłem pewien, że pomysł jest w zasięgu ręki. Skonfigurowanie serwera Hive do przeglądania wszystkich danych i raportowania wyników zajmie prawdopodobnie godzinę lub dwie. Ponieważ nasze dane są przechowywane w AWS S3, skorzystałem z usługi Athena, która umożliwia stosowanie zapytań SQL programu Hive do danych S3. Nie musisz konfigurować/unosić klastra Hive, a także płacisz tylko za dane, których szukasz.

Po pokazaniu Athenie moich danych i ich formatu przeprowadziłem kilka testów z takimi zapytaniami:

select * from intensityData limit 10;

I szybko otrzymał dobrze ustrukturyzowane wyniki. Gotowy.

Dopóki nie spróbowaliśmy wykorzystać danych w naszej pracy...

Poproszono mnie o pobranie wszystkich informacji o SNP w celu przetestowania modelu. Uruchomiłem zapytanie:


select * from intensityData 
where snp = 'rs123456';

...i zacząłem czekać. Po ośmiu minutach i ponad 4 TB żądanych danych otrzymałem wynik. Athena pobiera opłatę za ilość znalezionych danych – 5 dolarów za terabajt. Zatem to pojedyncze żądanie kosztuje 20 dolarów i osiem minut oczekiwania. Aby uruchomić model na wszystkich danych, musieliśmy czekać 38 lat i zapłacić 50 milionów dolarów, co oczywiście nie było dla nas odpowiednie.

Trzeba było użyć parkietu...

Czego się nauczyłem?: Uważaj na rozmiar plików Parquet i ich organizację.

Najpierw próbowałem naprawić sytuację, konwertując wszystkie TSV na Pilniki do parkietu. Są wygodne do pracy z dużymi zbiorami danych, ponieważ zawarte w nich informacje są przechowywane w formie kolumnowej: każda kolumna znajduje się w swoim własnym segmencie pamięci/dysku, w przeciwieństwie do plików tekstowych, w których wiersze zawierają elementy każdej kolumny. A jeśli chcesz coś znaleźć, po prostu przeczytaj wymaganą kolumnę. Dodatkowo każdy plik przechowuje zakres wartości w kolumnie, więc jeśli szukana wartość nie mieści się w zakresie kolumny, Spark nie będzie tracił czasu na skanowanie całego pliku.

Wykonałem proste zadanie Klej AWS aby przekonwertować nasze TSV na Parquet i upuścić nowe pliki do Atheny. Zajęło to około 5 godzin. Ale kiedy uruchomiłem prośbę, jej ukończenie zajęło mniej więcej tyle samo czasu i trochę mniej pieniędzy. Faktem jest, że Spark, próbując zoptymalizować zadanie, po prostu rozpakował jeden fragment TSV i umieścił go we własnym kawałku Parquet. A ponieważ każdy fragment był wystarczająco duży, aby pomieścić całe rekordy wielu osób, każdy plik zawierał wszystkie SNP, więc Spark musiał otworzyć wszystkie pliki, aby wyodrębnić potrzebne informacje.

Co ciekawe, domyślnego (i zalecanego) typu kompresji Parquet, snappy, nie można dzielić. Dlatego każdy wykonawca utknął w zadaniu rozpakowania i pobrania pełnego zestawu danych o rozmiarze 3,5 GB.

Analizowanie 25 TB przy użyciu AWK i R

Rozumiemy problem

Czego się nauczyłem?: Sortowanie jest trudne, zwłaszcza jeśli dane są rozproszone.

Wydawało mi się, że teraz zrozumiałem istotę problemu. Musiałem tylko posortować dane według kolumny SNP, a nie według osób. Następnie kilka SNP zostanie zapisanych w osobnym fragmencie danych, a wtedy „inteligentna” funkcja Parquet „otwiera tylko wtedy, gdy wartość mieści się w zakresie” pokaże się w całej okazałości. Niestety sortowanie miliardów wierszy rozproszonych w klastrze okazało się trudnym zadaniem.

AWS zdecydowanie nie chce zwrócić pieniędzy z powodu „jestem rozproszonym studentem”. Po uruchomieniu sortowania na Amazon Glue działało ono przez 2 dni i uległo awarii.

A co z partycjonowaniem?

Czego się nauczyłem?: Partycje w Sparku muszą być zrównoważone.

Wtedy wpadłem na pomysł podziału danych na chromosomy. Jest ich 23 (i kilka więcej, jeśli weźmie się pod uwagę mitochondrialne DNA i niezmapowane regiony).
Umożliwi to podzielenie danych na mniejsze fragmenty. Jeśli dodasz tylko jedną linię do funkcji eksportu Spark w skrypcie Glue partition_by = "chr", to dane należy podzielić na segmenty.

Analizowanie 25 TB przy użyciu AWK i R
Genom składa się z licznych fragmentów zwanych chromosomami.

Niestety, to nie zadziałało. Chromosomy mają różną wielkość, co oznacza różną ilość informacji. Oznacza to, że zadania wysyłane do pracowników przez platformę Spark nie były zrównoważone i wykonywane powoli, ponieważ niektóre węzły zakończyły się wcześniej i były bezczynne. Jednakże zadania zostały wykonane. Ale gdy poprosiłem o jeden SNP, brak równowagi ponownie spowodował problemy. Koszt przetwarzania SNP na większych chromosomach (to znaczy tam, gdzie chcemy uzyskać dane) spadł jedynie około 10-krotnie. Dużo, ale nie wystarczy.

A co jeśli podzielimy to na jeszcze mniejsze części?

Czego się nauczyłem?: Nigdy nie próbuj w ogóle tworzyć 2,5 miliona partycji.

Postanowiłem pójść na całość i podzielić każdy SNP na partycje. Zapewniło to, że przegrody były jednakowej wielkości. TO BYŁ ZŁY POMYSŁ. Użyłem kleju i dodałem niewinną linię partition_by = 'snp'. Zadanie rozpoczęło się i zaczęło być wykonywane. Dzień później sprawdziłem i zobaczyłem, że nadal nic nie jest zapisane w S3, więc zabiłem zadanie. Wygląda na to, że Glue zapisywał pliki pośrednie w ukrytej lokalizacji w S3, dużo plików, może kilka milionów. W rezultacie mój błąd kosztował ponad tysiąc dolarów i nie zadowolił mojego mentora.

Partycjonowanie + sortowanie

Czego się nauczyłem?: Sortowanie jest nadal trudne, podobnie jak strojenie Sparka.

Moja ostatnia próba podziału polegała na podzieleniu chromosomów, a następnie posortowaniu każdej partycji. Teoretycznie przyspieszyłoby to każde zapytanie, ponieważ żądane dane SNP musiały mieścić się w kilku fragmentach Parquet w danym zakresie. Niestety sortowanie nawet podzielonych na partycje danych okazało się trudnym zadaniem. W rezultacie przełączyłem się na EMR dla niestandardowego klastra i użyłem ośmiu potężnych instancji (C5.4xl) i Sparklyr, aby stworzyć bardziej elastyczny przepływ pracy...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...jednak zadanie nadal nie zostało ukończone. Konfigurowałem to na różne sposoby: zwiększałem alokację pamięci dla każdego executora zapytania, używałem węzłów z dużą ilością pamięci, używałem zmiennych rozgłoszeniowych (zmiennych rozgłoszeniowych), ale za każdym razem okazywały się to półśrodkami i stopniowo zaczynały się executory niepowodzeniem, dopóki wszystko się nie zatrzyma.

Staję się bardziej kreatywny

Czego się nauczyłem?: Czasami specjalne dane wymagają specjalnych rozwiązań.

Każdy SNP ma wartość pozycji. Jest to liczba odpowiadająca liczbie zasad wzdłuż jego chromosomu. To ładny i naturalny sposób uporządkowania naszych danych. Na początku chciałem podzielić według regionów każdego chromosomu. Na przykład pozycje 1 - 2000, 2001 - 4000 itd. Problem polega jednak na tym, że SNP nie są równomiernie rozmieszczone na chromosomach, dlatego liczebność grup będzie się znacznie różnić.

Analizowanie 25 TB przy użyciu AWK i R

W rezultacie doszedłem do podziału pozycji na kategorie (rangi). Korzystając z już pobranych danych, uruchomiłem prośbę o uzyskanie listy unikalnych SNP, ich pozycji i chromosomów. Następnie posortowałem dane w obrębie każdego chromosomu i zebrałem SNP w grupy (kosz) o określonej wielkości. Powiedzmy, że każdy ma 1000 SNP. To dało mi związek SNP-grupa-na-chromosom.

Ostatecznie utworzyłem grupy (kosz) 75 SNP, powód zostanie wyjaśniony poniżej.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Najpierw spróbuj ze Sparkiem

Czego się nauczyłem?: Agregacja iskier jest szybka, ale partycjonowanie jest nadal drogie.

Chciałem wczytać tę małą (2,5 miliona wierszy) ramkę danych do Sparka, połączyć ją z surowymi danymi, a następnie podzielić ją według nowo dodanej kolumny bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

użyłem sdf_broadcast(), więc Spark wie, że powinien wysłać ramkę danych do wszystkich węzłów. Jest to przydatne, jeśli dane są małe i wymagane do wszystkich zadań. W przeciwnym razie Spark stara się zachować rozsądek i dystrybuuje dane w razie potrzeby, co może powodować spowolnienia.

I znowu mój pomysł nie zadziałał: zadania działały przez jakiś czas, zakończyły unię, a potem, podobnie jak executory uruchomione przez partycjonowanie, zaczęły zawodzić.

Dodanie AWK

Czego się nauczyłem?: Nie śpij, gdy uczysz się podstaw. Z pewnością ktoś rozwiązał już Twój problem w latach 1980-tych.

Do tego momentu przyczyną wszystkich moich niepowodzeń ze Sparkiem była plątanina danych w klastrze. Być może sytuację można poprawić dzięki wstępnemu leczeniu. Postanowiłem spróbować podzielić surowe dane tekstowe na kolumny chromosomów, więc miałem nadzieję dostarczyć Sparkowi dane „wstępnie podzielone”.

Szukałem na StackOverflow sposobu dzielenia według wartości kolumn i znalazłem taka świetna odpowiedź. Za pomocą AWK możesz podzielić plik tekstowy według wartości kolumn, zapisując go w skrypcie, zamiast wysyłać wyniki stdout.

Napisałem skrypt Bash, aby go wypróbować. Pobrałem jeden z spakowanych plików TSV, a następnie rozpakowałem go za pomocą gzip i wysłane do awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Zadziałało!

Wypełnienie rdzeni

Czego się nauczyłem?: gnu parallel - to magiczna rzecz, każdy powinien z niej korzystać.

Separacja była dość powolna i kiedy zacząłem htopaby sprawdzić użycie potężnej (i drogiej) instancji EC2 okazało się, że używałem tylko jednego rdzenia i około 200 MB pamięci. Aby rozwiązać problem i nie stracić dużo pieniędzy, musieliśmy wymyślić, jak zrównoleglić pracę. Na szczęście w absolutnie niesamowitej książce Analiza danych w linii poleceń Znalazłem rozdział Jerona Janssena na temat równoległości. Z tego się dowiedziałem gnu parallel, bardzo elastyczna metoda implementacji wielowątkowości w systemie Unix.

Analizowanie 25 TB przy użyciu AWK i R
Kiedy zacząłem partycjonowanie przy użyciu nowego procesu, wszystko było w porządku, ale nadal istniało wąskie gardło - pobieranie obiektów S3 na dysk nie było zbyt szybkie i nie było w pełni zrównoleglone. Aby to naprawić, zrobiłem to:

  1. Dowiedziałem się, że możliwe jest wdrożenie etapu pobierania S3 bezpośrednio w potoku, całkowicie eliminując pośrednie przechowywanie na dysku. Oznacza to, że mogę uniknąć zapisywania surowych danych na dysku i korzystać z jeszcze mniejszej, a przez to tańszej pamięci w AWS.
  2. zespół aws configure set default.s3.max_concurrent_requests 50 znacznie zwiększyło liczbę wątków używanych przez AWS CLI (domyślnie jest ich 10).
  3. Przełączyłem się na instancję EC2 zoptymalizowaną pod kątem szybkości sieci, z literą n w nazwie. Odkryłem, że utrata mocy obliczeniowej podczas korzystania z n instancji jest z nadwyżką kompensowana przez wzrost prędkości ładowania. Do większości zadań używałem c5n.4xl.
  4. Zmieniono gzip na pigz, jest to narzędzie gzip, które może robić fajne rzeczy, aby zrównoleglić początkowo nierównoległe zadanie dekompresji plików (to najmniej pomogło).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Te kroki są ze sobą łączone, aby wszystko działało bardzo szybko. Zwiększając prędkość pobierania i eliminując zapisy na dysku, mogłem teraz przetworzyć pakiet o wielkości 5 terabajtów w ciągu zaledwie kilku godzin.

W tym tweecie należało wspomnieć o „TSV”. Niestety.

Korzystanie z nowo przeanalizowanych danych

Czego się nauczyłem?: Spark lubi nieskompresowane dane i nie lubi łączyć partycji.

Teraz dane znajdowały się w S3 w formacie rozpakowanym (czytaj: udostępnionym) i częściowo uporządkowanym i mogłem ponownie wrócić do Sparka. Czekała mnie niespodzianka: znowu nie udało mi się osiągnąć tego, czego chciałem! Bardzo trudno było dokładnie określić, w jaki sposób dane zostały podzielone na partycje. A nawet kiedy to zrobiłem, okazało się, że partycji było za dużo (95 tysięcy), a kiedy użyłem coalesce zmniejszyłem ich liczbę do rozsądnych granic, co zniszczyło mój podział. Jestem pewien, że można to naprawić, ale po kilku dniach poszukiwań nie mogłem znaleźć rozwiązania. Ostatecznie ukończyłem wszystkie zadania w Sparku, chociaż zajęło to trochę czasu, a moje podzielone pliki Parquet nie były zbyt małe (~200 KB). Jednak dane były tam, gdzie były potrzebne.

Analizowanie 25 TB przy użyciu AWK i R
Za małe i nierówne, cudowne!

Testowanie lokalnych zapytań Spark

Czego się nauczyłem?: Spark ma zbyt duże obciążenie przy rozwiązywaniu prostych problemów.

Pobierając dane w sprytnym formacie, mogłem przetestować prędkość. Skonfiguruj skrypt R, aby uruchomić lokalny serwer Spark, a następnie załaduj ramkę danych Spark z określonego magazynu grupy Parquet (bin). Próbowałem załadować wszystkie dane, ale nie mogłem zmusić Sparklyra do rozpoznania partycjonowania.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Wykonanie trwało 29,415 sekundy. Znacznie lepiej, ale nie za dobrze do masowego testowania czegokolwiek. Dodatkowo nie mogłem przyspieszyć działania buforowania, ponieważ gdy próbowałem buforować ramkę danych w pamięci, Spark zawsze ulegał awarii, nawet gdy przydzieliłem ponad 50 GB pamięci do zbioru danych ważącego mniej niż 15.

Wróć do AWK

Czego się nauczyłem?: Tablice asocjacyjne w AWK są bardzo wydajne.

Zdałem sobie sprawę, że mogę osiągnąć wyższe prędkości. Przypomniało mi się to cudownie Poradnik AWK autorstwa Bruce'a Barnetta Czytałem o fajnej funkcji zwanej „tablice asocjacyjne" Zasadniczo są to pary klucz-wartość, które z jakiegoś powodu w AWK nazywano inaczej i dlatego jakoś nie myślałem o nich zbyt wiele. Roman Czeplaka przypomniał, że termin „tablice asocjacyjne” jest znacznie starszy niż termin „para klucz-wartość”. Nawet jeśli ty wyszukaj parę klucz-wartość w Google Ngram, nie zobaczysz tam tego terminu, ale znajdziesz tablice asocjacyjne! Ponadto „para klucz-wartość” jest najczęściej kojarzona z bazami danych, dlatego o wiele sensowniej jest porównać ją z hashmapą. Zdałem sobie sprawę, że mogę użyć tych tablic asocjacyjnych do powiązania moich SNP z tabelą bin i surowymi danymi bez użycia Sparka.

W tym celu w skrypcie AWK użyłem bloku BEGIN. Jest to fragment kodu wykonywany przed przekazaniem pierwszej linii danych do głównej części skryptu.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Zespół while(getline...) załadował wszystkie wiersze z grupy CSV (bin), ustaw pierwszą kolumnę (nazwa SNP) jako klucz dla tablicy asocjacyjnej bin i druga wartość (grupa) jako wartość. Potem w bloku { }, który jest wykonywany we wszystkich liniach pliku głównego, każda linia jest wysyłana do pliku wyjściowego, który otrzymuje unikalną nazwę w zależności od swojej grupy (bin): ..._bin_"bin[$1]"_....

Zmienne batch_num и chunk_id dopasował dane dostarczone przez potok, unikając sytuacji wyścigu i każdego uruchomionego wątku wykonawczego parallel, zapisał do swojego własnego, unikalnego pliku.

Ponieważ rozrzuciłem wszystkie surowe dane do folderów na chromosomach pozostałych po moim poprzednim eksperymencie z AWK, teraz mogłem napisać kolejny skrypt w języku Bash, aby przetwarzał jeden chromosom na raz i wysyłał dane podzielone na głębsze partycje do S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Skrypt składa się z dwóch sekcji parallel.

W pierwszej sekcji odczytywane są dane ze wszystkich plików zawierających informacje o pożądanym chromosomie, następnie dane te są rozdzielane pomiędzy wątki, które rozdzielają pliki do odpowiednich grup (bin). Aby uniknąć sytuacji wyścigu, gdy wiele wątków zapisuje do tego samego pliku, AWK przekazuje nazwy plików, aby zapisać dane w różnych miejscach, np. chr_10_bin_52_batch_2_aa.csv. W rezultacie na dysku powstaje wiele małych plików (w tym celu użyłem terabajtowych woluminów EBS).

Przenośnik z drugiej sekcji parallel przechodzi przez grupy (bin) i łączy ich poszczególne pliki we wspólny plik CSV c cata następnie wysyła je na eksport.

Nadawanie w R?

Czego się nauczyłem?: Możesz się skontaktować stdin и stdout ze skryptu R i dlatego użyj go w potoku.

Być może zauważyłeś tę linię w skrypcie Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Tłumaczy wszystkie połączone pliki grupowe (bin) na poniższy skrypt R. {} to specjalna technika parallel, który wstawia wszelkie dane wysyłane do określonego strumienia bezpośrednio do samego polecenia. Opcja {#} zapewnia unikalny identyfikator wątku i {%} reprezentuje numer stanowiska (powtarzane, ale nigdy jednocześnie). Listę wszystkich opcji znajdziesz w dokumentacja.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kiedy zmienna file("stdin") przesłane do readr::read_csv, dane przetłumaczone na skrypt R są ładowane do ramki, która następnie znajduje się w formularzu .rds-plik za pomocą aws.s3 zapisane bezpośrednio do S3.

RDS to coś w rodzaju młodszej wersji Parquet, bez zbędnych dodatków do przechowywania głośników.

Po ukończeniu skryptu Bash dostałem pakiet .rds-pliki zlokalizowane w S3, co pozwoliło mi zastosować wydajną kompresję i typy wbudowane.

Pomimo użycia hamulca R wszystko zadziałało bardzo szybko. Nic dziwnego, że części R, które odczytują i zapisują dane, są wysoce zoptymalizowane. Po przetestowaniu jednego średniej wielkości chromosomu, zadanie na instancji C5n.4xl zostało ukończone w ciągu około dwóch godzin.

Ograniczenia S3

Czego się nauczyłem?: Dzięki inteligentnej implementacji ścieżki S3 może obsłużyć wiele plików.

Martwiłem się, czy S3 będzie w stanie obsłużyć wiele plików, które zostały do ​​niego przeniesione. Mógłbym sprawić, że nazwy plików będą miały sens, ale jak S3 będzie ich szukać?

Analizowanie 25 TB przy użyciu AWK i R
Foldery w S3 są tylko na pokaz, tak naprawdę system nie jest zainteresowany symbolem /. Ze strony FAQ S3.

Wygląda na to, że S3 reprezentuje ścieżkę do konkretnego pliku jako prosty klucz w czymś w rodzaju tablicy mieszającej lub bazy danych opartej na dokumentach. Wiadro można traktować jak tabelę, a pliki można traktować jako rekordy w tej tabeli.

Ponieważ szybkość i wydajność są ważne dla osiągnięcia zysku w Amazon, nie jest niespodzianką, że ten system klucza jako ścieżki pliku jest szalenie zoptymalizowany. Próbowałem znaleźć równowagę: żebym nie musiał wysyłać wielu żądań get, ale aby żądania były szybko wykonywane. Okazało się, że najlepiej zrobić około 20 tysięcy plików bin. Myślę, że jeśli będziemy kontynuować optymalizację, możemy osiągnąć wzrost szybkości (na przykład tworząc specjalny segment tylko na dane, zmniejszając w ten sposób rozmiar tabeli przeglądowej). Ale na dalsze eksperymenty nie było czasu ani pieniędzy.

A co z kompatybilnością krzyżową?

Czego się nauczyłem: Główną przyczyną marnowania czasu jest przedwczesna optymalizacja metody przechowywania.

W tym momencie bardzo ważne jest, aby zadać sobie pytanie: „Po co używać zastrzeżonego formatu pliku?” Powodem jest szybkość ładowania (ładowanie spakowanych plików CSV trwało 7 razy dłużej) i kompatybilność z naszymi przepływami pracy. Mogę ponownie rozważyć, czy R może łatwo załadować pliki Parquet (lub Arrow) bez ładowania Spark. Wszyscy w naszym laboratorium używają języka R i jeśli będę musiał przekonwertować dane na inny format, nadal mam oryginalne dane tekstowe, więc mogę po prostu ponownie uruchomić potok.

Podział pracy

Czego się nauczyłem?: Nie próbuj ręcznie optymalizować zadań, pozwól, aby zrobił to komputer.

Debugowałem przepływ pracy na jednym chromosomie, teraz muszę przetworzyć wszystkie pozostałe dane.
Chciałem podnieść kilka instancji EC2 do konwersji, ale jednocześnie obawiałem się bardzo niezrównoważonego obciążenia między różnymi zadaniami przetwarzania (tak jak Spark cierpiał z powodu niezrównoważonych partycji). Ponadto nie byłem zainteresowany podnoszeniem jednej instancji na chromosom, ponieważ dla kont AWS domyślny limit wynosi 10 instancji.

Następnie zdecydowałem się napisać skrypt w R, aby zoptymalizować zadania przetwarzania.

Najpierw poprosiłem S3 o obliczenie, ile miejsca zajmuje każdy chromosom.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Następnie napisałem funkcję, która pobiera całkowity rozmiar, tasuje kolejność chromosomów, dzieli je na grupy num_jobs i informuje, jak różne są rozmiary wszystkich zadań przetwarzania.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Potem przejrzałem tysiąc przetasowań, używając mruczenia i wybrałem najlepsze.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

W rezultacie otrzymałem zestaw zadań o bardzo podobnym rozmiarze. Potem pozostało już tylko zawinąć mój poprzedni skrypt Bash w dużą pętlę for. Napisanie tej optymalizacji zajęło około 10 minut. A to znacznie mniej, niż wydałbym na ręczne tworzenie zadań, gdyby były niezrównoważone. Dlatego myślę, że miałem rację z tą wstępną optymalizacją.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Na koniec dodaję polecenie zamknięcia:

sudo shutdown -h now

... i wszystko się udało! Korzystając z interfejsu CLI AWS, podniosłem instancje za pomocą opcji user_data przekazał im skrypty Bash zawierające zadania do przetworzenia. Działały i wyłączały się automatycznie, więc nie płaciłem za dodatkową moc obliczeniową.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Pakujmy się!

Czego się nauczyłem?: Interfejs API powinien być prosty ze względu na łatwość i elastyczność użytkowania.

Wreszcie mam dane we właściwym miejscu i formie. Pozostało tylko maksymalnie uprościć proces wykorzystania danych, aby ułatwić pracę moim współpracownikom. Chciałem stworzyć prosty interfejs API do tworzenia żądań. Jeśli w przyszłości zdecyduję się na zmianę .rds do plików Parquet, to powinien być to problem dla mnie, a nie dla moich kolegów. W tym celu zdecydowałem się stworzyć wewnętrzny pakiet R.

Zbuduj i udokumentuj bardzo prosty pakiet zawierający tylko kilka funkcji dostępu do danych zorganizowanych wokół funkcji get_snp. Zrobiłem też stronę internetową dla moich kolegów pkgdown, dzięki czemu mogą łatwo zapoznać się z przykładami i dokumentacją.

Analizowanie 25 TB przy użyciu AWK i R

Inteligentne buforowanie

Czego się nauczyłem?: Jeśli Twoje dane są dobrze przygotowane, buforowanie będzie łatwe!

Ponieważ jeden z głównych przepływów pracy zastosował ten sam model analizy do pakietu SNP, zdecydowałem się wykorzystać binowanie na swoją korzyść. Podczas przesyłania danych poprzez SNP wszystkie informacje z grupy (bin) są dołączane do zwracanego obiektu. Oznacza to, że stare zapytania mogą (w teorii) przyspieszyć przetwarzanie nowych zapytań.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Budując pakiet, przeprowadziłem wiele testów porównawczych, aby porównać prędkość przy użyciu różnych metod. Radzę nie zaniedbywać tego, ponieważ czasami rezultaty są nieoczekiwane. Na przykład, dplyr::filter było znacznie szybsze niż przechwytywanie wierszy przy użyciu filtrowania opartego na indeksowaniu, a pobieranie pojedynczej kolumny z przefiltrowanej ramki danych było znacznie szybsze niż przy użyciu składni indeksowania.

Należy pamiętać, że obiekt prev_snp_results zawiera klucz snps_in_bin. Jest to tablica wszystkich unikalnych SNP w grupie (bin), pozwalająca szybko sprawdzić, czy posiadasz już dane z poprzedniego zapytania. Ułatwia także przeglądanie wszystkich SNP w grupie (bin) za pomocą tego kodu:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

wyniki

Teraz możemy (i zaczęliśmy na poważnie) uruchamiać modele i scenariusze, które wcześniej były dla nas niedostępne. Najlepsze jest to, że moi koledzy z laboratorium nie muszą myśleć o żadnych komplikacjach. Mają po prostu funkcję, która działa.

I chociaż pakiet oszczędza im szczegółów, starałem się uprościć format danych, aby mogli się zorientować, gdybym jutro nagle zniknął...

Prędkość wyraźnie wzrosła. Zwykle skanujemy funkcjonalnie istotne fragmenty genomu. Wcześniej nie mogliśmy tego zrobić (okazało się to zbyt kosztowne), ale teraz, dzięki strukturze grupowej (bin) i buforowaniu, żądanie jednego SNP zajmuje średnio mniej niż 0,1 sekundy, a zużycie danych jest tak niskie że koszt S3 to grosze.

wniosek

Artykuł ten w żadnym wypadku nie jest przewodnikiem. Rozwiązanie okazało się indywidualne i prawie na pewno nie optymalne. Jest to raczej dziennik podróży. Chcę, żeby inni zrozumieli, że takie decyzje nie pojawiają się w pełni w głowie, są wynikiem prób i błędów. Ponadto, jeśli szukasz analityka danych, pamiętaj, że skuteczne korzystanie z tych narzędzi wymaga doświadczenia, a doświadczenie kosztuje. Cieszę się, że miałem środki, żeby zapłacić, ale wielu innych, którzy potrafią wykonywać tę samą pracę lepiej ode mnie, nigdy nie będzie miało okazji z powodu braku pieniędzy choćby spróbować.

Narzędzia Big Data są wszechstronne. Jeśli masz czas, prawie na pewno możesz napisać szybsze rozwiązanie, korzystając z inteligentnych technik czyszczenia, przechowywania i ekstrakcji danych. Ostatecznie sprowadza się to do analizy kosztów i korzyści.

Czego się nauczyłem:

  • nie ma taniego sposobu na jednoczesne przeanalizowanie 25 TB;
  • Uważaj na rozmiar plików Parquet i ich organizację;
  • Partycje w Sparku muszą być zrównoważone;
  • Ogólnie rzecz biorąc, nigdy nie próbuj tworzyć 2,5 miliona partycji;
  • Sortowanie jest nadal trudne, podobnie jak konfiguracja Sparka;
  • czasami specjalne dane wymagają specjalnych rozwiązań;
  • Agregacja iskier jest szybka, ale partycjonowanie jest nadal drogie;
  • nie śpij, gdy uczą cię podstaw, ktoś prawdopodobnie rozwiązał już twój problem w latach 1980-tych;
  • gnu parallel - to magiczna rzecz, każdy powinien z niej korzystać;
  • Spark lubi nieskompresowane dane i nie lubi łączyć partycji;
  • Spark ma zbyt duże obciążenie przy rozwiązywaniu prostych problemów;
  • Tablice asocjacyjne AWK są bardzo wydajne;
  • możesz się skontaktować stdin и stdout ze skryptu R i dlatego użyj go w potoku;
  • Dzięki implementacji inteligentnej ścieżki S3 może przetwarzać wiele plików;
  • Głównym powodem marnowania czasu jest przedwczesna optymalizacja metody przechowywania;
  • nie próbuj ręcznie optymalizować zadań, pozwól, aby zrobił to komputer;
  • API powinno być proste ze względu na łatwość i elastyczność użytkowania;
  • Jeśli Twoje dane są dobrze przygotowane, buforowanie będzie łatwe!

Źródło: www.habr.com

Dodaj komentarz