Парсім 25TB з дапамогай AWK і R

Парсім 25TB з дапамогай AWK і R
Як чытаць гэты артыкул: прашу прабачэння за тое, што тэкст атрымаўся такім доўгім і хаатычным. Каб зэканоміць ваш час, я кожны раздзел пачынаю са ўступа «Чаму я навучыўся», у якім адным-двума прапановамі выкладаю істу раздзела.

«Проста пакажы рашэнне!» Калі вы хочаце ўсяго толькі ўбачыць, да чаго я прыйшоў, то пераходзіце да раздзела «Станаўлюся вынаходлівей», але я лічу, што цікавей і карысней пачытаць пра няўдачы.

Нядаўна мне даручылі наладзіць працэс апрацоўкі вялікага аб'ёму зыходных паслядоўнасцяў ДНК (тэхнічна гэта SNP-чып). Трэба было хутка атрымліваць дадзеныя аб зададзеным генетычным месцазнаходжанні (якое называецца SNP) для наступнага мадэлявання і іншых задач. З дапамогай R і AWK мне ўдалося ачысціць і арганізаваць дадзеныя натуральным чынам, моцна паскорыўшы апрацоўку запытаў. Далося мне гэта нялёгка і запатрабавала шматлікіх ітэрацый. Гэты артыкул дапаможа вам пазбегнуць некаторых маіх памылак і прадэманструе, што ж у мяне ў рэшце рэшт атрымалася.

Для пачатку некаторыя ўступныя тлумачэнні.

Дадзеныя

Наш універсітэцкі цэнтр апрацоўкі генетычнай інфармацыі падаў нам дадзеныя ў выглядзе TSV аб'ёмам 25 Тб. Мне яны дасталіся пабітымі на 5 пакетаў, сціснутых Gzip, кожны з якіх утрымоўваў каля 240 чатырохгігабайтных файлаў. Кожны шэраг утрымліваў дадзеныя для аднаго SNP аднаго чалавека. Усяго былі перададзены дадзеныя па ~2,5 млн SNP і ~60 тыс. чалавек. Акрамя SNP-інфармацыі ў файлах былі шматлікія калонкі з лікамі, якія адлюстроўваюць розныя характарыстыкі, такія як інтэнсіўнасць чытання, частата розных алеляў і г.д. Усяго было каля 30 калонак з унікальнымі значэннямі.

Мэта

Як і ў любым праекце па кіраванні дадзенымі, найважнейшым было вызначыць, як будуць выкарыстоўвацца дадзеныя. У гэтым выпадку мы па большай частцы будзем падбіраць мадэлі і працоўныя працэсы для SNP на аснове SNP. Гэта значыць адначасова нам будуць патрэбны дадзеныя толькі па адным SNP. Я павінен быў навучыцца як мага прасцей, хутчэй і танней здабываць усе запісы, якія адносяцца да аднаго з 2,5 SNP.

Як гэтага не рабіць

Працытую падыходнае клішэ:

Я не цярпеў тысячу разоў няўдачу, я толькі адкрыў тысячу спосабаў не парсіць кучу дадзеных у фармаце, зручным для запытаў.

Першая спроба

Чаму я навучыўся: не існуе таннага спосабу адпарсіць 25 Тб за раз.

Праслухаўшы ва Ўніверсітэце Вандэрбільта прадмет "Пашыраныя метады апрацоўкі вялікіх дадзеных", я быў упэўнены, што справа ў капелюшы. Мабыць, гадзіну-дзве сыдзе на наладу Hive-сервера, каб прабегчы па ўсіх дадзеных і даць справаздачу аб выніку. Паколькі нашыя дадзеныя захоўваюцца ў AWS S3, я скарыстаўся сэрвісам Афіна, які дазваляе прымяняць Hive SQL-запыты да S3-дадзеным. Не трэба наладжваць/паднімаць Hive-кластар, ды яшчэ і плаціш толькі за тыя дадзеныя, якія шукаеш.

Пасля таго як я паказаў Athena свае дадзеныя і іх фармат, я прагнаў некалькі тэстаў з падобнымі запытамі:

select * from intensityData limit 10;

І хутка атрымаў добра структураваныя вынікі. Гатова.

Пакуль мы не паспрабавалі выкарыстоўваць дадзеныя ў працы…

Мяне папрасілі выцягнуць усю інфармацыю па SNP, каб пратэставаць на ёй мадэль. Я запусціў запыт:


select * from intensityData 
where snp = 'rs123456';

…і стаў чакаць. Праз восем хвілін і больш за 4 Тб запытаных дадзеных я атрымаў вынік. Athena бярэ плату за аб'ём знойдзеных дадзеных, па $5 за тэрабайт. Так што гэты адзіны запыт каштаваў $20 і восем хвілін чакання. Каб прагнаць мадэль па ўсіх дадзеных, трэба было прачакаць 38 гадоў і заплаціць $50 млн. Відавочна, што нам гэта не пасавала.

Трэба было выкарыстоўваць Parquet…

Чаму я навучыўся: будзьце асцярожныя з памерам вашых Parquet-файлаў і іх арганізацыяй.

Спачатку я паспрабаваў выправіць сітуацыю, канвертаваўшы ўсе TSV у Parquet-файлы. Яны зручныя для працы з вялікімі наборамі дадзеных, таму што інфармацыя ў іх захоўваецца ў калоначным выглядзе: кожная калонка ляжыць ва ўласным сегменце памяці/дыска, у адрозненне ад тэкставых файлаў, у якіх радкі ўтрымоўваюць элементы кожнай калонкі. І калі трэба нешта знайсці, то дастаткова прачытаць неабходную калонку. Акрамя таго, у кожным файле ў калонцы захоўваецца дыяпазон значэнняў, так што калі шуканае значэнне адсутнічае ў дыяпазоне калонкі, Spark не будзе марнаваць час на сканіраванне ўсяго файла.

Я запусціў простую задачу Клей AWS для пераўтварэння нашых TSV у Parquet і закінуў новыя файлы ў Athena. Гэта заняло каля 5 гадзін. Але калі я запусціў запыт, то на яго выкананне пайшло прыкладна столькі ж часу і крыху менш грошай. Справа ў тым, што Spark, спрабуючы аптымізаваць задачу, проста распакаваў адзін TSV-чанк і паклаў яго ва ўласны Parquet-чанк. І паколькі кожны чанк быў досыць вялікі і мясціў поўныя запісы шматлікіх людзей, то ў кожным файле захоўваліся ўсе SNP, таму Spark'у прыходзілася адчыняць усе файлы, каб атрымаць патрэбную інфармацыю.

Цікаўна, што выкарыстоўваны па змаўчанні (і рэкамендуемы) тып кампрэсіі ў Parquet - snappy, - не з'яўляецца падзяляным (splitable). Таму кожны выканавец (executor) заліпаў на задачы распакавання і загрузі поўнага датасета на 3,5 Гб.

Парсім 25TB з дапамогай AWK і R

Разбіраемся ў праблеме

Чаму я навучыўся: сартаваць складана, асабліва, калі дадзеныя размеркаваны.

Мне здавалася, што зараз я зразумеў сутнасць праблемы. Мне трэба было толькі адсартаваць дадзеныя па калонцы SNP, а не па людзях. Тады ў асобным чанцы дадзеных будзе захоўвацца некалькі SNP, і тады сябе ва ўсёй красе выявіць "разумная" функцыя Parquet "адчыняць толькі калі значэнне знаходзіцца ў дыяпазоне". На жаль, адсартаваць мільярды радкоў, раскіданых па кластары, аказалася складанай задачай.

AWS сапраўды не жадае вяртаць грошы па чынніку "Я безуважлівы студэнт". Пасля таго, як я запусціў сартаванне на Amazon Glue, яна прапрацавала 2 дні і завяршылася збоем.

Што наконт партыцыянавання?

Чаму я навучыўся: партыцыі ў Spark павінны быць збалансаваны.

Затым мне прыйшла ў галаву ідэя партыцыянаваць дадзеныя ў храмасомах. Іх 23 штукі (і яшчэ некалькі, калі ўлічваць мітахандрыяльнай ДНК і нерасшыфраваныя (unmapped) вобласці).
Гэта дазволіць падзяліць дадзеныя на драбнейшыя порцыі. Калі дадаць у Spark-функцыю экспарту ў скрыпце Glue усяго толькі адзін радок partition_by = "chr", то дадзеныя павінны быць раскладзены па бакетах (buckets).

Парсім 25TB з дапамогай AWK і R
Геном складаецца з шматлікіх фрагментаў, якія называюцца храмасомамі.

Нажаль, гэта не спрацавала. У храмасом розныя памеры, а значыць і розная колькасць інфармацыі. Гэта значыць, задачы, якія Spark адпраўляў воркерам, не былі збалансаваны і выконваліся павольна, таму што некаторыя вузлы сканчалі раней і прастойвалі. Аднак задачы былі выкананы. Але пры запыце аднаго SNP незбалансаванасць зноў стала прычынай праблем. Кошт апрацоўкі SNP у буйнейшых храмасомах (гэта значыць тамака, адкуль мы і жадаем атрымаць дадзеныя) паменшылася ўсяго толькі прыкладна ў 10 раз. Шмат, але не дастаткова.

А калі падзяліць на яшчэ драбнейшыя партіціі?

Чаму я навучыўся: увогуле ніколі не спрабуйце рабіць 2,5 мільёна партыцый.

Я вырашыў гуляць па поўнай і партыцыянаваў кожны SNP. Гэта гарантавала аднолькавы памер партыцый. Дрэнная была ідэя. Я скарыстаўся Glue і дадаў нявінны радок partition_by = 'snp'. Задача запусцілася і пачала выконвацца. Праз дзень я праверыў, і ўбачыў, што ў S3 да гэтага часу нічога не запісана, таму забіў задачу. Падобна, Glue запісваў прамежкавыя файлы ва ўтоенае месца ў S3, і шмат файлаў, магчыма, пару мільёнаў. У выніку мая памылка абышлася больш за тысячу даляраў і не ўзрадавала майго настаўніка.

Партыцыянаванне + сартаванне

Чаму я навучыўся: сартаваць усё яшчэ цяжка, як і наладжваць Spark.

Апошняя спроба партыцыянавання складалася ў тым, што я партыцыянаваў храмасомы, а затым сартаваў кожную партыцыю. У тэорыі, гэта дазваляла б паскорыць кожны запыт, таму што жаданыя дадзеныя аб SNP павінны былі знаходзіцца ў межах некалькіх Parquet-чанкаў ўнутры зададзенага дыяпазону. Нажаль, сартаванне нават партыцыянаваных дадзеных апынулася цяжкай задачай. У выніку я перайшоў на EMR для кастамнага кластара і выкарыстоўваў восем магутных інстансаў (C5.4xl) і Sparklyr для стварэння больш гнуткага працоўнага працэсу…

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

…аднак задача ўсё роўна так і не была выканана. Я наладжваў па-ўсякаму: павялічваў вылучэнне памяці для кожнага выканаўцы запытаў, выкарыстоўваў вузлы з вялікім аб'ёмам памяці, ужываў шырокавяшчальныя зменныя (broadcasting variable), але кожны раз гэта апыняліся паўмеры, і паступова выканаўцы пачыналі збаіць, пакуль усё не спынілася.

Станаўлюся вынаходлівей

Чаму я навучыўся: часам асаблівыя дадзеныя патрабуюць асаблівых рашэнняў.

У кожнага SNP ёсць значэнне пазіцыі. Гэты лік, які адпавядае колькасці падстаў, якія ляжаць уздоўж яго храмасомы. Гэта добры і натуральны спосаб арганізацыі нашых даных. Спачатку я хацеў партыцыянаваць па абласцях кожнай храмасомы. Напрыклад, пазіцыі 1 - 2000, 2001 - 4000 і г.д. Але праблема ў тым, што SNP размеркаваны па храмасомах нераўнамерна, таму таму памер груп будзе моцна адрознівацца.

Парсім 25TB з дапамогай AWK і R

У выніку я прыйшоў да разбіцця па катэгорыях (rank) пазіцый. Па ўжо загружаных дадзеных я прагнаў запыт на атрыманне спісу ўнікальных SNP, іх пазіцый і храмасом. Затым адсартаваў дадзеныя ўнутры кожнай храмасомы і сабраў SNP ў групы (bin) зададзенага памеру. Скажам, па 1000 SNP. Гэта дало мне ўзаемасувязь SNP з групай-у-храмасоме.

У рэшце рэшт я зрабіў групы (bin) па 75 SNP, прычыну растлумачу ніжэй.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Першая спроба са Spark

Чаму я навучыўся: аб'яднанне ў Spark працуе хутка, але партыцыянаванне ўсё яшчэ абыходзіцца дорага.

Я хацеў прачытаць гэты маленькі (2,5 млн радкоў) фрэйм ​​дадзеных у Spark, аб'яднаць яго з волкімі дадзенымі, а затым партыцыянаваць па свежадабаўленай калонцы bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Я выкарыстоўваў sdf_broadcast(), такім чынам Spark даведаецца, што ён павінен адправіць фрэйм ​​дадзеных ва ўсе вузлы. Гэта карысна, калі дадзеныя невялікага памеру і патрабуюцца для ўсіх задач. Інакш Spark спрабуе быць разумным і размяркоўвае дадзеныя па меры патрэбы, што можа стаць прычынай тармазоў.

І зноў мая задумка не спрацавала: задачы нейкі час працавалі, завяршалі аб'яднанне, а затым, як і запушчаныя партыяцыяваннем выканаўцы, пачалі збаіць.

Дадаю AWK

Чаму я навучыўся: не спіце, калі вам выкладаюць асновы. Напэўна, хтосьці ўжо вырашыў вашу праблему яшчэ ў 1980-х.

Да гэтага моманту прычынай усіх маіх няўдач са Spark была перамяшанасць дадзеных у кластары. Магчыма, сітуацыю можна палепшыць з дапамогай папярэдняй апрацоўкі. Я вырашыў паспрабаваць падзяліць волкія тэкставыя дадзеныя на калонкі храмасом, так я спадзяваўся падаць Spark'у «папярэдне партыцыянаваныя» дадзеныя.

Я пашукаў на StackOverflow, як разбіваць па значэннях калонак, і знайшоў такі цудоўны адказ. З дапамогай AWK вы можаце падзяліць тэкставы файл па значэннях калонак, выканаўшы запіс у скрыпце, а не адпраўляючы вынікі ў stdout.

Для спробы я напісаў Bash-скрыпт. Запампаваў адзін з запакаваных TSV, затым распакаваў яго з дапамогай gzip і адправіў у awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Гэта спрацавала!

Запаўненне ядраў

Чаму я навучыўся: gnu parallel - Гэта чароўная рэч, усе павінны яе выкарыстоўваць.

Падзел праходзіў даволі павольна, і калі я запусціў htop, Каб праверыць выкарыстанне магутнага (і дарагога) EC2-інстанса, то аказалася, што я задзейнічаю толькі адно ядро ​​і прыкладна 200 Мб памяці. Каб вырашыць задачу і не страціць кучу грошай, трэба было прыдумаць, як распаралеліць працу. На шчасце, у зусім узрушаючай кнізе Data Science at the Command Line Джэрона Джансэнса я знайшоў раздзел, прысвечаны распаралельванню. З яе я даведаўся пра gnu parallel, Вельмі гнуткі метад рэалізацыі шматструменнасці ў Unix.

Парсім 25TB з дапамогай AWK і R
Калі я запусціў падзел з дапамогай новага працэсу, усё было выдатна, але заставалася вузкае месца – спампоўка S3-аб'ектаў на дыск было не занадта хуткім і не цалкам распаралелены. Каб гэта выправіць, я зрабіў вось што:

  1. Высветліў, што можна прама ў канвееры рэалізаваць этап S3-запампоўкі, цалкам выключыўшы прамежкавае захоўванне на дыску. Гэта азначае, што я магу пазбегнуць запісы волкіх дадзеных на дыск і выкарыстоўваць яшчэ больш маленькае, а значыць і таннейшае сховішча на AWS.
  2. Камандай aws configure set default.s3.max_concurrent_requests 50 моцна павялічыў колькасць патокаў, якія выкарыстоўвае AWS CLI (па змаўчанні іх 10).
  3. Перайшоў на аптымізаваны па хуткасці сеткі інстанс EC2, з літарай n у назве. Я выявіў, што страта вылічальнай магутнасці пры выкарыстанні n-інстансаў з лішкам кампенсуецца павелічэннем хуткасці загрузкі. Для большасці задач я выкарыстоўваў c5n.4xl.
  4. Памяняў gzip на pigz, гэта gzip-інструмент, які ўмее рабіць класныя штукі для распаралельвання першапачаткова не распаралеленай задачы распакавання файлаў (гэта дапамагло менш за ўсё).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Гэтыя крокі скамбінаваны сябар з сябрам, каб усё працавала вельмі хутка. Дзякуючы павелічэнню хуткасці запампоўкі і адмове ад запісу на дыск я зараз мог апрацаваць 5-тэрабайтны пакет усяго за некалькі гадзін.

У гэтым твіце павінна было згадвацца 'TSV'. Нажаль.

Выкарыстанне зноўку адпарсенных дадзеных

Чаму я навучыўся: Spark любіць несціснутыя дадзеныя і не любіць камбінаваць партыцыі.

Цяпер дадзеныя ляжалі ў S3 у незапакаваным (чытай, які падзяляецца) і полуупорядоченном фармаце, і я мог зноў вярнуцца да Spark'у. Мяне чакаў сюрпрыз: мне зноў не ўдалося дабіцца жаданага! Было вельмі складана дакладна сказаць Spark'у, як партыцыянаваны дадзеныя. І нават калі я гэта зрабіў, аказалася, што партыцый занадта шмат (95 тыс.), і калі я з дапамогай coalesce паменшыў іх колькасць да разумных межаў, гэта парушыла маё партыцыянаванне. Упэўнены, гэта можна выправіць, але за пару дзён пошукаў мне не ўдалося знайсці рашэнне. У выніку я дарабіў усе задачы ў Spark, хоць гэта заняло нейкі час, а мае падзеленыя Parquet-файлы былі не вельмі маленькімі (~200 Кб). Аднак дадзеныя ляжалі тамака, дзе трэба.

Парсім 25TB з дапамогай AWK і R
Занадта маленькія і неаднолькавыя, цудоўна!

Тэставанне лакальных Spark-запытаў

Чаму я навучыўся: у Spark занадта шмат накладных выдаткаў пры рашэнні простых задач.

Загрузіўшы дадзеныя ў прадуманым фармаце, я змог пратэставаць хуткасць. Наладзіў скрыпт на R для запуску лакальнага Spark-сервера, а потым загрузіў фрэйм ​​дадзеных Spark з паказанага сховішча Parquet-груп (bin). Я спрабаваў загрузіць усе дадзеныя, але не змог прымусіць Sparklyr распазнаць партыцыянаванне.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Выкананне заняло 29,415 секунды. Значна лепш, але не занадта добра для масавага тэсціравання чаго-небудзь. Акрамя таго, я не мог паскорыць працу з дапамогай кэшавання, таму што калі я спрабаваў кэшаваць у памяці фрэйм ​​дадзеных, Spark заўсёды падаў, нават калі я вылучыў больш за 50 Гб памяці для датасета, які важыў менш за 15.

Вяртанне да AWK

Чаму я навучыўся: асацыятыўныя масівы ў AWK вельмі эфектыўныя.

Я разумеў, што магу дабіцца больш высокай хуткасці. Мне ўспомнілася, што ў цудоўным кіраўніцтве па AWK Бруса Барнетта я чытаў пра клёвую фічу, якая называецца «асацыятыўныя масівы». Па сутнасці, гэта пары ключ-значэнне, якія чамусьці ў AWK назвалі інакш, і таму я неяк пра іх і не ўспамінаў асабліва. Раман Чапляка нагадаў, што тэрмін «асацыятыўныя масівы» значна старэйшы за тэрмін «пара ключ-значэнне». Нават калі вы шукаеце ключ-значэнне ў Google Ngram, гэты тэрмін вы там не ўбачыце, затое знойдзеце асацыятыўныя масівы! Да таго ж "пара ключ-значэнне" часцей за ўсё асацыюецца базамі дадзеных, таму значна лагічней параўноўваць з hashmap. Я зразумеў, што магу выкарыстоўваць гэтыя асацыятыўныя масівы для сувязі маіх SNP з табліцай груп (bin table) і волкімі дадзенымі без ужывання Spark.

Для гэтага ў AWK-скрыпце я выкарыстоўваў блок BEGIN. Гэта фрагмент кода, які выконваецца да таго, як першы радок дадзеных будзе перададзены ў асноўнае цела скрыпту.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Каманда while(getline...) загрузіла ўсе радкі з CSV групы (bin), задала першую калонку (назва SNP) у якасці ключа для асацыятыўнага масіва bin і другое значэнне (група) у якасці значэння. Затым у блоку { }, які выконваецца ў дачыненні да ўсіх радкоў асноўнага файла, кожны радок адпраўляецца ў выходны файл, які атрымлівае ўнікальнае імя ў залежнасці ад яго групы (bin): ..._bin_"bin[$1]"_....

зменныя batch_num и chunk_id адпавядалі дадзеным, прадстаўленым канвеерам, што дазволіла пазбегнуць станы гонкі, і кожны струмень выканання, запушчаны parallel, пісаў ва ўласны унікальны файл.

Паколькі ўсе волкія дадзеныя я раскідаў па тэчках па храмасомах, пакінутым пасля майго папярэдняга эксперыменту з AWK, зараз я мог напісаць іншы Bash-скрыпт, каб апрацоўваць па храмасоме за раз і аддаваць у S3 глыбей партыцыянаваныя дадзеныя.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

У скрыпце два раздзелы parallel.

У першай частцы счытваюцца дадзеныя з усіх файлаў, утрымоўвальных інфармацыю па патрэбнай храмасоме, затым гэтыя дадзеныя размяркоўваюцца па струменях, якія раскідваюць файлы па адпаведных групах (bin). Каб не ўзнікала станы гонкі, калі некалькі струменяў запісваюць у адзін файл, AWK перадае імёны файлаў для запісу дадзеных у розныя месцы, напрыклад, chr_10_bin_52_batch_2_aa.csv. У выніку на дыску ствараецца мноства маленькіх файлаў (для гэтага я выкарыстоўваў тэрабайтныя EBS-томы).

Канвеер з другога раздзела parallel праходзіць па групах (bin) і аб'ядноўвае іх асобныя файлы ў агульныя CSV c cat, А затым адпраўляе іх на экспарт.

Трансляванне ў R?

Чаму я навучыўся: можна звяртацца да stdin и stdout з R-скрыпту, а значыць і выкарыстоўваць яго ў канвееры.

У Bash-скрыпце вы маглі заўважыць такі радок: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Яна транслюе ўсе канкатэнаваныя файлы групы (bin) у ніжэйпрыведзены R-скрыпт. {} з'яўляецца спецыяльнай методыкай parallel, якая любыя якія адпраўляюцца ёю ў паказаны струмень дадзеныя ўстаўляе ў прама ў саму каманду. Опцыя {#} дае унікальны ID патоку выканання, а {%} уяўляе сабой нумар слота задання (паўтараецца, але ніколі адначасова). Спіс усіх опцый можна знайсці ў дакументацыі.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Калі пераменная file("stdin") перадаецца ў readr::read_csv, дадзеныя, трансляваныя ў R-скрыпт, загружаюцца ва фрэйм, які потым у выглядзе .rds-файла з дапамогай aws.s3 запісваецца прама ў S3.

RDS – гэта нешта накшталт малодшай версіі Parquet, без вынаходстваў калонкавага сховішчы.

Пасля завяршэння Bash-скрыпту я атрымаў пачак .rds-файлаў, якія ляжаць у S3, што дало дазволіла мне выкарыстоўваць эфектыўны сціск і ўбудаваныя тыпы.

Нягледзячы на ​​выкарыстанне тармазнога R, усё працавала вельмі хутка. Нядзіўна, што фрагменты на R, якія адказваюць за чытанне і запіс дадзеных, добра аптымізаваны. Пасля тэставання на адной храмасоме сярэдняга памеру, заданне выканалася на інстансе C5n.4xl прыкладна за дзве гадзіны.

Абмежаванні S3

Чаму я навучыўся: дзякуючы разумнай рэалізацыі шляхоў S3 можа апрацоўваць шмат файлаў.

Я турбаваўся, ці зможа S3 апрацаваць мноства перададзеных ёй файлаў. Я мог зрабіць імёны файлаў асэнсаванымі, але як S3 будзе па іх шукаць?

Парсім 25TB з дапамогай AWK і R
Тэчкі ў S3 усяго толькі для прыгажосці, насамрэч сістэму не цікавіць знак. /. З FAQ-старонкі S3.

Падобна, S3 уяўляе шлях да пэўнага файла ў выглядзе простага ключа ў своеасаблівай хэш-табліцы або базе дадзеных на аснове дакументаў. Бакет (bucket) можна лічыць табліцай, а файлы - запісамі ў гэтай табліцы.

Паколькі хуткасць і эфектыўнасць важныя для атрымання прыбытку ў Amazon, нядзіўна, што гэтая сістэма «ключ-у-якасці-шляху-да-файла» афігенна аптымізавана. Я спрабаваў знайсці баланс: каб не трэба было рабіць мноства get-запытаў, але каб пры гэтым запыты выконваліся хутка. Аказалася, што лепш за ўсё рабіць каля 20 тыс. bin-файлаў. Думаю, калі працягнуць аптымізаваць, то можна дамагчыся падвышэнні хуткасці (напрыклад, рабіць адмысловы бакет толькі для дадзеных, такім чынам памяншаючы памер пошукавай табліцы). Але на далейшыя эксперыменты ўжо не было чакай і грошай.

Што наконт крыжаванай сумяшчальнасці?

Чаму я навучыўся: галоўная прычына страты часу - заўчасная аптымізацыя вашага метаду захоўвання.

У гэты момант вельмі важна спытаць сябе: "Навошта выкарыстоўваць прапрыетарны фармат файлаў?" Чыннік крыецца ў хуткасці загрузкі (запакаваныя gzip CSV-файлы грузіліся ў 7 раз даўжэй) і сумяшчальнасці з нашымі працоўнымі працэсамі. Я магу перагледзець сваё рашэнне, калі R зможа лёгка загружаць файлы Parquet (ці Arrow) без нагрузкі ў выглядзе Spark. У нас у лабараторыі ўсё выкарыстоўваюць R, і калі мне спатрэбіцца пераўтварыць дадзеныя ў іншы фармат, то ў мяне ўсё яшчэ застаюцца зыходныя тэкставыя дадзеныя, таму я магу проста зноў запусціць канвеер.

Падзел працы

Чаму я навучыўся: не спрабуйце аптымізаваць заданні ўручную, няхай гэта робіць кампутар.

Я адладзіў працоўны працэс на адной храмасоме, зараз трэба апрацаваць усе астатнія дадзеныя.
Хацелася падняць некалькі інстансаў EC2 для пераўтварэння, але ў той жа час я баяўся атрымаць вельмі незбалансаваную нагрузку ў розных заданнях на апрацоўку (гэтак жа, як Spark пакутаваў ад незбалансаваных партыцый). Акрамя таго, мне не ўсміхалася паднімаць па адным інстансе на кожную храмасому, таму што для AWS-акаўнтаў ёсць абмежаванне па змаўчанні ў 10 інстансаў.

Тады я вырашыў напісаць на R скрыпт для аптымізацыі заданняў на апрацоўку.

Спачатку папрасіў S3 вылічыць, колькі месца ў сховішча займае кожная храмасома.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Затым я напісаў функцыю, якая бярэ агульны памер, змешвае парадак храмасом, дзеліць іх на групы num_jobs і паведамляе, наколькі адрозніваюцца памеры ўсіх заданняў на апрацоўку.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Потым я прагнаў з дапамогай purrr тысячу мяшанняў і абраў лепшае.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Так я атрымаў набор заданняў, вельмі падобных да памеру. Затым заставалася толькі абгарнуць мой папярэдні Bash-скрыпт у вялікі цыкл for. На напісанне гэтай аптымізацыі спатрэбілася каля 10 хвілін. І гэта куды менш, чым я патраціў бы на ручное стварэнне заданняў у выпадку з іх незбалансаванасцю. Таму лічу, што з гэтай папярэдняй аптымізацыяй я не пралічыўся.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

У канцы дадаю каманду выключэння:

sudo shutdown -h now

… і ўсё атрымалася! З дапамогай AWS CLI я паднімаў інстансы і праз опцыю user_data перадаваў ім Bash-скрыпты іх заданняў на апрацоўку. Яны выконваліся і аўтаматычна выключаліся, так што я не плаціў за залішнюю вылічальную магутнасць.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Пакуем!

Чаму я навучыўся: API павінен быць простым дзеля прастаты і гнуткасці выкарыстання.

Нарэшце я атрымаў дадзеныя ў патрэбным месцы і выглядзе. Заставалася максімальна спрасціць працэс выкарыстання дадзеных, каб маім калегам было лягчэй. Я хацеў зрабіць просты API для стварэння запытаў. Калі ў будучыні я вырашу перайсці з .rds на Parquet-файлы, гэта павінна быць праблемай для мяне, а не для калегаў. Для гэтага я вырашыў зрабіць унутраны R-пакет.

Сабраў і задакументаваў вельмі просты пакет, які змяшчае ўсяго некалькі функцый для доступу да дадзеных, сабраных вакол функцыі get_snp. Таксама я зрабіў для калег сайт pkgdown, каб яны маглі лёгка паглядзець прыклады і дакументацыю.

Парсім 25TB з дапамогай AWK і R

Інтэлектуальнае кэшаванне

Чаму я навучыўся: калі вашы дадзеныя добра падрыхтаваны, кэшаваць будзе проста!

Паколькі адзін з асноўных працоўных працэсаў ужываў да пакета SNP адну і тую ж мадэль аналізу, я вырашыў выкарыстоўваць групаванне (binning) у сваіх інтарэсах. Пры перадачы дадзеных па SNP, да які вяртаецца аб'екту прымацоўваецца і ўся інфармацыя з групы (bin). Гэта значыць, старыя запыты могуць (у тэорыі) паскараць апрацоўку новых запытаў.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Пры зборцы пакета я прагнаў шмат бенчмаркаў, каб параўнаць хуткасць пры выкарыстанні розных метадаў. Рэкамендую гэтым не грэбаваць, таму што часам вынікі бываюць нечаканымі. Напрыклад, dplyr::filter апынуўся значна хутчэй захопу радкоў з дапамогай фільтрацыі на аснове індэксавання, а атрыманне адной калонкі з адфільтраванага фрэйма дадзеных працавала значна хутчэй, чым ужыванне сінтаксісу індэксавання.

Звярніце ўвагу, што аб'ект prev_snp_results змяшчае ключ snps_in_bin. Гэта масіў усіх унікальных SNP у групе (bin), які дазваляе хутка правяраць, ці ёсць ужо дадзеныя з папярэдняга запыту. Таксама ён спрашчае цыклічны праход па ўсіх SNP у групе (bin) з дапамогай гэтага кода:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Вынікі

Цяпер мы можам (і пачалі сур'ёзна) праганяць мадэлі і сцэнары, раней нам недаступныя. Самае лепшае, што маім калегам па лабараторыі не даводзіцца думаць пра ўсякія складанасці. У іх проста ёсць функцыя, якая працуе.

І хоць пакет пазбаўляе іх ад падрабязнасцяў, я спрабаваў зрабіць фармат дадзеных дастаткова простым, каб у ім маглі разабрацца, калі заўтра я раптам знікну…

Хуткасць прыкметна вырасла. Звычайна мы скануем функцыянальна значныя фрагменты геному. Раней мы не маглі гэта рабіць (атрымлівалася занадта дорага), але зараз, дзякуючы групавой (bin) структуры і кэшаванню, на запыт аднаго SNP сыходзіць у сярэднім менш 0,1 секунды, а выкарыстанне дадзеных такое нізкае, што выдаткі на атрымліваюцца S3 капейкавыя.

Заключэнне

Гэты артыкул - зусім не кіраўніцтва. Рашэнне атрымалася індывідуальнае, і амаль напэўна не аптымальнае. Хутчэй, гэта аповяд аб вандраванні. Жадаю, каб іншыя зразумелі, што падобныя рашэнні не ўзнікаюць у галаве цалкам сфармаванымі, гэта вынік спроб і памылак. Акрамя таго, калі вы шукаеце адмыслоўца па аналізе дадзеных, то ўлічвайце, што для эфектыўнага выкарыстання гэтых прылад патрэбен досвед, а досвед патрабуе грошай. Я шчаслівы, што ў мяне былі сродкі на аплату, але ў многіх іншых, хто можа зрабіць тую ж працу лепш за мяне, ніколі не будзе такой магчымасці з-за адсутнасці грошай нават на спробу.

Інструменты для вялікіх дадзеных ўніверсальныя. Калі ў вас ёсць час, то амаль напэўна зможаце напісаць хутчэйшае рашэнне, ужыўшы «разумную» ачыстку дадзеных, сховішча і методыкі вымання. У канчатковым рахунку ўсё зводзіцца да аналізу выдаткаў і выгод.

Чаму я навучыўся:

  • не існуе таннага спосабу адпарсіць 25 Тб за раз;
  • будзьце асцярожныя з памерам вашых Parquet-файлаў і іх арганізацыяй;
  • партыцыі ў Spark павінны быць збалансаваны;
  • наогул ніколі не спрабуйце рабіць 2,5 мільёна партыцый;
  • сартаваць усё яшчэ цяжка, як і наладжваць Spark;
  • часам адмысловыя дадзеныя патрабуюць адмысловых рашэнняў;
  • аб'яднанне ў Spark працуе хутка, але партыцыянаванне ўсё яшчэ абыходзіцца дорага;
  • не спіце, калі вам выкладаюць асновы, напэўна хтосьці ўжо вырашыў вашу праблему яшчэ ў 1980-х;
  • gnu parallel - гэта чароўная рэч, усе павінны яе выкарыстоўваць;
  • Spark кахае несціснутыя дадзеныя і не кахае камбінаваць партіціі;
  • у Spark занадта шмат накладных выдаткаў пры рашэнні простых задач;
  • асацыятыўныя масівы ў AWK вельмі эфектыўныя;
  • можна звяртацца да stdin и stdout з R-скрыпту, а значыць і выкарыстоўваць яго ў канвееры;
  • дзякуючы разумнай рэалізацыі шляхоў S3 можа апрацоўваць шмат файлаў;
  • галоўная прычына страты часу - заўчасная аптымізацыя вашага метаду захоўвання;
  • не спрабуйце аптымізаваць заданні ўручную, няхай гэта робіць кампутар;
  • API павінен быць простым дзеля прастаты і гнуткасці выкарыстання;
  • калі вашыя дадзеныя добра падрыхтаваны, кэшаваць будзе проста!

Крыніца: habr.com

Дадаць каментар