Як чытаць гэты артыкул: прашу прабачэння за тое, што тэкст атрымаўся такім доўгім і хаатычным. Каб зэканоміць ваш час, я кожны раздзел пачынаю са ўступа «Чаму я навучыўся», у якім адным-двума прапановамі выкладаю істу раздзела.
«Проста пакажы рашэнне!» Калі вы хочаце ўсяго толькі ўбачыць, да чаго я прыйшоў, то пераходзіце да раздзела «Станаўлюся вынаходлівей», але я лічу, што цікавей і карысней пачытаць пра няўдачы.
Нядаўна мне даручылі наладзіць працэс апрацоўкі вялікага аб'ёму зыходных паслядоўнасцяў ДНК (тэхнічна гэта SNP-чып). Трэба было хутка атрымліваць дадзеныя аб зададзеным генетычным месцазнаходжанні (якое называецца SNP) для наступнага мадэлявання і іншых задач. З дапамогай R і AWK мне ўдалося ачысціць і арганізаваць дадзеныя натуральным чынам, моцна паскорыўшы апрацоўку запытаў. Далося мне гэта нялёгка і запатрабавала шматлікіх ітэрацый. Гэты артыкул дапаможа вам пазбегнуць некаторых маіх памылак і прадэманструе, што ж у мяне ў рэшце рэшт атрымалася.
Для пачатку некаторыя ўступныя тлумачэнні.
Дадзеныя
Наш універсітэцкі цэнтр апрацоўкі генетычнай інфармацыі падаў нам дадзеныя ў выглядзе TSV аб'ёмам 25 Тб. Мне яны дасталіся пабітымі на 5 пакетаў, сціснутых Gzip, кожны з якіх утрымоўваў каля 240 чатырохгігабайтных файлаў. Кожны шэраг утрымліваў дадзеныя для аднаго SNP аднаго чалавека. Усяго былі перададзены дадзеныя па ~2,5 млн SNP і ~60 тыс. чалавек. Акрамя SNP-інфармацыі ў файлах былі шматлікія калонкі з лікамі, якія адлюстроўваюць розныя характарыстыкі, такія як інтэнсіўнасць чытання, частата розных алеляў і г.д. Усяго было каля 30 калонак з унікальнымі значэннямі.
Мэта
Як і ў любым праекце па кіраванні дадзенымі, найважнейшым было вызначыць, як будуць выкарыстоўвацца дадзеныя. У гэтым выпадку мы па большай частцы будзем падбіраць мадэлі і працоўныя працэсы для SNP на аснове SNP. Гэта значыць адначасова нам будуць патрэбны дадзеныя толькі па адным SNP. Я павінен быў навучыцца як мага прасцей, хутчэй і танней здабываць усе запісы, якія адносяцца да аднаго з 2,5 SNP.
Як гэтага не рабіць
Працытую падыходнае клішэ:
Я не цярпеў тысячу разоў няўдачу, я толькі адкрыў тысячу спосабаў не парсіць кучу дадзеных у фармаце, зручным для запытаў.
Першая спроба
Чаму я навучыўся: не існуе таннага спосабу адпарсіць 25 Тб за раз.
Праслухаўшы ва Ўніверсітэце Вандэрбільта прадмет "Пашыраныя метады апрацоўкі вялікіх дадзеных", я быў упэўнены, што справа ў капелюшы. Мабыць, гадзіну-дзве сыдзе на наладу Hive-сервера, каб прабегчы па ўсіх дадзеных і даць справаздачу аб выніку. Паколькі нашыя дадзеныя захоўваюцца ў AWS S3, я скарыстаўся сэрвісам Афіна, які дазваляе прымяняць Hive SQL-запыты да S3-дадзеным. Не трэба наладжваць/паднімаць Hive-кластар, ды яшчэ і плаціш толькі за тыя дадзеныя, якія шукаеш.
Пасля таго як я паказаў Athena свае дадзеныя і іх фармат, я прагнаў некалькі тэстаў з падобнымі запытамі:
select * from intensityData limit 10;
І хутка атрымаў добра структураваныя вынікі. Гатова.
Пакуль мы не паспрабавалі выкарыстоўваць дадзеныя ў працы…
Мяне папрасілі выцягнуць усю інфармацыю па SNP, каб пратэставаць на ёй мадэль. Я запусціў запыт:
select * from intensityData
where snp = 'rs123456';
…і стаў чакаць. Праз восем хвілін і больш за 4 Тб запытаных дадзеных я атрымаў вынік. Athena бярэ плату за аб'ём знойдзеных дадзеных, па $5 за тэрабайт. Так што гэты адзіны запыт каштаваў $20 і восем хвілін чакання. Каб прагнаць мадэль па ўсіх дадзеных, трэба было прачакаць 38 гадоў і заплаціць $50 млн. Відавочна, што нам гэта не пасавала.
Трэба было выкарыстоўваць Parquet…
Чаму я навучыўся: будзьце асцярожныя з памерам вашых Parquet-файлаў і іх арганізацыяй.
Спачатку я паспрабаваў выправіць сітуацыю, канвертаваўшы ўсе TSV у Parquet-файлы. Яны зручныя для працы з вялікімі наборамі дадзеных, таму што інфармацыя ў іх захоўваецца ў калоначным выглядзе: кожная калонка ляжыць ва ўласным сегменце памяці/дыска, у адрозненне ад тэкставых файлаў, у якіх радкі ўтрымоўваюць элементы кожнай калонкі. І калі трэба нешта знайсці, то дастаткова прачытаць неабходную калонку. Акрамя таго, у кожным файле ў калонцы захоўваецца дыяпазон значэнняў, так што калі шуканае значэнне адсутнічае ў дыяпазоне калонкі, Spark не будзе марнаваць час на сканіраванне ўсяго файла.
Я запусціў простую задачу Клей AWS для пераўтварэння нашых TSV у Parquet і закінуў новыя файлы ў Athena. Гэта заняло каля 5 гадзін. Але калі я запусціў запыт, то на яго выкананне пайшло прыкладна столькі ж часу і крыху менш грошай. Справа ў тым, што Spark, спрабуючы аптымізаваць задачу, проста распакаваў адзін TSV-чанк і паклаў яго ва ўласны Parquet-чанк. І паколькі кожны чанк быў досыць вялікі і мясціў поўныя запісы шматлікіх людзей, то ў кожным файле захоўваліся ўсе SNP, таму Spark'у прыходзілася адчыняць усе файлы, каб атрымаць патрэбную інфармацыю.
Цікаўна, што выкарыстоўваны па змаўчанні (і рэкамендуемы) тып кампрэсіі ў Parquet - snappy, - не з'яўляецца падзяляным (splitable). Таму кожны выканавец (executor) заліпаў на задачы распакавання і загрузі поўнага датасета на 3,5 Гб.
Разбіраемся ў праблеме
Чаму я навучыўся: сартаваць складана, асабліва, калі дадзеныя размеркаваны.
Мне здавалася, што зараз я зразумеў сутнасць праблемы. Мне трэба было толькі адсартаваць дадзеныя па калонцы SNP, а не па людзях. Тады ў асобным чанцы дадзеных будзе захоўвацца некалькі SNP, і тады сябе ва ўсёй красе выявіць "разумная" функцыя Parquet "адчыняць толькі калі значэнне знаходзіцца ў дыяпазоне". На жаль, адсартаваць мільярды радкоў, раскіданых па кластары, аказалася складанай задачай.
Мы ацэньваем algorithms class in college: "Ужо, не адна справа аб фізічнай complexity of all these sorting algorithms"
Мей trying to sort on a column in a 20TB #іскра table: "Why is this taking so long?" #Даныянавукі змагаецца.
AWS сапраўды не жадае вяртаць грошы па чынніку "Я безуважлівы студэнт". Пасля таго, як я запусціў сартаванне на Amazon Glue, яна прапрацавала 2 дні і завяршылася збоем.
Што наконт партыцыянавання?
Чаму я навучыўся: партыцыі ў Spark павінны быць збалансаваны.
Затым мне прыйшла ў галаву ідэя партыцыянаваць дадзеныя ў храмасомах. Іх 23 штукі (і яшчэ некалькі, калі ўлічваць мітахандрыяльнай ДНК і нерасшыфраваныя (unmapped) вобласці).
Гэта дазволіць падзяліць дадзеныя на драбнейшыя порцыі. Калі дадаць у Spark-функцыю экспарту ў скрыпце Glue усяго толькі адзін радок partition_by = "chr", то дадзеныя павінны быць раскладзены па бакетах (buckets).
Геном складаецца з шматлікіх фрагментаў, якія называюцца храмасомамі.
Нажаль, гэта не спрацавала. У храмасом розныя памеры, а значыць і розная колькасць інфармацыі. Гэта значыць, задачы, якія Spark адпраўляў воркерам, не былі збалансаваны і выконваліся павольна, таму што некаторыя вузлы сканчалі раней і прастойвалі. Аднак задачы былі выкананы. Але пры запыце аднаго SNP незбалансаванасць зноў стала прычынай праблем. Кошт апрацоўкі SNP у буйнейшых храмасомах (гэта значыць тамака, адкуль мы і жадаем атрымаць дадзеныя) паменшылася ўсяго толькі прыкладна ў 10 раз. Шмат, але не дастаткова.
А калі падзяліць на яшчэ драбнейшыя партіціі?
Чаму я навучыўся: увогуле ніколі не спрабуйце рабіць 2,5 мільёна партыцый.
Я вырашыў гуляць па поўнай і партыцыянаваў кожны SNP. Гэта гарантавала аднолькавы памер партыцый. Дрэнная была ідэя. Я скарыстаўся Glue і дадаў нявінны радок partition_by = 'snp'. Задача запусцілася і пачала выконвацца. Праз дзень я праверыў, і ўбачыў, што ў S3 да гэтага часу нічога не запісана, таму забіў задачу. Падобна, Glue запісваў прамежкавыя файлы ва ўтоенае месца ў S3, і шмат файлаў, магчыма, пару мільёнаў. У выніку мая памылка абышлася больш за тысячу даляраў і не ўзрадавала майго настаўніка.
Партыцыянаванне + сартаванне
Чаму я навучыўся: сартаваць усё яшчэ цяжка, як і наладжваць Spark.
Апошняя спроба партыцыянавання складалася ў тым, што я партыцыянаваў храмасомы, а затым сартаваў кожную партыцыю. У тэорыі, гэта дазваляла б паскорыць кожны запыт, таму што жаданыя дадзеныя аб SNP павінны былі знаходзіцца ў межах некалькіх Parquet-чанкаў ўнутры зададзенага дыяпазону. Нажаль, сартаванне нават партыцыянаваных дадзеных апынулася цяжкай задачай. У выніку я перайшоў на EMR для кастамнага кластара і выкарыстоўваў восем магутных інстансаў (C5.4xl) і Sparklyr для стварэння больш гнуткага працоўнага працэсу…
# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
group_by(chr) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr')
)
…аднак задача ўсё роўна так і не была выканана. Я наладжваў па-ўсякаму: павялічваў вылучэнне памяці для кожнага выканаўцы запытаў, выкарыстоўваў вузлы з вялікім аб'ёмам памяці, ужываў шырокавяшчальныя зменныя (broadcasting variable), але кожны раз гэта апыняліся паўмеры, і паступова выканаўцы пачыналі збаіць, пакуль усё не спынілася.
Чаму я навучыўся: часам асаблівыя дадзеныя патрабуюць асаблівых рашэнняў.
У кожнага SNP ёсць значэнне пазіцыі. Гэты лік, які адпавядае колькасці падстаў, якія ляжаць уздоўж яго храмасомы. Гэта добры і натуральны спосаб арганізацыі нашых даных. Спачатку я хацеў партыцыянаваць па абласцях кожнай храмасомы. Напрыклад, пазіцыі 1 - 2000, 2001 - 4000 і г.д. Але праблема ў тым, што SNP размеркаваны па храмасомах нераўнамерна, таму таму памер груп будзе моцна адрознівацца.
У выніку я прыйшоў да разбіцця па катэгорыях (rank) пазіцый. Па ўжо загружаных дадзеных я прагнаў запыт на атрыманне спісу ўнікальных SNP, іх пазіцый і храмасом. Затым адсартаваў дадзеныя ўнутры кожнай храмасомы і сабраў SNP ў групы (bin) зададзенага памеру. Скажам, па 1000 SNP. Гэта дало мне ўзаемасувязь SNP з групай-у-храмасоме.
У рэшце рэшт я зрабіў групы (bin) па 75 SNP, прычыну растлумачу ніжэй.
Чаму я навучыўся: аб'яднанне ў Spark працуе хутка, але партыцыянаванне ўсё яшчэ абыходзіцца дорага.
Я хацеў прачытаць гэты маленькі (2,5 млн радкоў) фрэйм дадзеных у Spark, аб'яднаць яго з волкімі дадзенымі, а затым партыцыянаваць па свежадабаўленай калонцы bin.
# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
group_by(chr_bin) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr_bin')
)
Я выкарыстоўваў sdf_broadcast(), такім чынам Spark даведаецца, што ён павінен адправіць фрэйм дадзеных ва ўсе вузлы. Гэта карысна, калі дадзеныя невялікага памеру і патрабуюцца для ўсіх задач. Інакш Spark спрабуе быць разумным і размяркоўвае дадзеныя па меры патрэбы, што можа стаць прычынай тармазоў.
І зноў мая задумка не спрацавала: задачы нейкі час працавалі, завяршалі аб'яднанне, а затым, як і запушчаныя партыяцыяваннем выканаўцы, пачалі збаіць.
Дадаю AWK
Чаму я навучыўся: не спіце, калі вам выкладаюць асновы. Напэўна, хтосьці ўжо вырашыў вашу праблему яшчэ ў 1980-х.
Да гэтага моманту прычынай усіх маіх няўдач са Spark была перамяшанасць дадзеных у кластары. Магчыма, сітуацыю можна палепшыць з дапамогай папярэдняй апрацоўкі. Я вырашыў паспрабаваць падзяліць волкія тэкставыя дадзеныя на калонкі храмасом, так я спадзяваўся падаць Spark'у «папярэдне партыцыянаваныя» дадзеныя.
Я пашукаў на StackOverflow, як разбіваць па значэннях калонак, і знайшоў такі цудоўны адказ. З дапамогай AWK вы можаце падзяліць тэкставы файл па значэннях калонак, выканаўшы запіс у скрыпце, а не адпраўляючы вынікі ў stdout.
Для спробы я напісаў Bash-скрыпт. Запампаваў адзін з запакаваных TSV, затым распакаваў яго з дапамогай gzip і адправіў у awk.
Чаму я навучыўся: gnu parallel - Гэта чароўная рэч, усе павінны яе выкарыстоўваць.
Падзел праходзіў даволі павольна, і калі я запусціў htop, Каб праверыць выкарыстанне магутнага (і дарагога) EC2-інстанса, то аказалася, што я задзейнічаю толькі адно ядро і прыкладна 200 Мб памяці. Каб вырашыць задачу і не страціць кучу грошай, трэба было прыдумаць, як распаралеліць працу. На шчасце, у зусім узрушаючай кнізе Data Science at the Command Line Джэрона Джансэнса я знайшоў раздзел, прысвечаны распаралельванню. З яе я даведаўся пра gnu parallel, Вельмі гнуткі метад рэалізацыі шматструменнасці ў Unix.
Калі я запусціў падзел з дапамогай новага працэсу, усё было выдатна, але заставалася вузкае месца – спампоўка S3-аб'ектаў на дыск было не занадта хуткім і не цалкам распаралелены. Каб гэта выправіць, я зрабіў вось што:
Высветліў, што можна прама ў канвееры рэалізаваць этап S3-запампоўкі, цалкам выключыўшы прамежкавае захоўванне на дыску. Гэта азначае, што я магу пазбегнуць запісы волкіх дадзеных на дыск і выкарыстоўваць яшчэ больш маленькае, а значыць і таннейшае сховішча на AWS.
Камандай aws configure set default.s3.max_concurrent_requests 50 моцна павялічыў колькасць патокаў, якія выкарыстоўвае AWS CLI (па змаўчанні іх 10).
Перайшоў на аптымізаваны па хуткасці сеткі інстанс EC2, з літарай n у назве. Я выявіў, што страта вылічальнай магутнасці пры выкарыстанні n-інстансаў з лішкам кампенсуецца павелічэннем хуткасці загрузкі. Для большасці задач я выкарыстоўваў c5n.4xl.
Памяняў gzip на pigz, гэта gzip-інструмент, які ўмее рабіць класныя штукі для распаралельвання першапачаткова не распаралеленай задачы распакавання файлаў (гэта дапамагло менш за ўсё).
# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50
for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do
aws s3 cp s3://$batch_loc$chunk_file - |
pigz -dc |
parallel --block 100M --pipe
"awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"
# Combine all the parallel process chunks to single files
ls chunked/ |
cut -d '_' -f 2 |
sort -u |
parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
# Clean up intermediate data
rm chunked/*
done
Гэтыя крокі скамбінаваны сябар з сябрам, каб усё працавала вельмі хутка. Дзякуючы павелічэнню хуткасці запампоўкі і адмове ад запісу на дыск я зараз мог апрацаваць 5-тэрабайтны пакет усяго за некалькі гадзін.
Там няма арангутару, калі плацяць за ўсё, што вы будзеце плаціць за AWS. Адукацыйны партал www.adu.by Page 19 XNUMX Thanks to gnu-parallel можа быць зачынены і шпіталь XNUMXgig csv just as fast a can download it. I я не магу нават спагнаць да гэтага. #Даныянавукі#Linuxpic.twitter.com/Nqyba2zqEk
У гэтым твіце павінна было згадвацца 'TSV'. Нажаль.
Выкарыстанне зноўку адпарсенных дадзеных
Чаму я навучыўся: Spark любіць несціснутыя дадзеныя і не любіць камбінаваць партыцыі.
Цяпер дадзеныя ляжалі ў S3 у незапакаваным (чытай, які падзяляецца) і полуупорядоченном фармаце, і я мог зноў вярнуцца да Spark'у. Мяне чакаў сюрпрыз: мне зноў не ўдалося дабіцца жаданага! Было вельмі складана дакладна сказаць Spark'у, як партыцыянаваны дадзеныя. І нават калі я гэта зрабіў, аказалася, што партыцый занадта шмат (95 тыс.), і калі я з дапамогай coalesce паменшыў іх колькасць да разумных межаў, гэта парушыла маё партыцыянаванне. Упэўнены, гэта можна выправіць, але за пару дзён пошукаў мне не ўдалося знайсці рашэнне. У выніку я дарабіў усе задачы ў Spark, хоць гэта заняло нейкі час, а мае падзеленыя Parquet-файлы былі не вельмі маленькімі (~200 Кб). Аднак дадзеныя ляжалі тамака, дзе трэба.
Занадта маленькія і неаднолькавыя, цудоўна!
Тэставанне лакальных Spark-запытаў
Чаму я навучыўся: у Spark занадта шмат накладных выдаткаў пры рашэнні простых задач.
Загрузіўшы дадзеныя ў прадуманым фармаце, я змог пратэставаць хуткасць. Наладзіў скрыпт на R для запуску лакальнага Spark-сервера, а потым загрузіў фрэйм дадзеных Spark з паказанага сховішча Parquet-груп (bin). Я спрабаваў загрузіць усе дадзеныя, але не змог прымусіць Sparklyr распазнаць партыцыянаванне.
sc <- Spark_connect(master = "local")
desired_snp <- 'rs34771739'
# Start a timer
start_time <- Sys.time()
# Load the desired bin into Spark
intensity_data <- sc %>%
Spark_read_Parquet(
name = 'intensity_data',
path = get_snp_location(desired_snp),
memory = FALSE )
# Subset bin to snp and then collect to local
test_subset <- intensity_data %>%
filter(SNP_Name == desired_snp) %>%
collect()
print(Sys.time() - start_time)
Выкананне заняло 29,415 секунды. Значна лепш, але не занадта добра для масавага тэсціравання чаго-небудзь. Акрамя таго, я не мог паскорыць працу з дапамогай кэшавання, таму што калі я спрабаваў кэшаваць у памяці фрэйм дадзеных, Spark заўсёды падаў, нават калі я вылучыў больш за 50 Гб памяці для датасета, які важыў менш за 15.
Вяртанне да AWK
Чаму я навучыўся: асацыятыўныя масівы ў AWK вельмі эфектыўныя.
Я разумеў, што магу дабіцца больш высокай хуткасці. Мне ўспомнілася, што ў цудоўным кіраўніцтве па AWK Бруса Барнетта я чытаў пра клёвую фічу, якая называецца «асацыятыўныя масівы». Па сутнасці, гэта пары ключ-значэнне, якія чамусьці ў AWK назвалі інакш, і таму я неяк пра іх і не ўспамінаў асабліва. Раман Чапляка нагадаў, што тэрмін «асацыятыўныя масівы» значна старэйшы за тэрмін «пара ключ-значэнне». Нават калі вы шукаеце ключ-значэнне ў Google Ngram, гэты тэрмін вы там не ўбачыце, затое знойдзеце асацыятыўныя масівы! Да таго ж "пара ключ-значэнне" часцей за ўсё асацыюецца базамі дадзеных, таму значна лагічней параўноўваць з hashmap. Я зразумеў, што магу выкарыстоўваць гэтыя асацыятыўныя масівы для сувязі маіх SNP з табліцай груп (bin table) і волкімі дадзенымі без ужывання Spark.
Для гэтага ў AWK-скрыпце я выкарыстоўваў блок BEGIN. Гэта фрагмент кода, які выконваецца да таго, як першы радок дадзеных будзе перададзены ў асноўнае цела скрыпту.
Каманда while(getline...) загрузіла ўсе радкі з CSV групы (bin), задала першую калонку (назва SNP) у якасці ключа для асацыятыўнага масіва bin і другое значэнне (група) у якасці значэння. Затым у блоку {}, які выконваецца ў дачыненні да ўсіх радкоў асноўнага файла, кожны радок адпраўляецца ў выходны файл, які атрымлівае ўнікальнае імя ў залежнасці ад яго групы (bin): ..._bin_"bin[$1]"_....
зменныя batch_num и chunk_id адпавядалі дадзеным, прадстаўленым канвеерам, што дазволіла пазбегнуць станы гонкі, і кожны струмень выканання, запушчаны parallel, пісаў ва ўласны унікальны файл.
Паколькі ўсе волкія дадзеныя я раскідаў па тэчках па храмасомах, пакінутым пасля майго папярэдняга эксперыменту з AWK, зараз я мог напісаць іншы Bash-скрыпт, каб апрацоўваць па храмасоме за раз і аддаваць у S3 глыбей партыцыянаваныя дадзеныя.
DESIRED_CHR='13'
# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"
# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*
У скрыпце два раздзелы parallel.
У першай частцы счытваюцца дадзеныя з усіх файлаў, утрымоўвальных інфармацыю па патрэбнай храмасоме, затым гэтыя дадзеныя размяркоўваюцца па струменях, якія раскідваюць файлы па адпаведных групах (bin). Каб не ўзнікала станы гонкі, калі некалькі струменяў запісваюць у адзін файл, AWK перадае імёны файлаў для запісу дадзеных у розныя месцы, напрыклад, chr_10_bin_52_batch_2_aa.csv. У выніку на дыску ствараецца мноства маленькіх файлаў (для гэтага я выкарыстоўваў тэрабайтныя EBS-томы).
Канвеер з другога раздзела parallel праходзіць па групах (bin) і аб'ядноўвае іх асобныя файлы ў агульныя CSV c cat, А затым адпраўляе іх на экспарт.
Трансляванне ў R?
Чаму я навучыўся: можна звяртацца да stdin и stdout з R-скрыпту, а значыць і выкарыстоўваць яго ў канвееры.
У Bash-скрыпце вы маглі заўважыць такі радок: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Яна транслюе ўсе канкатэнаваныя файлы групы (bin) у ніжэйпрыведзены R-скрыпт. {} з'яўляецца спецыяльнай методыкай parallel, якая любыя якія адпраўляюцца ёю ў паказаны струмень дадзеныя ўстаўляе ў прама ў саму каманду. Опцыя {#} дае унікальны ID патоку выканання, а {%} уяўляе сабой нумар слота задання (паўтараецца, але ніколі адначасова). Спіс усіх опцый можна знайсці ў дакументацыі.
Калі пераменная file("stdin") перадаецца ў readr::read_csv, дадзеныя, трансляваныя ў R-скрыпт, загружаюцца ва фрэйм, які потым у выглядзе .rds-файла з дапамогай aws.s3 запісваецца прама ў S3.
RDS – гэта нешта накшталт малодшай версіі Parquet, без вынаходстваў калонкавага сховішчы.
Пасля завяршэння Bash-скрыпту я атрымаў пачак .rds-файлаў, якія ляжаць у S3, што дало дазволіла мне выкарыстоўваць эфектыўны сціск і ўбудаваныя тыпы.
Нягледзячы на выкарыстанне тармазнога R, усё працавала вельмі хутка. Нядзіўна, што фрагменты на R, якія адказваюць за чытанне і запіс дадзеных, добра аптымізаваны. Пасля тэставання на адной храмасоме сярэдняга памеру, заданне выканалася на інстансе C5n.4xl прыкладна за дзве гадзіны.
Абмежаванні S3
Чаму я навучыўся: дзякуючы разумнай рэалізацыі шляхоў S3 можа апрацоўваць шмат файлаў.
Я турбаваўся, ці зможа S3 апрацаваць мноства перададзеных ёй файлаў. Я мог зрабіць імёны файлаў асэнсаванымі, але як S3 будзе па іх шукаць?
Тэчкі ў S3 усяго толькі для прыгажосці, насамрэч сістэму не цікавіць знак. /. З FAQ-старонкі S3.
Падобна, S3 уяўляе шлях да пэўнага файла ў выглядзе простага ключа ў своеасаблівай хэш-табліцы або базе дадзеных на аснове дакументаў. Бакет (bucket) можна лічыць табліцай, а файлы - запісамі ў гэтай табліцы.
Паколькі хуткасць і эфектыўнасць важныя для атрымання прыбытку ў Amazon, нядзіўна, што гэтая сістэма «ключ-у-якасці-шляху-да-файла» афігенна аптымізавана. Я спрабаваў знайсці баланс: каб не трэба было рабіць мноства get-запытаў, але каб пры гэтым запыты выконваліся хутка. Аказалася, што лепш за ўсё рабіць каля 20 тыс. bin-файлаў. Думаю, калі працягнуць аптымізаваць, то можна дамагчыся падвышэнні хуткасці (напрыклад, рабіць адмысловы бакет толькі для дадзеных, такім чынам памяншаючы памер пошукавай табліцы). Але на далейшыя эксперыменты ўжо не было чакай і грошай.
Што наконт крыжаванай сумяшчальнасці?
Чаму я навучыўся: галоўная прычына страты часу - заўчасная аптымізацыя вашага метаду захоўвання.
У гэты момант вельмі важна спытаць сябе: "Навошта выкарыстоўваць прапрыетарны фармат файлаў?" Чыннік крыецца ў хуткасці загрузкі (запакаваныя gzip CSV-файлы грузіліся ў 7 раз даўжэй) і сумяшчальнасці з нашымі працоўнымі працэсамі. Я магу перагледзець сваё рашэнне, калі R зможа лёгка загружаць файлы Parquet (ці Arrow) без нагрузкі ў выглядзе Spark. У нас у лабараторыі ўсё выкарыстоўваюць R, і калі мне спатрэбіцца пераўтварыць дадзеныя ў іншы фармат, то ў мяне ўсё яшчэ застаюцца зыходныя тэкставыя дадзеныя, таму я магу проста зноў запусціць канвеер.
Падзел працы
Чаму я навучыўся: не спрабуйце аптымізаваць заданні ўручную, няхай гэта робіць кампутар.
Я адладзіў працоўны працэс на адной храмасоме, зараз трэба апрацаваць усе астатнія дадзеныя.
Хацелася падняць некалькі інстансаў EC2 для пераўтварэння, але ў той жа час я баяўся атрымаць вельмі незбалансаваную нагрузку ў розных заданнях на апрацоўку (гэтак жа, як Spark пакутаваў ад незбалансаваных партыцый). Акрамя таго, мне не ўсміхалася паднімаць па адным інстансе на кожную храмасому, таму што для AWS-акаўнтаў ёсць абмежаванне па змаўчанні ў 10 інстансаў.
Тады я вырашыў напісаць на R скрыпт для аптымізацыі заданняў на апрацоўку.
Спачатку папрасіў S3 вылічыць, колькі месца ў сховішча займае кожная храмасома.
library(aws.s3)
library(tidyverse)
chr_sizes <- get_bucket_df(
bucket = '...', prefix = '...', max = Inf
) %>%
mutate(Size = as.numeric(Size)) %>%
filter(Size != 0) %>%
mutate(
# Extract chromosome from the file name
chr = str_extract(Key, 'chr.{1,4}.csv') %>%
str_remove_all('chr|.csv')
) %>%
group_by(chr) %>%
summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB
# A tibble: 27 x 2
chr total_size
<chr> <dbl>
1 0 163.
2 1 967.
3 10 541.
4 11 611.
5 12 542.
6 13 364.
7 14 375.
8 15 372.
9 16 434.
10 17 443.
# … with 17 more rows
Затым я напісаў функцыю, якая бярэ агульны памер, змешвае парадак храмасом, дзеліць іх на групы num_jobs і паведамляе, наколькі адрозніваюцца памеры ўсіх заданняў на апрацоўку.
num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7
shuffle_job <- function(i){
chr_sizes %>%
sample_frac() %>%
mutate(
cum_size = cumsum(total_size),
job_num = ceiling(cum_size/job_size)
) %>%
group_by(job_num) %>%
summarise(
job_chrs = paste(chr, collapse = ','),
total_job_size = sum(total_size)
) %>%
mutate(sd = sd(total_job_size)) %>%
nest(-sd)
}
shuffle_job(1)
# A tibble: 1 x 2
sd data
<dbl> <list>
1 153. <tibble [7 × 3]>
Потым я прагнаў з дапамогай purrr тысячу мяшанняў і абраў лепшае.
Так я атрымаў набор заданняў, вельмі падобных да памеру. Затым заставалася толькі абгарнуць мой папярэдні Bash-скрыпт у вялікі цыкл for. На напісанне гэтай аптымізацыі спатрэбілася каля 10 хвілін. І гэта куды менш, чым я патраціў бы на ручное стварэнне заданняў у выпадку з іх незбалансаванасцю. Таму лічу, што з гэтай папярэдняй аптымізацыяй я не пралічыўся.
for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi
У канцы дадаю каманду выключэння:
sudo shutdown -h now
… і ўсё атрымалася! З дапамогай AWS CLI я паднімаў інстансы і праз опцыю user_data перадаваў ім Bash-скрыпты іх заданняў на апрацоўку. Яны выконваліся і аўтаматычна выключаліся, так што я не плаціў за залішнюю вылічальную магутнасць.
Чаму я навучыўся: API павінен быць простым дзеля прастаты і гнуткасці выкарыстання.
Нарэшце я атрымаў дадзеныя ў патрэбным месцы і выглядзе. Заставалася максімальна спрасціць працэс выкарыстання дадзеных, каб маім калегам было лягчэй. Я хацеў зрабіць просты API для стварэння запытаў. Калі ў будучыні я вырашу перайсці з .rds на Parquet-файлы, гэта павінна быць праблемай для мяне, а не для калегаў. Для гэтага я вырашыў зрабіць унутраны R-пакет.
Сабраў і задакументаваў вельмі просты пакет, які змяшчае ўсяго некалькі функцый для доступу да дадзеных, сабраных вакол функцыі get_snp. Таксама я зрабіў для калег сайт pkgdown, каб яны маглі лёгка паглядзець прыклады і дакументацыю.
Інтэлектуальнае кэшаванне
Чаму я навучыўся: калі вашы дадзеныя добра падрыхтаваны, кэшаваць будзе проста!
Паколькі адзін з асноўных працоўных працэсаў ужываў да пакета SNP адну і тую ж мадэль аналізу, я вырашыў выкарыстоўваць групаванне (binning) у сваіх інтарэсах. Пры перадачы дадзеных па SNP, да які вяртаецца аб'екту прымацоўваецца і ўся інфармацыя з групы (bin). Гэта значыць, старыя запыты могуць (у тэорыі) паскараць апрацоўку новых запытаў.
# Part of get_snp()
...
# Test if our current snp data has the desired snp.
already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin
if(!already_have_snp){
# Grab info on the bin of the desired snp
snp_results <- get_snp_bin(desired_snp)
# Download the snp's bin data
snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
} else {
# The previous snp data contained the right bin so just use it
snp_results <- prev_snp_results
}
...
Пры зборцы пакета я прагнаў шмат бенчмаркаў, каб параўнаць хуткасць пры выкарыстанні розных метадаў. Рэкамендую гэтым не грэбаваць, таму што часам вынікі бываюць нечаканымі. Напрыклад, dplyr::filter апынуўся значна хутчэй захопу радкоў з дапамогай фільтрацыі на аснове індэксавання, а атрыманне адной калонкі з адфільтраванага фрэйма дадзеных працавала значна хутчэй, чым ужыванне сінтаксісу індэксавання.
Звярніце ўвагу, што аб'ект prev_snp_results змяшчае ключ snps_in_bin. Гэта масіў усіх унікальных SNP у групе (bin), які дазваляе хутка правяраць, ці ёсць ужо дадзеныя з папярэдняга запыту. Таксама ён спрашчае цыклічны праход па ўсіх SNP у групе (bin) з дапамогай гэтага кода:
# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin
for(current_snp in snps_in_bin){
my_snp_results <- get_snp(current_snp, my_snp_results)
# Do something with results
}
Вынікі
Цяпер мы можам (і пачалі сур'ёзна) праганяць мадэлі і сцэнары, раней нам недаступныя. Самае лепшае, што маім калегам па лабараторыі не даводзіцца думаць пра ўсякія складанасці. У іх проста ёсць функцыя, якая працуе.
І хоць пакет пазбаўляе іх ад падрабязнасцяў, я спрабаваў зрабіць фармат дадзеных дастаткова простым, каб у ім маглі разабрацца, калі заўтра я раптам знікну…
Хуткасць прыкметна вырасла. Звычайна мы скануем функцыянальна значныя фрагменты геному. Раней мы не маглі гэта рабіць (атрымлівалася занадта дорага), але зараз, дзякуючы групавой (bin) структуры і кэшаванню, на запыт аднаго SNP сыходзіць у сярэднім менш 0,1 секунды, а выкарыстанне дадзеных такое нізкае, што выдаткі на атрымліваюцца S3 капейкавыя.
Recently I got put in change of wrangling 25+ TB of raw genotyping data for my lab. When I started, using spark took 8 min & cost $20 to query a SNP. Пасля выкарыстання AWK + #rstats да працэсу, гэта цяпер прымаецца не толькі 10th of a second and costs $0.00001. My personal # ВялікіяДата перамога. pic.twitter.com/ANOXVGrmkk
Гэты артыкул - зусім не кіраўніцтва. Рашэнне атрымалася індывідуальнае, і амаль напэўна не аптымальнае. Хутчэй, гэта аповяд аб вандраванні. Жадаю, каб іншыя зразумелі, што падобныя рашэнні не ўзнікаюць у галаве цалкам сфармаванымі, гэта вынік спроб і памылак. Акрамя таго, калі вы шукаеце адмыслоўца па аналізе дадзеных, то ўлічвайце, што для эфектыўнага выкарыстання гэтых прылад патрэбен досвед, а досвед патрабуе грошай. Я шчаслівы, што ў мяне былі сродкі на аплату, але ў многіх іншых, хто можа зрабіць тую ж працу лепш за мяне, ніколі не будзе такой магчымасці з-за адсутнасці грошай нават на спробу.
Інструменты для вялікіх дадзеных ўніверсальныя. Калі ў вас ёсць час, то амаль напэўна зможаце напісаць хутчэйшае рашэнне, ужыўшы «разумную» ачыстку дадзеных, сховішча і методыкі вымання. У канчатковым рахунку ўсё зводзіцца да аналізу выдаткаў і выгод.
Чаму я навучыўся:
не існуе таннага спосабу адпарсіць 25 Тб за раз;
будзьце асцярожныя з памерам вашых Parquet-файлаў і іх арганізацыяй;
партыцыі ў Spark павінны быць збалансаваны;
наогул ніколі не спрабуйце рабіць 2,5 мільёна партыцый;
сартаваць усё яшчэ цяжка, як і наладжваць Spark;
часам адмысловыя дадзеныя патрабуюць адмысловых рашэнняў;
аб'яднанне ў Spark працуе хутка, але партыцыянаванне ўсё яшчэ абыходзіцца дорага;
не спіце, калі вам выкладаюць асновы, напэўна хтосьці ўжо вырашыў вашу праблему яшчэ ў 1980-х;
gnu parallel - гэта чароўная рэч, усе павінны яе выкарыстоўваць;
Spark кахае несціснутыя дадзеныя і не кахае камбінаваць партіціі;
у Spark занадта шмат накладных выдаткаў пры рашэнні простых задач;
асацыятыўныя масівы ў AWK вельмі эфектыўныя;
можна звяртацца да stdin и stdout з R-скрыпту, а значыць і выкарыстоўваць яго ў канвееры;
дзякуючы разумнай рэалізацыі шляхоў S3 можа апрацоўваць шмат файлаў;
галоўная прычына страты часу - заўчасная аптымізацыя вашага метаду захоўвання;
не спрабуйце аптымізаваць заданні ўручную, няхай гэта робіць кампутар;
API павінен быць простым дзеля прастаты і гнуткасці выкарыстання;
калі вашыя дадзеныя добра падрыхтаваны, кэшаваць будзе проста!