Парсим 25TB за допомогою AWK та R

Парсим 25TB за допомогою AWK та R
Як читати цю статтю: перепрошую за те, що текст вийшов таким довгим і хаотичним Щоб заощадити ваш час, я кожний розділ починаю зі вступу «Чому я навчився», в якому одним-двома пропозиціями викладаю суть глави.

"Просто покажи рішення!" Якщо ви хочете лише побачити, до чого я прийшов, то переходьте до розділу «Стаю винахідливішим», але я вважаю, що цікавіше і корисніше почитати про невдачі.

Нещодавно мені доручили налаштувати процес обробки великого обсягу вихідних послідовностей ДНК (технічно це SNP-чіп). Потрібно було швидко отримувати дані про задане генетичне розташування (яке називається SNP) для подальшого моделювання та інших завдань. За допомогою R та AWK мені вдалося очистити та організувати дані природним чином, сильно прискоривши обробку запитів. Далося мені це нелегко і вимагало численних ітерацій. Ця стаття допоможе вам уникнути деяких моїх помилок і продемонструє, що ж мені врешті-решт вийшло.

Для початку деякі вступні пояснення.

Дані

Наш університетський центр обробки генетичної інформації надав нам дані у вигляді TSV об'ємом 25 Тб. Мені вони дісталися розбитими на 5 пакетів, стислих Gzip, кожен із яких містив близько 240 чотирьохгігабайтних файлів. Кожен ряд містив дані одного SNP одну людину. Усього були передані дані по ~2,5 млн SNP та ~60 тис. осіб. Крім SNP-інформації у файлах були численні колонки з числами, що відображають різні характеристики, такі як інтенсивність читання, частота різних алелів і т.д. Усього було близько 30 колонок із унікальними значеннями.

Мета

Як і в будь-якому проекті управління даними, найважливішим було визначити, як будуть використовуватися дані. В цьому випадку ми здебільшого підбиратимемо моделі та робочі процеси для SNP на основі SNP. Тобто одночасно нам будуть потрібні дані лише по одному SNP. Я повинен був навчитися якнайпростіше, швидше і дешевше витягувати всі записи, що стосуються одного з 2,5 мільйонів SNP.

Як цього не робити

Процитую відповідне кліше:

Я не терпів тисячу разів невдачу, я лише відкрив тисячу способів не розбивати купу даних у форматі, зручному для запитів.

Перша спроба

Чому я навчився: немає дешевого способу відпарсити 25 Тб за раз.

Прослухавши в Університеті Вандербільта предмет «Розширені методи обробки великих даних», я був певен, що справа в капелюсі. Мабуть, годину-дві піде на налаштування Hive-сервера, щоб пробігти за всіма даними і прозвітувати про результат. Оскільки наші дані зберігаються в AWS S3, я скористався сервісом Афінащо дозволяє застосовувати Hive SQL-запити до S3-даних. Не треба налаштовувати/піднімати Hive-кластер та ще й платиш тільки за ті дані, які шукаєш.

Після того як я показав Athena свої дані та їх формат, я прогнав кілька тестів із подібними запитами:

select * from intensityData limit 10;

І швидко отримав добре структуровані результати. Готово.

Поки що ми не спробували використовувати дані в роботі…

Мене попросили витягнути всю інформацію щодо SNP, щоб протестувати на ній модель. Я запустив запит:


select * from intensityData 
where snp = 'rs123456';

…і почав чекати. Через вісім хвилин і більше 4 Тб даних я отримав результат. Athena бере плату за обсяг знайдених даних, по $5 за терабайт. Так що цей єдиний запит коштував $20 і вісім хвилин очікування. Щоб прогнати модель за всіма даними, потрібно було прочекати 38 років та заплатити $50 млн. Очевидно, що нам це не пасувало.

Потрібно було використати Parquet…

Чому я навчився: будьте обережні з розміром ваших Parquet-файлів та їх організацією.

Спочатку я спробував виправити ситуацію, конвертувавши всі TSV в Parquet-файли. Вони зручні для роботи з великими наборами даних, тому що інформація в них зберігається в колонковому вигляді: кожна колонка лежить у власному сегменті пам'яті/диска, на відміну від текстових файлів, в яких рядки містять елементи кожної колонки. І якщо потрібно щось знайти, достатньо прочитати необхідну колонку. Крім того, у кожному файлі в колонці зберігається діапазон значень, тому якщо шукане значення відсутня в діапазоні колонки, Spark не витрачатиме час на сканування всього файлу.

Я запустив просте завдання Клей AWS перетворення наших TSV в Parquet і закинув нові файли в Athena. Це зайняло близько 5 години. Але коли я запустив запит, на його виконання пішло приблизно стільки ж часу і трохи менше грошей. Справа в тому, що Spark, намагаючись оптимізувати завдання, просто розпакував один TSV-чанк і поклав його у власний Parquet-чанк. І оскільки кожен чанк був досить великий і вміщав повні записи багатьох людей, то в кожному файлі зберігалися всі SNP, тому Spark'у доводилося відкривати всі файли, щоб отримати потрібну інформацію.

Цікаво, що тип компресії, що використовується за замовчуванням (і рекомендований) в Parquet - snappy, - не є роздільним (splitable). Тому кожен виконавець (executor) залипав завдання розпакування і завантажи повного датасета на 3,5 Гб.

Парсим 25TB за допомогою AWK та R

Розбираємось у проблемі

Чому я навчився: сортувати складно, особливо якщо дані розподілені.

Мені здавалося, що тепер я зрозумів суть проблеми. Мені потрібно було лише відсортувати дані по колонці SNP, а не людям. Тоді в окремому чанці даних зберігатиметься кілька SNP, і тоді себе у всій красі проявить «розумна» функція Parquet «відкривати тільки якщо значення знаходиться в діапазоні». На жаль, відсортувати мільярди рядків, розкиданих за кластером, виявилося складним завданням.

AWS точно не хоче повертати гроші через «Я розсіяний студент». Після того, як я запустив сортування на Amazon Glue, воно пропрацювало 2 дні і завершилося збоєм.

Що щодо партиціонування?

Чому я навчився: партиції Spark повинні бути збалансовані.

Потім мені спало на думку ідея партиціонувати дані в хромосомах. Їх 23 штуки (і ще кілька, якщо враховувати мітохондріальну ДНК та нерозшифровані (unmapped) області).
Це дозволить розділити дані на дрібніші порції. Якщо додати в Spark-функцію експорту в скрипті Glue лише один рядок partition_by = "chr", то дані мають бути розкладені за бакетами (buckets).

Парсим 25TB за допомогою AWK та R
Геном складається із численних фрагментів, які називаються хромосомами.

На жаль, це не спрацювало. У хромосом різні розміри, а отже, і різна кількість інформації. Це означає, що завдання, які Spark відправляв воркерам, не були збалансовані та виконувались повільно, тому що деякі вузли закінчували раніше та простоювали. Проте завдання було виконано. Але при запиті одного SNP незбалансованість знову спричинила проблеми. Вартість обробки SNP у більших хромосомах (тобто там, звідки ми і хочемо отримати дані) зменшилася лише приблизно в 10 разів. Багато, але мало.

А якщо поділити на ще дрібніші партиції?

Чому я навчився: взагалі ніколи не намагайтеся робити 2,5 мільйона партицій

Я вирішив піти на повній та партиціонував кожний SNP. Це гарантувало однаковий розмір партицій. ДРУГА БУЛА ІДЕЯ. Я скористався Glue і додав невинний рядок partition_by = 'snp'. Завдання запустилося і почало виконуватися. Через день я перевірив, і побачив, що в S3 досі нічого не записано, тому вбив завдання. Схоже, Glue записував проміжні файли у приховане місце у S3, і багато файлів, можливо, кілька мільйонів. В результаті моя помилка обійшлася більш ніж у тисячу доларів і не втішила мого наставника.

Партиціонування + сортування

Чому я навчився: сортувати все ще важко, як і налаштовувати Spark

Остання спроба партиціонування полягала в тому, що я партиціонував хромосоми, а потім сортував кожну партицію. Теоретично, це дозволяло б прискорити кожен запит, тому що бажані дані про SNP повинні були знаходитися в межах декількох Parquet-чанків всередині заданого діапазону. На жаль, сортування навіть партиціонованих даних виявилося важким завданням. В результаті я перейшов на EMR для кастомного кластера і використав вісім потужних інстансів (C5.4xl) та Sparklyr для створення більш гнучкого робочого процесу.

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

…проте завдання все одно так і не було виконано. Я налаштовував по-різному: збільшував виділення пам'яті кожного виконавця запитів, використовував вузли з великим обсягом пам'яті, застосовував широкомовні змінні (broadcasting variable), але щоразу це виявлялися напівзаходи, і поступово виконавці починали збоїти, доки усе зупинилося.

Стаю винахідливішим

Чому я навчився: Іноді особливі дані вимагають особливих рішень.

Кожен SNP має значення позиції. Це число, що відповідає кількості основ, що лежать уздовж його хромосоми. Це хороший та природний спосіб організації наших даних. Спочатку я хотів партиціонувати областями кожної хромосоми. Наприклад, позиції 1 - 2000, 2001 - 4000 і т.д. Але проблема в тому, що SNP розподілені по хромосомах нерівномірно, тому розмір груп буде сильно відрізнятися.

Парсим 25TB за допомогою AWK та R

В результаті я прийшов до розбиття за категоріями (rank) позицій. За вже завантаженими даними, я прогнав запит на отримання списку унікальних SNP, їх позицій та хромосом. Потім відсортував дані всередині кожної хромосоми і зібрав SNP групи (bin) заданого розміру. Скажімо, 1000 SNP. Це дало мені зв'язок SNP з групою-в-хромосомі.

Зрештою я зробив групи (bin) по 75 SNP, причину поясню нижче.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Перша спроба зі Spark

Чому я навчився: об'єднання в Spark працює швидко, але партиціонування все ще коштує дорого.

Я хотів прочитати цей маленький (2,5 млн рядків) кадр даних в Spark, об'єднати його з сирими даними, а потім партиціонувати по свіжододаній колонці bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Я використовував sdf_broadcast(), таким чином Spark дізнається, що він повинен відправити кадр даних у всі вузли. Це корисно, якщо дані невеликого розміру і потрібні всім завданням. Інакше Spark намагається бути розумним і розподіляє дані при необхідності, що може спричинити гальма.

І знову мій задум не спрацював: завдання якийсь час працювали, завершували об'єднання, а згодом, як і запущені партиціонуванням виконавці, почали збоїти.

Додаю AWK

Чому я навчився: не спіть, коли вам викладають основи Напевно, хтось уже вирішив вашу проблему ще в 1980-х.

До цього моменту причиною всіх моїх невдач зі Spark була перемішаність даних у кластері. Можливо, ситуацію можна покращити за допомогою попередньої обробки. Я вирішив спробувати розділити сирі текстові дані на колонки хромосом, тому я сподівався надати Spark'у «попередньо партиціоновані» дані.

Я пошукав на StackOverflow, як розбивати за значеннями стовпчиків, і знайшов така чудова відповідь. За допомогою AWK ви можете розділити текстовий файл за значеннями колонок, виконавши запис у скрипті, а не надсилаючи результати в stdout.

Для проби я написав Bash-скрипт. Завантажив один із запакованих TSV, потім розпакував його за допомогою gzip і відправив до awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Це спрацювало!

Заповнення ядер

Чому я навчився: gnu parallel - це чарівна річ, всі повинні її використати.

Поділ проходив досить повільно, і коли я запустив htop, щоб перевірити використання потужного (і дорогого) EC2-інстансу, виявилося, що я задіяю тільки одне ядро ​​і приблизно 200 Мб пам'яті. Щоб вирішити завдання і не втратити купу грошей, треба було придумати, як розпаралелити роботу. На щастя, у зовсім приголомшливій книзі Data Science в командному рядку Джерона Джанссенса я знайшов розділ, присвячений розпаралелювання. З неї я дізнався про gnu parallel, дуже гнучкий метод реалізації багатопоточності в Unix

Парсим 25TB за допомогою AWK та R
Коли я запустив поділ за допомогою нового процесу, все було чудово, але залишалося вузьке місце - завантаження S3-об'єктів на диск було не дуже швидким і не повністю розпаралеленим. Щоб це виправити, я зробив ось що:

  1. З'ясував, що можна прямо в конвеєрі реалізувати етап S3-скачування, виключивши повністю проміжне зберігання на диску. Це означає, що я можу уникнути запису сирих даних на диск і використовувати ще більш маленьке, а отже, і дешевше сховище на AWS.
  2. Командою aws configure set default.s3.max_concurrent_requests 50 сильно збільшила кількість потоків, які використовує AWS CLI (за замовчуванням їх 10).
  3. Перейшов на оптимізований за швидкістю мережі інстанс EC2, з літерою n у назві. Я виявив, що втрата обчислювальної потужності при використанні n-інстансів з надлишком компенсується збільшенням швидкості завантаження. Для більшості завдань я використав c5n.4xl.
  4. поміняв gzip на pigz, це gzip-інструмент, який вміє робити класні штуки для розпаралелювання спочатку не розпаралеленої задачі розпакування файлів (це допомогло найменше).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Ці кроки скомбіновані один з одним, щоби все працювало дуже швидко. Завдяки збільшенню швидкості скачування та відмові від запису на диск я тепер міг обробити 5-терабайтний пакет лише за кілька годин.

У цьому твіті мало згадуватися 'TSV'. На жаль.

Використання заново відпарсенних даних

Чому я навчився: Spark любить стиснуті дані і не любить комбінувати партиції

Тепер дані лежали в S3 у незапакованому (читай, що розділяється) та напівупорядкованому форматі, і я міг знову повернутися до Spark'у. Мене чекав сюрприз: мені знову не вдалося досягти бажаного! Було дуже складно точно сказати Spark'у, як партизовані дані. І навіть коли я це зробив, виявилося, що партицій надто багато (95 тис.) і коли я за допомогою coalesce зменшив їхню кількість до розумних меж, це порушило моє партиціонування. Впевнений, це можна виправити, але за кілька днів пошуків мені не вдалося знайти рішення. Зрештою я доробив усі завдання в Spark, хоча це зайняло якийсь час, а мої розділені Parquet-файли були не дуже маленькими (~200 Кб). Однак, дані лежали там, де потрібно.

Парсим 25TB за допомогою AWK та R
Занадто маленькі та неоднакові, чудово!

Тестування локальних Spark-запитів

Чому я навчився: у Spark занадто багато накладних витрат при вирішенні простих завдань.

Завантаживши дані у продуманому форматі, я зміг протестувати швидкість. Налаштував скрипт на R для запуску локального Spark-сервера, а потім завантажив кадр даних Spark із зазначеного сховища Parquet-груп (bin). Я намагався завантажити всі дані, але не зміг змусити Sparklyr розпізнати партіціонування.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Виконання зайняло 29,415 секунд. Набагато краще, але не дуже добре для масового тестування чогось. Крім того, я не міг прискорити роботу за допомогою кешування, тому що коли я намагався кешувати в пам'яті кадр даних, Spark завжди падав, навіть коли я виділив більше 50 Гб пам'яті для датасету, який важив менше 15.

Повернення до AWK

Чому я навчився: асоціативні масиви в AWK дуже ефективні

Я розумів, що можу досягти вищої швидкості. Мені згадалося, що у чудовому посібнику з AWK Брюса Барнетта я читав про кльову фічу, яка називається «асоціативні масиви». По суті це пари ключ-значення, які чомусь в AWK назвали інакше, і тому я якось про них і не згадував особливо. Роман Чепляка нагадав, що термін «асоціативні масиви» набагато старший за термін «пара ключ-значення». навіть якщо ви шукаєте ключ-значення в Google Ngram, цей термін ви там не побачите, зате знайдете асоціативні масиви! До того ж пара ключ-значення найчастіше асоціюється базами даних, тому набагато логічніше порівнювати з hashmap. Я зрозумів, що можу використовувати ці асоціативні масиви для зв'язку моїх SNP із таблицею груп (bin table) та сирими даними без застосування Spark.

Для цього в AWK-скрипті я використав блок BEGIN. Це фрагмент коду, який виконується перед тим, як перший рядок даних буде передано в основне тіло скрипта.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Команда while(getline...) завантажила всі рядки з CSV групи (bin), задала першу колонку (назву SNP) як ключ для асоціативного масиву bin і друге значення (група) як значення. Потім у блоці { }, який виконується стосовно всіх рядків основного файлу, кожен рядок відправляється у вихідний файл, який отримує унікальне ім'я в залежності від його групи (bin): ..._bin_"bin[$1]"_....

Змінні batch_num и chunk_id відповідали даним, наданим конвеєром, що дозволило уникнути стану гонки, і кожен потік виконання, запущений parallelписав у свій унікальний файл.

Оскільки всі сирі дані я розкидав по папках хромосом, що залишилися після мого попереднього експерименту з AWK, тепер я міг написати інший Bash-скрипт, щоб обробляти хромосому за раз і віддавати в S3 глибше партиціоновані дані.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

У скрипті два розділи parallel.

У першому розділі зчитуються дані з усіх файлів, що містять інформацію про потрібну хромосому, потім ці дані розподіляються по потоках, які розкидають файли за відповідними групами (bin). Щоб не виникало стану гонки, коли кілька потоків записують в один файл, AWK передає імена файлів для запису даних у різні місця, наприклад, chr_10_bin_52_batch_2_aa.csv. В результаті на диску створюється безліч маленьких файлів (для цього я використав терабайтні EBS-томи).

Конвеєр з другого розділу parallel проходить групами (bin) і об'єднує їх окремі файли у загальні CSV c catа потім відправляє їх на експорт.

Транслювання R?

Чому я навчився: можна звертатися до stdin и stdout з R-скрипту, а значить і використовувати його в конвеєрі.

У Bash-скрипті ви могли помітити такий рядок: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Вона транслює всі конкатеновані файли групи (bin) в наведений нижче R-скрипт. {} є спеціальною методикою parallel, яка будь-які дані, що відправляються нею в зазначений потік дані вставляє в прямо в саму команду. Опція {#} надає унікальний ID потоку виконання, а {%} є номер слота завдання (повторюється, але ніколи одночасно). Список усіх опцій можна знайти у документації.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Коли змінна file("stdin") передається в readr::read_csv, дані, трансльовані в R-скрипт, завантажуються у кадр, який потім у вигляді .rds-файла за допомогою aws.s3 записується прямо у S3.

RDS - це щось на зразок молодшої версії Parquet, без вишукувань колонкового сховища.

Після завершення Bash-скрипту я отримав пачку .rds-файлів, що лежать у S3, що дозволило мені використовувати ефективне стиснення та вбудовані типи.

Незважаючи на використання гальмівного R все працювало дуже швидко. Не дивно, що фрагменти R, відповідальні за читання і запис даних, добре оптимізовані. Після тестування на одній хромосомі середнього розміру завдання виконалося на інстансі C5n.4xl приблизно за дві години.

Обмеження S3

Чому я навчився: завдяки розумній реалізації шляхів S3 може обробляти багато файлів.

Я турбувався, чи зможе S3 обробити безліч переданих їй файлів. Я міг зробити імена файлів осмисленими, але як S3 буде шукати за ними?

Парсим 25TB за допомогою AWK та R
Папки в S3 лише для краси, насправді систему не цікавить символ /. З FAQ сторінок S3.

Схоже, S3 представляє шлях до конкретного файлу у вигляді простого ключа у своєрідній хеш-таблиці чи базі даних на основі документів. Бакет (bucket) вважатимуться таблицею, а файли — записами у цій таблиці.

Оскільки швидкість та ефективність важливі для отримання прибутку в Amazon, не дивно, що ця система «ключ-у-якості-шляху-до-файлу» офігенно оптимізована. Я намагався знайти баланс: щоб не потрібно було робити безліч get-запитів, але при цьому запити виконувалися швидко. Виявилося, що найкраще робити близько 20 тис. bin-файлів. Думаю, якщо продовжити оптимізувати, то можна досягти підвищення швидкості (наприклад, робити спеціальний бакет лише для даних, таким чином зменшуючи розмір пошукової таблиці). Але на подальші експерименти вже не було часу та грошей.

Що щодо перехресної сумісності?

Чому я навчився: головною причиною втрати часу є передчасна оптимізація вашого методу зберігання.

У цей момент дуже важливо запитати себе: "Навіщо використовувати пропрієтарний формат файлів?" Причина у швидкості завантаження (запаковані gzip CSV-файли вантажилися в 7 разів довше) та сумісності з нашими робочими процесами. Я можу переглянути своє рішення якщо R зможе легко завантажувати файли Parquet (або Arrow) без навантаження у вигляді Spark. У нас у лабораторії всі використовують R, і якщо мені знадобиться перетворити дані на інший формат, то у мене все ще залишаються вихідні текстові дані, тому я можу просто знову запустити конвеєр.

Поділ роботи

Чому я навчився: не намагайтеся оптимізувати завдання вручну, нехай це робить комп'ютер.

Я налагодив робочий процес на одній хромосомі, тепер потрібно обробити всі інші дані.
Хотілося підняти кілька інстансів EC2 для перетворення, але в той же час я побоювався отримати вкрай незбалансоване навантаження в різних завданнях на обробку (так само як Spark страждав від незбалансованих партій). Крім того, мені не посміхалося піднімати по одному інстансу на кожну хромосому, тому що для AWS-акаунтів є стандартне обмеження в 10 інстансів.

Тоді я вирішив написати на R-скрипт для оптимізації завдань на обробку.

Спочатку попросив S3 обчислити, скільки місця у сховищі займає кожна хромосома.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Потім я написав функцію, яка бере загальний розмір, перемішує порядок хромосом, поділяє їх на групи num_jobs та повідомляє, наскільки різняться розміри всіх завдань на обробку.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Потім я прогнав за допомогою purrr тисячу перемішування і вибрав найкраще.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Так я отримав набір завдань дуже схожих за розміром. Потім залишалося тільки обернути мій попередній Bash-скрипт у великий цикл for. На написання цієї оптимізації пішло близько десяти хвилин. І це набагато менше, ніж я витратив би на ручне створення завдань у разі їх незбалансованості. Тому вважаю, що з цією попередньою оптимізацією я не помилився.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Наприкінці додаю команду вимкнення:

sudo shutdown -h now

… і все вийшло! За допомогою AWS CLI я піднімав інстанси та через опцію user_data передавав їм Bash-скрипти їхніх завдань на обробку. Вони виконувались і автоматично вимикалися, тому я не платив за зайву обчислювальну потужність.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Упаковуємо!

Чому я навчився: API має бути простим заради простоти та гнучкості використання.

Нарешті я отримав дані у потрібному місці та вигляді. Залишалося максимально спростити процес використання даних, щоб моїм колегам було легше. Я хотів зробити прості API для створення запитів. Якщо в майбутньому я вирішу перейти з .rds на Parquet-файли, то це має бути проблемою для мене, а не для колег. Для цього я вирішив зробити внутрішній R-пакет.

Зібрав та задокументував дуже простий пакет, що містить лише кілька функцій для доступу до даних, зібраних навколо функції get_snp. Також я зробив для колег сайт pkgdown, щоб вони могли легко подивитися приклади та документацію.

Парсим 25TB за допомогою AWK та R

Інтелектуальне кешування

Чому я навчився: якщо ваші дані добре підготовлені, кешувати буде просто!

Оскільки один з основних робочих процесів застосовував до пакету SNP одну й ту саму модель аналізу, я вирішив використати групування (binning) у своїх інтересах. При передачі даних по SNP, до об'єкта, що повертається, прикріплюється і вся інформація з групи (bin). Тобто старі запити можуть (теоретично) прискорювати обробку нових запитів.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

При складанні пакета я прогнав багато бенчмарків, щоб порівняти швидкість використання різних методів. Рекомендую цим не нехтувати, бо іноді результати бувають несподіваними. Наприклад, dplyr::filter виявився набагато швидше за захоплення рядків за допомогою фільтрації на основі індексування, а отримання однієї колонки з відфільтрованого кадру даних працювало набагато швидше, ніж застосування синтаксису індексування.

Зверніть увагу, що об'єкт prev_snp_results містить ключ snps_in_bin. Це масив усіх унікальних SNP у групі (bin), що дозволяє швидко перевіряти, чи є дані з попереднього запиту. Також він спрощує циклічний прохід по всіх SNP у групі (bin) за допомогою цього коду:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Результати

Тепер ми можемо (і почали всерйоз) проганяти моделі та сценарії, які раніше нам недоступні. Найкраще, що моїм колегам по лабораторії не доводиться думати про всі складнощі. У них просто є функція, що працює.

І хоча пакет позбавляє їх від подробиць, я намагався зробити формат даних досить простим, щоб у ньому могли розібратися, якщо завтра я раптом зникну.

Швидкість помітно зросла. Зазвичай ми скануємо функціонально значимі фрагменти геному. Раніше ми не могли це робити (виходило занадто дорого), але тепер, завдяки груповій (bin) структурі та кешування, на запит одного SNP йде в середньому менше 0,1 секунди, а використання даних таке низьке, що витрати на виходять S3 копійчані.

Висновок

Ця стаття зовсім не керівництво. Рішення вийшло індивідуальне і майже напевно не оптимальне. Скоріше, це розповідь про подорож. Хочу, щоб інші зрозуміли, що подібні рішення не виникають у голові повністю сформованими, це результат спроб та помилок. Крім того, якщо ви шукаєте спеціаліста з аналізу даних, то враховуйте, що для ефективного використання цих інструментів потрібний досвід, а досвід потребує грошей. Я щасливий, що в мене були кошти на оплату, але у багатьох інших, хто може зробити ту ж роботу краще за мене, ніколи не буде такої можливості через відсутність грошей навіть на спробу.

Інструменти для великих даних є універсальними. Якщо у вас є час, то майже напевно зможете написати швидше рішення, застосувавши «розумну» очищення даних, сховище і методики вилучення. Зрештою, все зводиться до аналізу витрат і вигод.

Чому я навчився:

  • не існує дешевого способу відпарсувати 25 Тб за один раз;
  • будьте обережні з розміром ваших Parquet-файлів та їх організацією;
  • партиції в Spark мають бути збалансовані;
  • взагалі ніколи не намагайтеся робити 2,5 мільйонів партицій;
  • сортувати все ще важко, як і налаштовувати Spark;
  • іноді спеціальні дані вимагають спеціальних рішень;
  • об'єднання в Spark працює швидко, але партиціонування все ще коштує дорого;
  • не спіть, коли вам викладають основи, напевно, хтось уже вирішив вашу проблему ще в 1980-х;
  • gnu parallel - Це чарівна річ, всі повинні її використовувати;
  • Spark любить стиснуті дані і не любить комбінувати партиції;
  • у Spark занадто багато накладних витрат при вирішенні простих завдань;
  • асоціативні масиви в AWK дуже ефективні;
  • можна звертатися до stdin и stdout з R-скрипту, а значить і використовувати його в конвеєрі;
  • завдяки розумній реалізації шляхів S3 може обробляти багато файлів;
  • головна причина втрати часу – передчасна оптимізація вашого методу зберігання;
  • не намагайтеся оптимізувати завдання вручну, нехай це робить комп'ютер;
  • API має бути простим задля простоти та гнучкості використання;
  • Якщо ваші дані добре підготовлені, кешувати буде просто!

Джерело: habr.com

Додати коментар або відгук