Разбор на 25TB с помощта на AWK и R

Разбор на 25TB с помощта на AWK и R
Как да прочетете тази статия: Извинявам се, че текстът е толкова дълъг и хаотичен. За да ви спестя време, започвам всяка глава с въведение „Какво научих“, което обобщава същността на главата в едно или две изречения.

„Просто ми покажете решението!“ Ако просто искате да видите откъде идвам, тогава преминете към главата „Да станем по-изобретателни“, но мисля, че е по-интересно и полезно да прочетете за провала.

Наскоро ми беше възложено да създам процес за обработка на голям обем необработени ДНК последователности (технически SNP чип). Необходимостта беше бързо да се получат данни за дадено генетично местоположение (наречено SNP) за последващо моделиране и други задачи. Използвайки R и AWK, успях да изчистя и организирам данни по естествен начин, което значително ускори обработката на заявките. Това не беше лесно за мен и изискваше много повторения. Тази статия ще ви помогне да избегнете някои от моите грешки и ще ви покаже до какво стигнах.

Първо, някои уводни обяснения.

Данни

Нашият университетски център за обработка на генетична информация ни предостави данни под формата на 25 TB TSV. Получих ги разделени на 5 пакета, компресирани с Gzip, всеки от които съдържаше около 240 четиригигабайтови файла. Всеки ред съдържа данни за един SNP от един индивид. Общо бяха предадени данни за ~2,5 милиона SNP и ~60 хиляди души. В допълнение към информацията за SNP, файловете съдържат множество колони с числа, отразяващи различни характеристики, като интензивност на четене, честота на различни алели и др. Общо имаше около 30 колони с уникални стойности.

Цел

Както при всеки проект за управление на данни, най-важното беше да се определи как ще се използват данните. В такъв случай най-вече ще избираме модели и работни процеси за SNP, базирани на SNP. Тоест ще ни трябват данни само за един SNP в даден момент. Трябваше да се науча как да извличам всички записи, свързани с един от 2,5 милиона SNP, възможно най-лесно, бързо и евтино.

Как да не правим това

Да цитирам подходящо клише:

Не съм се провалил хиляди пъти, просто открих хиляди начини да избегна анализирането на куп данни в удобен за заявки формат.

Първи опит

Какво научих: Няма евтин начин да анализирате 25 TB наведнъж.

След като взех курса „Усъвършенствани методи за обработка на големи данни“ в университета Вандербилт, бях сигурен, че номерът е в чантата. Вероятно ще отнеме час или два, за да настроите Hive сървъра да премине през всички данни и да отчете резултата. Тъй като данните ни се съхраняват в AWS S3, използвах услугата Атина, което ви позволява да прилагате Hive SQL заявки към S3 данни. Не е необходимо да настройвате/повишавате Hive клъстер и също така плащате само за данните, които търсите.

След като показах на Athena моите данни и техния формат, проведох няколко теста със заявки като тези:

select * from intensityData limit 10;

И бързо получи добре структурирани резултати. Готов.

Докато не се опитахме да използваме данните в нашата работа...

Помолиха ме да извадя цялата информация за SNP, за да тествам модела. Пуснах заявката:


select * from intensityData 
where snp = 'rs123456';

...и започна да чака. След осем минути и повече от 4 TB заявени данни получих резултата. Athena таксува според обема намерени данни, $5 на терабайт. Така че тази единствена заявка струва $20 и осем минути чакане. За да стартираме модела върху всички данни, трябваше да чакаме 38 години и да платим $50 млн. Очевидно това не беше подходящо за нас.

Наложи се паркет...

Какво научих: Внимавайте с размера на вашите паркетни пили и тяхната организация.

Първо се опитах да поправя ситуацията, като конвертирах всички TSV в Пили за паркет. Те са удобни за работа с големи масиви от данни, тъй като информацията в тях се съхранява в колонна форма: всяка колона се намира в собствен сегмент от памет/диск, за разлика от текстовите файлове, в които редовете съдържат елементи от всяка колона. И ако трябва да намерите нещо, просто прочетете необходимата колона. Освен това всеки файл съхранява диапазон от стойности в колона, така че ако стойността, която търсите, не е в диапазона на колоната, Spark няма да губи време за сканиране на целия файл.

Изпълних проста задача AWS лепило за да конвертираме нашите TSV в Parquet и пуснахме новите файлове в Athena. Отне около 5 часа. Но когато пуснах заявката, завършването отне приблизително същото време и малко по-малко пари. Факт е, че Spark, опитвайки се да оптимизира задачата, просто разопакова едно TSV парче и го постави в собствено парче Parquet. И тъй като всяка част беше достатъчно голяма, за да съдържа всички записи на много хора, всеки файл съдържаше всички SNP, така че Spark трябваше да отвори всички файлове, за да извлече необходимата информация.

Интересното е, че стандартният (и препоръчителен) тип компресия на Parquet, snappy, не може да се разделя. Следователно всеки изпълнител беше заседнал със задачата да разопакова и изтегли пълния набор от данни от 3,5 GB.

Разбор на 25TB с помощта на AWK и R

Нека разберем проблема

Какво научих: Сортирането е трудно, особено ако данните са разпределени.

Струваше ми се, че сега разбирам същността на проблема. Трябваше само да сортирам данните по SNP колона, а не по хора. Тогава няколко SNP ще бъдат съхранени в отделна част от данни и тогава „интелигентната“ функция на Parquet „отворена само ако стойността е в диапазона“ ще се покаже в цялата си слава. За съжаление, сортирането на милиарди редове, разпръснати в клъстер, се оказа трудна задача.

AWS определено не иска да възстанови сумата поради причината „Аз съм разсеян ученик“. След като пуснах сортиране на Amazon Glue, той работи 2 дни и се срина.

Какво ще кажете за разделяне?

Какво научих: Разделенията в Spark трябва да бъдат балансирани.

Тогава ми хрумна идеята за разделяне на данни в хромозоми. Има 23 от тях (и още няколко, ако вземете предвид митохондриалната ДНК и некартираните региони).
Това ще ви позволи да разделите данните на по-малки части. Ако добавите само един ред към функцията за експортиране на Spark в Glue скрипта partition_by = "chr", тогава данните трябва да бъдат разделени на кофи.

Разбор на 25TB с помощта на AWK и R
Геномът се състои от множество фрагменти, наречени хромозоми.

За съжаление не се получи. Хромозомите имат различни размери, което означава различно количество информация. Това означава, че задачите, които Spark изпрати на работниците, не бяха балансирани и завършени бавно, защото някои възли приключиха рано и бяха неактивни. Задачите обаче бяха изпълнени. Но когато се поиска един SNP, дисбалансът отново създаде проблеми. Разходите за обработка на SNP на по-големи хромозоми (т.е. там, където искаме да получим данни) са намалели само с около фактор 10. Много, но не достатъчно.

Ами ако го разделим на още по-малки части?

Какво научих: Никога не се опитвайте да правите 2,5 милиона дяла.

Реших да направя всичко възможно и разделих всеки SNP. Това гарантира, че дяловете са с еднакъв размер. ТОВА БЕШЕ ЛОША ИДЕЯ. Използвах лепило и добавих невинна линия partition_by = 'snp'. Задачата стартира и започна да се изпълнява. Ден по-късно проверих и видях, че все още няма нищо записано на S3, така че убих задачата. Изглежда, че Glue е записвал междинни файлове на скрито място в S3, много файлове, може би няколко милиона. В резултат на това грешката ми струваше повече от хиляда долара и не зарадва ментора ми.

Разделяне + сортиране

Какво научих: Сортирането все още е трудно, както и настройката на Spark.

Последният ми опит за разделяне включваше разделяне на хромозомите и след това сортиране на всеки дял. На теория това би ускорило всяка заявка, тъй като желаните SNP данни трябваше да бъдат в рамките на няколко парчета Parquet в даден диапазон. За съжаление, сортирането дори на разделени данни се оказа трудна задача. В резултат на това преминах към EMR за персонализиран клъстер и използвах осем мощни екземпляра (C5.4xl) и Sparklyr, за да създам по-гъвкав работен процес...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...задачата обаче все още не беше изпълнена. Конфигурирах го по различни начини: увеличих разпределението на паметта за всеки изпълнител на заявка, използвах възли с голямо количество памет, използвах променливи за излъчване (променливи за излъчване), но всеки път това се оказваше половинчата мярка и постепенно изпълнителите започнаха да се проваля, докато всичко спре.

Ставам по-креативен

Какво научих: Понякога специалните данни изискват специални решения.

Всеки SNP има стойност на позиция. Това е число, съответстващо на броя на базите по неговата хромозома. Това е хубав и естествен начин за организиране на нашите данни. Първоначално исках да разделя по региони на всяка хромозома. Например позиции 1 - 2000, 2001 - 4000 и т.н. Но проблемът е, че SNP не са равномерно разпределени в хромозомите, така че размерите на групите ще варират значително.

Разбор на 25TB с помощта на AWK и R

В резултат стигнах до разбивка на позициите по категории (ранг). Използвайки вече изтеглените данни, пуснах заявка за получаване на списък с уникални SNP, техните позиции и хромозоми. След това сортирах данните във всяка хромозома и събрах SNPs в групи (bin) с даден размер. Да кажем 1000 SNP всеки. Това ми даде връзката SNP към група за хромозома.

В крайна сметка направих групи (bin) от 75 SNP, причината ще бъде обяснена по-долу.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Първо опитайте със Spark

Какво научих: Агрегирането на Spark е бързо, но разделянето все още е скъпо.

Исках да прочета този малък (2,5 милиона реда) кадър с данни в Spark, да го комбинирам с необработените данни и след това да го разделя с новодобавената колона bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

използвах sdf_broadcast(), така че Spark знае, че трябва да изпрати рамката с данни до всички възли. Това е полезно, ако данните са малки по размер и са необходими за всички задачи. В противен случай Spark се опитва да бъде умен и разпространява данни според нуждите, което може да причини забавяне.

И отново идеята ми не проработи: задачите работиха известно време, завършиха обединението и след това, подобно на изпълнителите, стартирани чрез разделяне, започнаха да се провалят.

Добавяне на AWK

Какво научих: Не спете, когато ви учат на основите. Със сигурност някой вече е решил проблема ви през 1980-те години.

До този момент причината за всичките ми неуспехи със Spark беше бъркотията от данни в клъстера. Може би ситуацията може да се подобри с предварителна обработка. Реших да опитам да разделя необработените текстови данни на колони от хромозоми, така че се надявах да осигуря на Spark „предварително разделени“ данни.

Търсих в StackOverflow как да разделя по стойности на колони и намерих толкова страхотен отговор. С AWK можете да разделите текстов файл по стойности на колони, като го напишете в скрипт, вместо да изпращате резултатите до stdout.

Написах Bash скрипт, за да го изпробвам. Изтеглих един от пакетираните TSV файлове, след което го разопаковах с помощта на gzip и изпратен до awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Проработи!

Пълнене на сърцевините

Какво научих: gnu parallel - това е магическо нещо, всеки трябва да го използва.

Раздялата беше доста бавна и когато започнах htopза да проверя използването на мощен (и скъп) екземпляр EC2, се оказа, че използвам само едно ядро ​​и около 200 MB памет. За да разрешим проблема и да не загубим много пари, трябваше да измислим как да паралелизираме работата. За щастие, в една абсолютно невероятна книга Data Science в командния ред Намерих глава от Jeron Janssens за паралелизиране. От него научих за gnu parallel, много гъвкав метод за прилагане на многопоточност в Unix.

Разбор на 25TB с помощта на AWK и R
Когато започнах разделянето с помощта на новия процес, всичко беше наред, но все още имаше пречка - изтеглянето на S3 обекти на диск не беше много бързо и не беше напълно паралелизирано. За да поправя това, направих следното:

  1. Открих, че е възможно да внедря етапа на изтегляне на S3 директно в тръбопровода, напълно елиминирайки междинното съхранение на диск. Това означава, че мога да избегна записването на необработени данни на диск и да използвам дори по-малко и следователно по-евтино хранилище на AWS.
  2. Екип aws configure set default.s3.max_concurrent_requests 50 значително увеличи броя на нишките, които AWS CLI използва (по подразбиране има 10).
  3. Преминах на инстанция EC2, оптимизирана за скорост на мрежата, с буквата n в името. Открих, че загубата на процесорна мощност при използване на n-инстанции е повече от компенсирана от увеличаването на скоростта на зареждане. За повечето задачи използвах c5n.4xl.
  4. Променен gzip на pigz, това е gzip инструмент, който може да прави страхотни неща за паралелизиране на първоначално непаралелизираната задача за декомпресиране на файлове (това помогна най-малко).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Тези стъпки се комбинират една с друга, за да може всичко да работи много бързо. Чрез увеличаване на скоростта на изтегляне и елиминиране на записите на диск, вече мога да обработя пакет от 5 терабайта само за няколко часа.

Този туит трябваше да споменава „TSV“. уви

Използване на нови анализирани данни

Какво научих: Spark харесва некомпресирани данни и не обича комбинирането на дялове.

Сега данните бяха в S3 в неопакован (да се чете: споделен) и полуподреден формат и можех отново да се върна към Spark. Очакваше ме изненада: отново не успях да постигна това, което исках! Беше много трудно да се каже на Spark точно как са разделени данните. И дори когато направих това, се оказа, че има твърде много дялове (95 хиляди) и когато използвах coalesce намалих техния брой до разумни граници, това унищожи моя дял. Сигурен съм, че това може да се поправи, но след няколко дни търсене не можах да намеря решение. В крайна сметка завърших всички задачи в Spark, въпреки че отне известно време и моите разделени файлове на Parquet не бяха много малки (~200 KB). Данните обаче бяха там, където бяха необходими.

Разбор на 25TB с помощта на AWK и R
Твърде малко и неравномерно, прекрасно!

Тестване на локални заявки на Spark

Какво научих: Spark има твърде много разходи при решаване на прости проблеми.

Като изтеглих данните в умен формат, успях да тествам скоростта. Настройте R скрипт за стартиране на локален сървър на Spark и след това заредете кадър с данни на Spark от посоченото групово хранилище на Parquet (bin). Опитах се да заредя всички данни, но не можах да накарам Sparklyr да разпознае разделянето.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Изпълнението отне 29,415 секунди. Много по-добър, но не твърде добър за масово тестване на каквото и да било. Освен това не можах да ускоря нещата с кеширане, защото когато се опитах да кеширам кадър с данни в паметта, Spark винаги се срива, дори когато разпределих повече от 50 GB памет за набор от данни, който тежеше по-малко от 15.

Върнете се към AWK

Какво научих: Асоциативните масиви в AWK са много ефективни.

Разбрах, че мога да постигна по-високи скорости. Спомних си това в чудесен AWK урок от Брус Барнет Прочетох за страхотна функция, наречена „асоциативни масиви" По същество това са двойки ключ-стойност, които по някаква причина бяха наречени по различен начин в AWK и затова някак си не мислех много за тях. Роман Чепляка припомни, че терминът „асоциативни масиви“ е много по-стар от термина „двойка ключ-стойност“. Дори ако ти потърсете ключ-стойност в Google Ngram, няма да видите този термин там, но ще намерите асоциативни масиви! В допълнение, „двойката ключ-стойност“ най-често се свързва с бази данни, така че има много по-голям смисъл да се сравнява с hashmap. Осъзнах, че мога да използвам тези асоциативни масиви, за да асоциирам моите SNPs с bin таблица и необработени данни, без да използвам Spark.

За да направя това, в AWK скрипта използвах блока BEGIN. Това е част от кода, който се изпълнява преди първият ред от данни да бъде предаден към основната част на скрипта.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Отбор while(getline...) зареди всички редове от CSV групата (bin), задайте първата колона (SNP име) като ключ за асоциативния масив bin и втората стойност (група) като стойност. След това в блока { }, който се изпълнява на всички редове на основния файл, всеки ред се изпраща на изходния файл, който получава уникално име в зависимост от неговата група (bin): ..._bin_"bin[$1]"_....

променливи batch_num и chunk_id съответства на данните, предоставени от тръбопровода, избягвайки състояние на състезание и всяка нишка за изпълнение се изпълнява parallel, записа в собствения си уникален файл.

Тъй като разпръснах всички необработени данни в папки на хромозоми, останали от предишния ми експеримент с AWK, сега мога да напиша друг Bash скрипт, за да обработвам една хромозома наведнъж и да изпращам по-дълбоко разделени данни към S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Сценарият има два раздела parallel.

В първия раздел данните се четат от всички файлове, съдържащи информация за желаната хромозома, след което тези данни се разпределят между нишки, които разпределят файловете в съответните групи (bin). За да се избегнат условия на състезание, когато множество нишки пишат в един и същ файл, AWK предава имената на файловете, за да записва данни на различни места, напр. chr_10_bin_52_batch_2_aa.csv. В резултат на това на диска се създават много малки файлове (за това използвах терабайтови EBS томове).

Конвейер от втора секция parallel преминава през групите (bin) и комбинира отделните им файлове в общ CSV c catи след това ги изпраща за износ.

Излъчване в R?

Какво научих: Можете да се свържете stdin и stdout от R скрипт и следователно го използвайте в конвейера.

Може би сте забелязали този ред във вашия Bash скрипт: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Той превежда всички свързани групови файлове (bin) в R скрипта по-долу. {} е специална техника parallel, който вмъква всички данни, които изпраща към посочения поток директно в самата команда. опция {#} предоставя уникален идентификатор на нишка и {%} представлява номера на слота за работа (повтарящо се, но никога едновременно). Списък с всички опции можете да намерите в документация.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Когато променлива file("stdin") предадено на readr::read_csv, данните, преведени в R скрипта, се зареждат в рамка, която след това е във формуляра .rds- използване на файл aws.s3 написан директно на S3.

RDS е нещо като младша версия на Parquet, без излишните украшения за съхранение на високоговорители.

След като завърших Bash скрипта, получих пакет .rds-файлове, разположени в S3, което ми позволи да използвам ефективна компресия и вградени типове.

Въпреки използването на спирачка R, всичко работи много бързо. Не е изненадващо, че частите на R, които четат и записват данни, са силно оптимизирани. След тестване на една хромозома със среден размер, работата приключи на екземпляр C5n.4xl за около два часа.

S3 Ограничения

Какво научих: Благодарение на внедряването на интелигентен път, S3 може да обработва много файлове.

Притесних се дали S3 ще може да се справи с многото файлове, които бяха прехвърлени към него. Бих могъл да направя имената на файловете смислени, но как S3 ще ги търси?

Разбор на 25TB с помощта на AWK и R
Папките в S3 са само за показ, всъщност системата не се интересува от символа /. От страницата с често задавани въпроси на S3.

Изглежда, че S3 представя пътя до конкретен файл като прост ключ в нещо като хеш таблица или база данни, базирана на документи. Кофата може да се разглежда като таблица, а файловете могат да се считат за записи в тази таблица.

Тъй като скоростта и ефективността са важни за правенето на печалба в Amazon, не е изненадващо, че тази система за ключ като път към файл е адски оптимизирана. Опитах се да намеря баланс: така че да не се налага да правя много заявки за получаване, но заявките да се изпълняват бързо. Оказа се, че е най-добре да направите около 20 хиляди bin файла. Мисля, че ако продължим да оптимизираме, можем да постигнем увеличение на скоростта (например да направим специална кофа само за данни, като по този начин намалим размера на справочната таблица). Но нямаше време и пари за по-нататъшни експерименти.

Какво ще кажете за кръстосаната съвместимост?

Какво научих: Причина номер едно за загуба на време е преждевременното оптимизиране на вашия метод за съхранение.

В този момент е много важно да се запитате: „Защо да използваме собствен файлов формат?“ Причината се крие в скоростта на зареждане (зареждането на gzip CSV файлове отне 7 пъти повече време) и съвместимостта с нашите работни процеси. Може да преразгледам дали R може лесно да зарежда файлове на Parquet (или Arrow) без натоварването на Spark. Всички в нашата лаборатория използват R и ако трябва да конвертирам данните в друг формат, все още имам оригиналните текстови данни, така че мога просто да стартирам конвейера отново.

Разделяне на работата

Какво научих: Не се опитвайте да оптимизирате задачите ръчно, оставете компютъра да го направи.

Отстраних грешки в работния процес на една хромозома, сега трябва да обработя всички останали данни.
Исках да създам няколко екземпляра на EC2 за преобразуване, но в същото време се страхувах да не получа много небалансирано натоварване в различни задания за обработка (точно както Spark страдаше от небалансирани дялове). Освен това не се интересувах от повишаване на един екземпляр на хромозома, защото за AWS акаунти има ограничение по подразбиране от 10 екземпляра.

Тогава реших да напиша скрипт в R, за да оптимизирам заданията за обработка.

Първо, помолих S3 да изчисли колко място за съхранение заема всяка хромозома.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

След това написах функция, която взема общия размер, разбърква реда на хромозомите, разделя ги на групи num_jobs и ви казва колко различни са размерите на всички задания за обработка.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

След това прегледах хиляди разбърквания с помощта на purrr и избрах най-доброто.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Така че завърших с набор от задачи, които бяха много сходни по размер. Тогава всичко, което остана, беше да увия предишния си Bash скрипт в голям цикъл for. Написването на тази оптимизация отне около 10 минути. И това е много по-малко, отколкото бих похарчил за ръчно създаване на задачи, ако бяха небалансирани. Затова смятам, че бях прав с тази предварителна оптимизация.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

В края добавям командата за изключване:

sudo shutdown -h now

... и всичко се получи! Използвайки AWS CLI, повдигнах екземпляри, използвайки опцията user_data им даде Bash скриптове на техните задачи за обработка. Те стартираха и се изключиха автоматично, така че не плащах за допълнителна процесорна мощност.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Да опаковаме багажа!

Какво научих: API трябва да е прост в името на лекотата и гъвкавостта на използване.

Най-накрая получих данните на правилното място и във форма. Оставаше само да опростя процеса на използване на данни, доколкото е възможно, за да улесня колегите си. Исках да направя прост API за създаване на заявки. Ако в бъдеще реша да премина от .rds до Пили за паркет, то това би трябвало да е проблем за мен, не за колегите. За това реших да направя вътрешен R пакет.

Създайте и документирайте много прост пакет, съдържащ само няколко функции за достъп до данни, организирани около функция get_snp. Направих и сайт за колегите pkgdown, за да могат лесно да видят примери и документация.

Разбор на 25TB с помощта на AWK и R

Интелигентно кеширане

Какво научих: Ако вашите данни са добре подготвени, кеширането ще бъде лесно!

Тъй като един от основните работни потоци приложи същия модел за анализ към SNP пакета, реших да използвам групиране в моя полза. При предаване на данни чрез SNP цялата информация от групата (bin) се прикачва към върнатия обект. Тоест старите заявки могат (на теория) да ускорят обработката на нови заявки.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

При изграждането на пакета проведох много бенчмаркове, за да сравня скоростта при използване на различни методи. Препоръчвам да не пренебрегвате това, защото понякога резултатите са неочаквани. Например, dplyr::filter беше много по-бързо от заснемането на редове с помощта на филтриране, базирано на индексиране, а извличането на една колона от филтрирана рамка с данни беше много по-бързо, отколкото използването на синтаксис на индексиране.

Моля, имайте предвид, че обектът prev_snp_results съдържа ключа snps_in_bin. Това е масив от всички уникални SNP в група (bin), което ви позволява бързо да проверите дали вече имате данни от предишна заявка. Освен това улеснява преминаването през всички SNP в група (bin) с този код:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

резултати

Сега можем (и започнахме сериозно) да управляваме модели и сценарии, които преди бяха недостъпни за нас. Най-хубавото е, че колегите ми от лабораторията не трябва да мислят за някакви усложнения. Те просто имат функция, която работи.

И въпреки че пакетът им спестява подробностите, опитах се да направя формата на данните достатъчно прост, за да могат да го разберат, ако внезапно изчезна утре...

Скоростта се увеличи значително. Обикновено сканираме функционално значими фрагменти от генома. Преди не можехме да направим това (оказа се твърде скъпо), но сега, благодарение на структурата на групата (bin) и кеширането, една заявка за един SNP отнема средно по-малко от 0,1 секунди, а използването на данни е толкова ниско че цената на S3 е фъстъци.

Заключение

Тази статия изобщо не е ръководство. Решението се оказа индивидуално и почти сигурно не оптимално. По-скоро е пътепис. Искам другите да разберат, че такива решения не изглеждат напълно оформени в главата, те са резултат от проба и грешка. Освен това, ако търсите специалист по данни, имайте предвид, че използването на тези инструменти ефективно изисква опит, а опитът струва пари. Щастлив съм, че имах средства да платя, но много други, които могат да вършат същата работа по-добре от мен, никога няма да имат възможност поради липса на пари дори да опитат.

Инструментите за големи данни са многостранни. Ако имате време, почти сигурно можете да напишете по-бързо решение, като използвате интелигентни техники за почистване, съхранение и извличане на данни. В крайна сметка всичко се свежда до анализ на разходите и ползите.

Какво научих:

  • няма евтин начин да анализирате 25 TB наведнъж;
  • внимавайте с размера на вашите паркетни пили и тяхната организация;
  • Разделите в Spark трябва да бъдат балансирани;
  • Като цяло, никога не се опитвайте да направите 2,5 милиона дяла;
  • Сортирането все още е трудно, както и настройката на Spark;
  • понякога специални данни изискват специални решения;
  • Spark агрегирането е бързо, но разделянето все още е скъпо;
  • не заспивай, когато те учат на основите, сигурно някой вече е решил проблема ти през 1980-те;
  • gnu parallel - това е магическо нещо, всеки трябва да го използва;
  • Spark харесва некомпресираните данни и не обича комбинирането на дялове;
  • Spark има твърде много разходи при решаване на прости проблеми;
  • Асоциативните масиви на AWK са много ефективни;
  • можете да се свържете stdin и stdout от R скрипт и следователно го използвайте в конвейера;
  • Благодарение на внедряването на интелигентен път, S3 може да обработва много файлове;
  • Основната причина за загуба на време е преждевременното оптимизиране на вашия метод за съхранение;
  • не се опитвайте да оптимизирате задачите ръчно, оставете компютъра да го направи;
  • API трябва да бъде прост за по-лесна и гъвкава употреба;
  • Ако вашите данни са добре подготвени, кеширането ще бъде лесно!

Източник: www.habr.com

Добавяне на нов коментар