AWK va R yordamida 25TB ni tahlil qilish

AWK va R yordamida 25TB ni tahlil qilish
Ushbu maqolani qanday o'qish kerak: Matn uzoq va tartibsiz bo'lgani uchun uzr so'rayman. Vaqtingizni tejash uchun men har bir bobni “Men o‘rganganlarim” muqaddimasi bilan boshlayman, unda bobning mohiyati bir yoki ikkita jumlada jamlanadi.

"Menga yechimni ko'rsating!" Agar siz mening qayerdan kelganimni ko'rishni istasangiz, unda "Ko'proq ixtirochi bo'lish" bo'limiga o'ting, lekin menimcha, muvaffaqiyatsizlik haqida o'qish qiziqroq va foydaliroq.

Yaqinda menga katta hajmdagi xom DNK ketma-ketliklarini (texnik jihatdan SNP chipi) qayta ishlash jarayonini o'rnatish topshirildi. Keyingi modellashtirish va boshqa vazifalar uchun ma'lum bir genetik joylashuv (SNP deb ataladi) haqida tezda ma'lumot olish zarurati edi. R va AWK-dan foydalanib, men ma'lumotlarni tabiiy usulda tozalash va tartibga solishga muvaffaq bo'ldim, bu so'rovlarni qayta ishlashni sezilarli darajada tezlashtirdi. Bu men uchun oson bo'lmadi va ko'p takrorlashni talab qildi. Ushbu maqola ba'zi xatolarimdan qochishingizga yordam beradi va nima bilan yakunlanganimni ko'rsatadi.

Birinchidan, ba'zi kirish tushuntirishlari.

ma'lumotlar

Universitetimiz genetik ma'lumotlarni qayta ishlash markazi bizga 25 TB TSV ko'rinishidagi ma'lumotlarni taqdim etdi. Men ularni Gzip tomonidan siqilgan 5 paketga bo'lingan holda oldim, ularning har birida 240 ga yaqin to'rt gigabaytli fayllar mavjud edi. Har bir qatorda bitta shaxsdan bitta SNP uchun ma'lumotlar mavjud edi. Hammasi bo'lib, ~2,5 million SNP va ~60 ming kishi haqidagi ma'lumotlar uzatildi. SNP ma'lumotlariga qo'shimcha ravishda, fayllar o'qish intensivligi, turli allellarning chastotasi va boshqalar kabi turli xususiyatlarni aks ettiruvchi raqamlarga ega ko'plab ustunlarni o'z ichiga oladi. Hammasi bo'lib noyob qiymatlarga ega 30 ga yaqin ustunlar mavjud edi.

Maqsad

Har qanday ma'lumotlarni boshqarish loyihasida bo'lgani kabi, eng muhimi, ma'lumotlardan qanday foydalanishni aniqlash edi. Ushbu holatda biz asosan SNP asosida SNP uchun modellar va ish oqimlarini tanlaymiz. Ya'ni, bizga bir vaqtning o'zida faqat bitta SNP haqida ma'lumot kerak bo'ladi. Men 2,5 million SNP dan biri bilan bog'liq barcha yozuvlarni iloji boricha oson, tez va arzonroq olishni o'rganishim kerak edi.

Buni qanday qilmaslik kerak

Tegishli klişedan iqtibos keltirish uchun:

Men ming marta muvaffaqiyatsizlikka uchramadim, shunchaki so'rovlar uchun qulay formatda ma'lumotlar to'plamini tahlil qilishdan qochishning minglab usullarini topdim.

Birinchi urinib ko'ring

Men nimani o'rgandim: Bir vaqtning o'zida 25 TB ni tahlil qilishning arzon usuli yo'q.

Vanderbilt universitetida "Katta ma'lumotlarni qayta ishlashning ilg'or usullari" kursini o'rganib, hiyla sumkada ekanligiga amin bo'ldim. Hive serverini barcha ma'lumotlarni ko'rib chiqish va natija haqida xabar berish uchun sozlash uchun bir yoki ikki soat kerak bo'ladi. Bizning ma'lumotlarimiz AWS S3 da saqlanganligi sababli men xizmatdan foydalandim Afina, bu sizga S3 ma'lumotlariga Hive SQL so'rovlarini qo'llash imkonini beradi. Hive klasterini o'rnatish/ko'tarish shart emas, shuningdek, siz faqat izlayotgan ma'lumotlar uchun to'laysiz.

Men Afinaga o'z ma'lumotlarimni va uning formatini ko'rsatganimdan so'ng, men quyidagi so'rovlar bilan bir nechta testlarni o'tkazdim:

select * from intensityData limit 10;

Va tezda yaxshi tuzilgan natijalarni oldi. Tayyor.

Biz ishimizda ma'lumotlardan foydalanishga harakat qilgunimizcha ...

Modelni sinab ko'rish uchun mendan barcha SNP ma'lumotlarini chiqarib olishimni so'rashdi. Men so'rovni bajardim:


select * from intensityData 
where snp = 'rs123456';

...va kuta boshladi. Sakkiz daqiqa va 4 TB dan ortiq so'ralgan ma'lumotlardan so'ng men natijani oldim. Afina topilgan ma'lumotlar hajmi bo'yicha haq oladi, har bir terabayt uchun 5 dollar. Shunday qilib, bu bitta so'rov 20 dollar va sakkiz daqiqa kutishga to'g'ri keldi. Modelni barcha ma'lumotlarda ishlatish uchun biz 38 yil kutishimiz va 50 million dollar to'lashimiz kerak edi.Ochig'i, bu bizga mos emas edi.

Parketdan foydalanish kerak edi...

Men nimani o'rgandim: Parket fayllaringiz hajmi va ularni tashkil qilishda ehtiyot bo'ling.

Avvaliga barcha TSV-larni o'zgartirib, vaziyatni tuzatishga harakat qildim Parket fayllar. Ular katta ma'lumotlar to'plamlari bilan ishlash uchun qulaydir, chunki ulardagi ma'lumotlar ustunli shaklda saqlanadi: matnli fayllardan farqli o'laroq, har bir ustun o'z xotirasi/disk segmentida yotadi, ularning satrlarida har bir ustunning elementlari mavjud. Va agar biror narsa topmoqchi bo'lsangiz, kerakli ustunni o'qing. Bundan tashqari, har bir fayl bir qator qiymatlarni ustunda saqlaydi, shuning uchun siz izlayotgan qiymat ustun oralig'ida bo'lmasa, Spark butun faylni skanerlash uchun vaqtni behuda sarflamaydi.

Men oddiy vazifani bajardim AWS elim TSV-larimizni Parketga aylantirish uchun va yangi fayllarni Afinaga tashladi. Taxminan 5 soat davom etdi. Ammo men so'rovni bajarganimda, uni bajarish uchun taxminan bir xil vaqt va biroz kamroq pul kerak bo'ldi. Gap shundaki, Spark vazifani optimallashtirishga urinib, shunchaki bitta TSV bo'lagini ochib, o'zining Parket bo'lagiga joylashtirdi. Va har bir bo'lak ko'plab odamlarning barcha yozuvlarini o'z ichiga oladigan darajada katta bo'lganligi sababli, har bir faylda barcha SNPlar mavjud edi, shuning uchun Spark kerakli ma'lumotlarni olish uchun barcha fayllarni ochishi kerak edi.

Qizig'i shundaki, Parketning standart (va tavsiya etilgan) siqish turi, tezkor, bo'linmaydi. Shu sababli, har bir ijrochi 3,5 GB hajmdagi to'liq ma'lumotlar to'plamini ochish va yuklab olish vazifasini bajarib qo'ydi.

AWK va R yordamida 25TB ni tahlil qilish

Keling, muammoni tushunaylik

Men nimani o'rgandim: Saralash qiyin, ayniqsa ma'lumotlar taqsimlangan bo'lsa.

Nazarimda, muammoning mohiyatini endi tushungandek bo‘ldim. Men faqat ma'lumotlarni odamlar bo'yicha emas, balki SNP ustuni bo'yicha saralashim kerak edi. Keyin bir nechta SNP alohida ma'lumotlar bo'lagida saqlanadi va keyin Parketning "aqlli" funksiyasi "qiymat diapazonda bo'lsagina ochiladi" butun ulug'vorligi bilan o'zini namoyon qiladi. Afsuski, klaster bo'ylab tarqalgan milliardlab qatorlarni saralash qiyin vazifa bo'lib chiqdi.

AWS, albatta, "Men chalg'igan talabaman" sababi tufayli pulni qaytarishni istamaydi. Amazon Glue-da saralashni boshlaganimdan so'ng, u 2 kun ishladi va qulab tushdi.

Bo'lish haqida nima deyish mumkin?

Men nimani o'rgandim: Spark-dagi qismlar muvozanatli bo'lishi kerak.

Keyin men ma'lumotlarni xromosomalarda bo'lish g'oyasiga keldim. Ulardan 23 tasi bor (agar siz mitoxondriyal DNK va xaritalanmagan hududlarni hisobga olsangiz, yana bir nechtasi).
Bu sizga ma'lumotlarni kichikroq bo'laklarga bo'lish imkonini beradi. Agar siz Glue skriptida Spark eksport funksiyasiga faqat bitta satr qo'shsangiz partition_by = "chr", keyin ma'lumotlar chelaklarga bo'linishi kerak.

AWK va R yordamida 25TB ni tahlil qilish
Genom xromosomalar deb ataladigan ko'plab bo'laklardan iborat.

Afsuski, bu ish bermadi. Xromosomalar turli o'lchamlarga ega, ya'ni har xil miqdordagi ma'lumotlar. Bu shuni anglatadiki, Spark ishchilarga yuborgan vazifalar muvozanatli emas edi va sekin bajarildi, chunki ba'zi tugunlar erta tugadi va ishlamay qoldi. Biroq, vazifalar bajarildi. Ammo bitta SNP so'raganda, nomutanosiblik yana muammolarni keltirib chiqardi. Kattaroq xromosomalarda (ya'ni, biz ma'lumot olishni xohlagan joyda) SNPlarni qayta ishlash narxi atigi 10 baravarga kamaydi. Ko'p, lekin etarli emas.

Agar biz uni kichikroq qismlarga ajratsak nima bo'ladi?

Men nimani o'rgandim: Hech qachon 2,5 million bo'lim qilishga urinmang.

Men hamma narsadan chiqishga qaror qildim va har bir SNPni bo'limga ajratdim. Bu bo'limlarning teng o'lchamda bo'lishini ta'minladi. BU YOMON G'OYA EDI. Men Glue-dan foydalandim va begunoh chiziq qo'shdim partition_by = 'snp'. Vazifa boshlandi va bajarila boshlandi. Bir kundan keyin men tekshirib ko'rdim va S3 ga hali hech narsa yozilmaganligini ko'rdim, shuning uchun men vazifani o'ldirdim. Ko'rinishidan, Glue oraliq fayllarni S3 da yashirin joyga, ko'plab fayllarga, ehtimol bir necha millionga yozganga o'xshaydi. Natijada mening xatoim ming dollardan qimmatga tushdi va ustozimga yoqmadi.

Bo'lish + saralash

Men nimani o'rgandim: Sparkni sozlash kabi saralash hali ham qiyin.

Bo'lishdagi so'nggi urinishim xromosomalarni bo'lish va keyin har bir bo'limni saralashdan iborat edi. Nazariy jihatdan, bu har bir so'rovni tezlashtiradi, chunki kerakli SNP ma'lumotlari ma'lum diapazonda bir nechta Parket bo'laklarida bo'lishi kerak edi. Afsuski, hatto bo'lingan ma'lumotlarni saralash qiyin vazifa bo'lib chiqdi. Natijada, men maxsus klaster uchun EMR-ga o'tdim va yanada moslashuvchan ish oqimini yaratish uchun sakkizta kuchli misol (C5.4xl) va Sparklyr-dan foydalandim...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...ammo, vazifa hali ham bajarilmagan edi. Men uni turli yo'llar bilan sozladim: har bir so'rov ijrochisi uchun xotira ajratishni oshirdim, katta hajmdagi xotiraga ega tugunlardan foydalandim, translyatsiya o'zgaruvchilaridan foydalandim (efir o'zgaruvchilari), lekin har safar bu yarim o'lchovlar bo'lib chiqdi va asta-sekin ijrochilar boshlandi. hamma narsa to'xtamaguncha muvaffaqiyatsiz bo'lish.

Men yanada ijodiy bo'lyapman

Men nimani o'rgandim: Ba'zan maxsus ma'lumotlar maxsus echimlarni talab qiladi.

Har bir SNP pozitsiya qiymatiga ega. Bu uning xromosomasi bo'ylab asoslar soniga mos keladigan raqam. Bu bizning ma'lumotlarimizni tartibga solishning yaxshi va tabiiy usuli. Avvaliga men har bir xromosomaning hududlariga bo'linishni xohladim. Masalan, 1 - 2000, 2001 - 4000 va boshqalar pozitsiyalari. Ammo muammo shundaki, SNPlar xromosomalar bo'ylab teng taqsimlanmagan, shuning uchun guruh o'lchamlari juda katta farq qiladi.

AWK va R yordamida 25TB ni tahlil qilish

Natijada men lavozimlarni toifalarga (darajali) taqsimlashga keldim. Yuklab olingan ma'lumotlardan foydalanib, men noyob SNPlar, ularning joylashuvi va xromosomalari ro'yxatini olish uchun so'rov yubordim. Keyin men har bir xromosoma ichidagi ma'lumotlarni saraladim va SNPlarni ma'lum o'lchamdagi guruhlarga (bin) to'pladim. Aytaylik, har biri 1000 SNP. Bu menga SNP-to-guruh-xromosoma munosabatlarini berdi.

Oxir-oqibat, men 75 SNPdan iborat guruhlarni (bin) qildim, sababi quyida tushuntiriladi.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Avval Spark bilan sinab ko'ring

Men nimani o'rgandim: Spark yig'ish tez, lekin qismlarga ajratish hali ham qimmat.

Men ushbu kichik (2,5 million qator) ma'lumotlar ramkasini Spark-ga o'qib, uni xom ma'lumotlar bilan birlashtirmoqchi bo'ldim va keyin uni yangi qo'shilgan ustunga bo'lindim. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

foydalandim sdf_broadcast(), shuning uchun Spark ma'lumotlar ramkasini barcha tugunlarga yuborishi kerakligini biladi. Agar ma'lumotlar hajmi kichik bo'lsa va barcha vazifalar uchun zarur bo'lsa, bu foydalidir. Aks holda, Spark aqlli bo'lishga harakat qiladi va kerak bo'lganda ma'lumotlarni tarqatadi, bu esa sekinlashuvga olib kelishi mumkin.

Va yana, mening fikrim ish bermadi: vazifalar bir muncha vaqt ishladi, ittifoqni tugatdi va keyin, bo'linish orqali boshlangan ijrochilar singari, ular muvaffaqiyatsizlikka uchradi.

AWK qo'shilmoqda

Men nimani o'rgandim: Sizga asoslarni o'rgatishayotganda uxlamang. Albatta, kimdir sizning muammoingizni 1980-yillarda hal qilgan.

Shu paytgacha Spark bilan bog'liq barcha muvaffaqiyatsizliklarimning sababi klasterdagi ma'lumotlarning chalkashligi edi. Ehtimol, vaziyatni oldindan davolash bilan yaxshilash mumkin. Men xom matn ma'lumotlarini xromosomalar ustunlariga bo'lishga qaror qildim, shuning uchun Spark-ga "oldindan bo'lingan" ma'lumotlarni taqdim etishga umid qildim.

Men StackOverflow-da ustun qiymatlari bo'yicha qanday ajratishni qidirdim va topdim shunday ajoyib javob. AWK yordamida siz matnli faylni natijalarni yuborish o'rniga uni skriptda yozish orqali ustun qiymatlari bo'yicha ajratishingiz mumkin. stdout.

Uni sinab ko'rish uchun Bash skriptini yozdim. Qadoqlangan TSVlardan birini yuklab oldi, so‘ngra uni o‘ramidan chiqardi gzip va yuborildi awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Bu ishladi!

Yadrolarni to'ldirish

Men nimani o'rgandim: gnu parallel - bu sehrli narsa, undan hamma foydalanishi kerak.

Ajralish juda sekin edi va men boshlaganimda htopkuchli (va qimmat) EC2 nusxasidan foydalanishni tekshirish uchun men faqat bitta yadro va taxminan 200 MB xotiradan foydalanayotganim ma'lum bo'ldi. Muammoni hal qilish va ko'p pul yo'qotmaslik uchun biz ishni qanday parallellashtirishni aniqlashimiz kerak edi. Yaxshiyamki, mutlaqo ajoyib kitobda Buyruqlar satrida ma'lumotlar fani Men Jeron Yanssensning parallellashtirish bo'yicha bobini topdim. Undan men o'rgandim gnu parallel, Unixda multithreadingni amalga oshirish uchun juda moslashuvchan usul.

AWK va R yordamida 25TB ni tahlil qilish
Yangi jarayondan foydalangan holda bo'linishni boshlaganimda, hamma narsa yaxshi edi, lekin hali ham muammo bor edi - S3 ob'ektlarini diskka yuklab olish juda tez emas va to'liq parallellashtirilmagan. Buni tuzatish uchun men buni qildim:

  1. Men diskdagi oraliq saqlashni butunlay yo'q qilib, S3 yuklab olish bosqichini to'g'ridan-to'g'ri quvur liniyasida amalga oshirish mumkinligini bilib oldim. Bu shuni anglatadiki, men xom ma'lumotlarni diskka yozishdan qochib, AWS-da undan ham kichikroq va shuning uchun arzonroq saqlashdan foydalanishim mumkin.
  2. jamoa aws configure set default.s3.max_concurrent_requests 50 AWS CLI ishlatadigan iplar sonini sezilarli darajada oshirdi (odatda 10 tasi bor).
  3. Tarmoq tezligi uchun optimallashtirilgan, nomida n harfi bilan EC2 nusxasiga o'tdim. Men n-nashrlardan foydalanganda qayta ishlash quvvatining yo'qolishi yuklash tezligining oshishi bilan qoplanishini aniqladim. Ko'pgina vazifalar uchun men c5n.4xl dan foydalanardim.
  4. O'zgartirildi gzip haqida pigz, bu fayllarni ochishning dastlab parallel bo'lmagan vazifasini parallellashtirish uchun ajoyib narsalarni qila oladigan gzip vositasidir (bu eng kam yordam berdi).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Bu qadamlar bir-biri bilan birlashtirilib, hamma narsa juda tez ishlaydi. Yuklab olish tezligini oshirish va diskda yozishni yo'q qilish orqali men 5 terabaytlik paketni bir necha soat ichida qayta ishlashim mumkin edi.

Ushbu tvitda "TSV" eslatilishi kerak edi. Afsuski.

Yangi tahlil qilingan ma'lumotlardan foydalanish

Men nimani o'rgandim: Spark siqilmagan ma'lumotlarni yoqtiradi va bo'limlarni birlashtirishni yoqtirmaydi.

Endi ma'lumotlar S3-da ochilmagan (o'qish: birgalikda) va yarim tartiblangan formatda edi va men yana Spark-ga qaytishim mumkin edi. Meni syurpriz kutdi: men yana xohlagan narsamga erisha olmadim! Sparkga ma'lumotlar qanday bo'linganligini aniq aytish juda qiyin edi. Va men buni qilganimda ham, juda ko'p bo'limlar (95 ming) borligi va men foydalanganda ma'lum bo'ldi coalesce ularning sonini oqilona chegaralarga qisqartirdi, bu mening bo'linishimni yo'q qildi. Ishonchim komilki, buni tuzatish mumkin, lekin bir necha kun izlashdan keyin men yechim topa olmadim. Oxir-oqibat men Spark-dagi barcha vazifalarni tugatdim, garchi bu biroz vaqt talab qildi va mening bo'lingan Parket fayllarim unchalik kichik bo'lmagan (~ 200 KB). Biroq, ma'lumotlar kerak bo'lgan joyda edi.

AWK va R yordamida 25TB ni tahlil qilish
Juda kichik va notekis, ajoyib!

Mahalliy Spark so'rovlarini sinab ko'rish

Men nimani o'rgandim: Oddiy muammolarni hal qilishda Spark juda ko'p yuklarga ega.

Ma'lumotni aqlli formatda yuklab olish orqali men tezlikni sinab ko'rdim. Mahalliy Spark serverini ishga tushirish uchun R skriptini o'rnating va keyin belgilangan Parket guruhi xotirasidan (bin) Spark ma'lumotlar ramkasini yuklang. Men barcha ma'lumotlarni yuklashga harakat qildim, lekin Sparklyrni qismlarga ajratishni taniy olmadim.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Qatl 29,415 soniya davom etdi. Juda yaxshi, lekin hech narsani ommaviy sinovdan o'tkazish uchun unchalik yaxshi emas. Bundan tashqari, men keshlash bilan ishlarni tezlashtira olmadim, chunki xotirada maʼlumotlar ramkasini keshlashga uringanimda, hatto ogʻirligi 50 dan kam boʻlgan maʼlumotlar toʻplamiga 15 Gb dan ortiq xotira ajratganimda ham Spark har doim qulab tushdi.

AWK sahifasiga qaytish

Men nimani o'rgandim: AWK da assotsiativ massivlar juda samarali.

Men yuqori tezlikka erishishim mumkinligini angladim. Men buni ajoyib tarzda esladim Bryus Barnett tomonidan AWK bo'yicha qo'llanma Men "deb nomlangan ajoyib xususiyat haqida o'qidim.assotsiativ massivlar" Aslida, bular kalit-qiymat juftliklari bo'lib, ular negadir AWK-da boshqacha nomlangan va shuning uchun men ular haqida ko'p o'ylamaganman. Roman Cheplyaka "assotsiativ massivlar" atamasi "kalit-qiymat juftligi" atamasidan ancha eski ekanligini eslatdi. Agar siz Google Ngram-da kalit-qiymatni qidiring, siz u erda bu atamani ko'rmaysiz, lekin assotsiativ massivlarni topasiz! Bundan tashqari, "kalit-qiymat juftligi" ko'pincha ma'lumotlar bazalari bilan bog'liq, shuning uchun uni xashmap bilan solishtirish ancha mantiqiy. Men ushbu assotsiativ massivlardan Spark-dan foydalanmasdan SNP-larni axlat jadvali va xom ma'lumotlar bilan bog'lash uchun foydalanishim mumkinligini angladim.

Buning uchun AWK skriptida men blokdan foydalandim BEGIN. Bu ma'lumotlarning birinchi qatori skriptning asosiy qismiga o'tishidan oldin bajariladigan kod qismidir.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

komanda while(getline...) CSV guruhidagi (bin) barcha qatorlarni yukladi, birinchi ustunni (SNP nomi) assotsiativ massiv uchun kalit sifatida belgilang bin va ikkinchi qiymat (guruh) qiymat sifatida. Keyin blokda { }, asosiy faylning barcha satrlarida bajariladi, har bir satr o'z guruhiga (bin) qarab noyob nom oladigan chiqish fayliga yuboriladi: ..._bin_"bin[$1]"_....

O'zgaruvchilar batch_num и chunk_id poyga holatidan qochib, quvur liniyasi tomonidan taqdim etilgan ma'lumotlarga mos keldi va har bir bajarilish ipi ishlaydi parallel, o'zining noyob fayliga yozgan.

Men barcha xom ma'lumotlarni AWK bilan oldingi tajribamdan qolgan xromosomalardagi papkalarga tarqatganim uchun, endi men bir vaqtning o'zida bitta xromosomani qayta ishlash va S3 ga chuqurroq bo'lingan ma'lumotlarni yuborish uchun boshqa Bash skriptini yozishim mumkin edi.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Skript ikki qismdan iborat parallel.

Birinchi bo'limda ma'lumotlar kerakli xromosoma haqida ma'lumotni o'z ichiga olgan barcha fayllardan o'qiladi, so'ngra bu ma'lumotlar fayllarni tegishli guruhlarga (bin) taqsimlovchi iplar bo'ylab taqsimlanadi. Bitta faylga bir nechta mavzular yozganda poyga shartlarini oldini olish uchun AWK ma'lumotlarni turli joylarga yozish uchun fayl nomlarini uzatadi, masalan. chr_10_bin_52_batch_2_aa.csv. Natijada, diskda ko'plab kichik fayllar yaratiladi (buning uchun men terabayt EBS hajmlaridan foydalanganman).

Ikkinchi qismdan konveyer parallel guruhlar (bin) orqali o'tadi va ularning shaxsiy fayllarini umumiy CSV ga birlashtiradi c catva keyin ularni eksportga jo'natadi.

R tilida eshittirish?

Men nimani o'rgandim: Murojaat qilishingiz mumkin stdin и stdout R skriptidan va shuning uchun uni quvur liniyasida ishlating.

Bash skriptingizda ushbu qatorni ko'rgan bo'lishingiz mumkin: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... U barcha birlashtirilgan guruh fayllarini (bin) quyidagi R skriptiga tarjima qiladi. {} maxsus texnika hisoblanadi parallel, bu ko'rsatilgan oqimga yuborgan har qanday ma'lumotlarni to'g'ridan-to'g'ri buyruqning o'ziga kiritadi. Variant {#} noyob mavzu identifikatorini taqdim etadi va {%} ish uyasi raqamini ifodalaydi (takrorlanadi, lekin bir vaqtning o'zida hech qachon). Barcha variantlar ro'yxatini sahifada topishingiz mumkin hujjatlar.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

O'zgaruvchi bo'lganda file("stdin") ga uzatiladi readr::read_csv, R skriptiga tarjima qilingan ma'lumotlar ramkaga yuklanadi, u keyinchalik shaklda bo'ladi .rds-fayl yordamida aws.s3 to'g'ridan-to'g'ri S3 ga yozilgan.

RDS - bu Parketning kichik versiyasiga o'xshaydi, u karnaylarni saqlash uchun qulay emas.

Bash skriptini tugatgandan so'ng, men bir to'plam oldim .rds-S3-da joylashgan fayllar, bu menga samarali siqish va o'rnatilgan turlardan foydalanishga imkon berdi.

Tormoz R dan foydalanishga qaramasdan, hamma narsa juda tez ishladi. R ning ma'lumotlarni o'qiydigan va yozadigan qismlari yuqori darajada optimallashtirilganligi ajablanarli emas. Bitta o'rta o'lchamdagi xromosomada sinovdan so'ng, ish C5n.4xl nusxasida taxminan ikki soat ichida yakunlandi.

S3 cheklovlari

Men nimani o'rgandim: Aqlli yo'lni amalga oshirish tufayli S3 ko'plab fayllarni ishlay oladi.

Men S3 unga uzatilgan ko'plab fayllarni boshqara oladimi, deb xavotirlanardim. Fayl nomlarini mantiqiy qilishim mumkin edi, lekin S3 ularni qanday izlaydi?

AWK va R yordamida 25TB ni tahlil qilish
S3-dagi papkalar faqat ko'rsatish uchun mo'ljallangan, aslida tizim belgi bilan qiziqmaydi /. S3 FAQ sahifasidan.

Ko'rinishidan, S3 ma'lum bir faylga yo'lni xesh-jadval yoki hujjatga asoslangan ma'lumotlar bazasidagi oddiy kalit sifatida ifodalaydi. Chelakni jadval, fayllarni esa ushbu jadvaldagi yozuvlar deb hisoblash mumkin.

Tezlik va samaradorlik Amazonda daromad olish uchun muhim bo'lganligi sababli, ushbu kalit-fayl-yo'l tizimi hayratlanarli darajada optimallashtirilganligi ajablanarli emas. Men muvozanatni topishga harakat qildim: ko'p qabul qilish so'rovlarini qilishim shart emas, balki so'rovlar tezda bajarilishi uchun. Ma'lum bo'lishicha, eng yaxshisi 20 mingga yaqin axlat qutisi. O'ylaymanki, agar biz optimallashtirishni davom ettirsak, biz tezlikni oshirishga erishishimiz mumkin (masalan, faqat ma'lumotlar uchun maxsus chelak yasash, shu bilan qidirish jadvalining hajmini kamaytirish). Ammo keyingi tajribalar uchun vaqt va pul yo'q edi.

O'zaro muvofiqlik haqida nima deyish mumkin?

Men nimani o'rgandim: vaqtni behuda sarflashning birinchi sababi bu saqlash usulini muddatidan oldin optimallashtirishdir.

Shu nuqtada, o'zingizga savol berish juda muhim: "Nega xususiy fayl formatidan foydalanasiz?" Buning sababi yuklash tezligida (gziplangan CSV fayllarni yuklash uchun 7 baravar ko'proq vaqt kerak bo'ldi) va bizning ish oqimlarimiz bilan muvofiqligi. R Spark yukisiz Parket (yoki Arrow) fayllarini osongina yuklay oladimi yoki yo'qligini qayta ko'rib chiqishim mumkin. Laboratoriyamizdagi hamma R dan foydalanadi va agar men ma'lumotlarni boshqa formatga o'zgartirishim kerak bo'lsa, menda hali ham asl matn ma'lumotlari bor, shuning uchun men quvur liniyasini qayta ishga tushirishim mumkin.

Ish taqsimoti

Men nimani o'rgandim: Ishlarni qo'lda optimallashtirishga urinmang, kompyuterga ruxsat bering.

Men bitta xromosomada ish jarayonini tuzatdim, endi boshqa barcha ma'lumotlarni qayta ishlashim kerak.
Men konvertatsiya qilish uchun bir nechta EC2 nusxalarini ko'tarmoqchi edim, lekin shu bilan birga men turli xil ishlov berish ishlarida juda muvozanatsiz yuk olishdan qo'rqardim (xuddi Spark muvozanatsiz bo'limlardan aziyat chekkan). Bundan tashqari, men har bir xromosoma uchun bitta nusxani ko'tarishdan manfaatdor emas edim, chunki AWS hisoblari uchun sukut bo'yicha 10 nusxa chegarasi mavjud.

Keyin qayta ishlash ishlarini optimallashtirish uchun R-da skript yozishga qaror qildim.

Birinchidan, men S3 dan har bir xromosoma qancha saqlash joyini egallashini hisoblashni so'radim.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Keyin men umumiy o'lchamni oladigan, xromosomalarning tartibini aralashtiradigan va ularni guruhlarga ajratadigan funktsiyani yozdim. num_jobs va barcha ishlov berish ishlarining o'lchamlari qanchalik farq qilishini aytadi.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Keyin purrr yordamida minglab aralashtirish orqali yugurdim va eng yaxshisini tanladim.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Shunday qilib, men hajmi jihatidan juda o'xshash vazifalar to'plamini tugatdim. Keyin mening oldingi Bash skriptimni katta halqaga o'rashgina qoldi for. Ushbu optimallashtirish yozish uchun taxminan 10 daqiqa vaqt oldi. Va bu, agar ular muvozanatsiz bo'lsa, qo'lda vazifalarni yaratishga sarflaganimdan ancha kam. Shuning uchun, men ushbu dastlabki optimallashtirish bilan to'g'ri bo'ldim deb o'ylayman.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Oxirida men o'chirish buyrug'ini qo'shaman:

sudo shutdown -h now

... va hammasi yaxshi bo'ldi! AWS CLI-dan foydalanib, men variantni ishlatib, misollarni ko'tardim user_data ularga qayta ishlash uchun topshiriqlarining Bash skriptlarini berdi. Ular ishga tushdi va avtomatik ravishda yopildi, shuning uchun men qo'shimcha ishlov berish uchun to'lamadim.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Kelinglar!

Men nimani o'rgandim: Foydalanish qulayligi va moslashuvchanligi uchun API oddiy bo'lishi kerak.

Nihoyat, men ma'lumotlarni to'g'ri joyda va shaklda oldim. Qolgan narsa, hamkasblarim uchun qulay bo'lishi uchun ma'lumotlardan foydalanish jarayonini iloji boricha soddalashtirish edi. Men so'rovlarni yaratish uchun oddiy API yaratmoqchi edim. Agar kelajakda men o'tishga qaror qilsam .rds Parket fayllariga, keyin bu mening hamkasblarim uchun emas, balki men uchun muammo bo'lishi kerak. Buning uchun men ichki R paketini yaratishga qaror qildim.

Funktsiya atrofida tashkil etilgan bir nechta ma'lumotlarga kirish funktsiyalarini o'z ichiga olgan juda oddiy paketni yarating va hujjatlang get_snp. Men hamkasblarim uchun veb-sayt ham qildim pkgdown, shuning uchun ular misollar va hujjatlarni osongina ko'rishlari mumkin.

AWK va R yordamida 25TB ni tahlil qilish

Aqlli keshlash

Men nimani o'rgandim: Agar ma'lumotlaringiz yaxshi tayyorlangan bo'lsa, keshlash oson bo'ladi!

Asosiy ish oqimlaridan biri SNP to'plamiga bir xil tahlil modelini qo'llaganligi sababli, men o'z foydam uchun biriktirishdan foydalanishga qaror qildim. SNP orqali ma'lumotlarni uzatishda guruhdagi (bin) barcha ma'lumotlar qaytarilgan ob'ektga biriktiriladi. Ya'ni, eski so'rovlar (nazariy jihatdan) yangi so'rovlarni qayta ishlashni tezlashtirishi mumkin.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Paketni yaratishda men turli usullardan foydalanganda tezlikni solishtirish uchun ko'plab mezonlarni o'tkazdim. Men buni e'tiborsiz qoldirmaslikni maslahat beraman, chunki ba'zida natijalar kutilmagan bo'ladi. Masalan, dplyr::filter indekslash asosidagi filtrlash yordamida satrlarni yozib olishdan ko'ra tezroq edi va filtrlangan ma'lumotlar ramkasidan bitta ustunni olish indekslash sintaksisidan foydalanishdan ancha tezroq edi.

E'tibor bering, ob'ekt prev_snp_results kalitni o'z ichiga oladi snps_in_bin. Bu guruhdagi (bin) barcha noyob SNPlar massivi boʻlib, oldingi soʻrovingizdagi maʼlumotlaringiz bor yoki yoʻqligini tezda tekshirish imkonini beradi. Bundan tashqari, ushbu kod yordamida guruhdagi (bin) barcha SNPlar orqali aylanishni osonlashtiradi:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Natijalar

Endi biz ilgari biz uchun mavjud bo'lmagan modellar va stsenariylarni ishga tushirishimiz mumkin (va jiddiy boshladik). Eng yaxshisi, laboratoriyadagi hamkasblarim hech qanday asoratlar haqida o'ylamasligi kerak. Ular faqat ishlaydigan funktsiyaga ega.

Garchi paket ularga tafsilotlarni saqlagan bo'lsa-da, men ma'lumotlar formatini etarlicha sodda qilishga harakat qildim, agar ertaga to'satdan g'oyib bo'lsam, buni tushunishlari mumkin edi ...

Tezlik sezilarli darajada oshdi. Biz odatda funktsional jihatdan muhim genom bo'laklarini skanerlaymiz. Ilgari biz buni qila olmadik (bu juda qimmat bo'lib chiqdi), ammo endi guruh (bin) tuzilishi va keshlash tufayli bitta SNP uchun so'rov o'rtacha 0,1 soniyadan kamroq vaqtni oladi va ma'lumotlardan foydalanish shunchalik ko'p. past S3 uchun xarajatlar yeryong'oq.

xulosa

Ushbu maqola umuman qo'llanma emas. Yechim individual bo'lib chiqdi va deyarli optimal emas. Aksincha, bu sayohatnoma. Boshqalar tushunishlarini istaymanki, bunday qarorlar boshida to'liq shakllangan ko'rinmaydi, ular sinov va xato natijasidir. Bundan tashqari, agar siz ma'lumotlar bo'yicha mutaxassis izlayotgan bo'lsangiz, ushbu vositalardan samarali foydalanish tajribani talab qilishini va tajriba pul talab qilishini yodda tuting. Men to'lash uchun mablag'im borligidan xursandman, lekin mendan ko'ra bir xil ishni qila oladigan ko'plab boshqa odamlarda pul yo'qligi sababli hech qachon sinab ko'rish imkoniyati bo'lmaydi.

Katta ma'lumotlar vositalari ko'p qirrali. Agar vaqtingiz bo'lsa, aqlli ma'lumotlarni tozalash, saqlash va chiqarish usullaridan foydalangan holda tezroq yechim yozishingiz mumkin. Oxir oqibat, bu xarajatlar-foyda tahliliga to'g'ri keladi.

Men nimani o'rgandim:

  • bir vaqtning o'zida 25 TBni tahlil qilishning arzon usuli yo'q;
  • Parket fayllaringiz hajmi va ularni tashkil qilishda ehtiyot bo'ling;
  • Spark-dagi qismlar muvozanatli bo'lishi kerak;
  • Umuman olganda, hech qachon 2,5 million bo'lim qilishga urinmang;
  • Sparkni sozlash kabi saralash hali ham qiyin;
  • ba'zan maxsus ma'lumotlar maxsus echimlarni talab qiladi;
  • Uchqun yig'ish tez, lekin qismlarga ajratish hali ham qimmat;
  • ular sizga asoslarni o'rgatishganda uxlamang, kimdir sizning muammoingizni 1980-yillarda hal qilgan bo'lishi mumkin;
  • gnu parallel - bu sehrli narsa, hamma undan foydalanishi kerak;
  • Spark siqilmagan ma'lumotlarni yoqtiradi va bo'limlarni birlashtirishni yoqtirmaydi;
  • Oddiy muammolarni hal qilishda Spark juda ko'p yuklarga ega;
  • AWK assotsiativ massivlari juda samarali;
  • murojaat qilishingiz mumkin stdin и stdout R skriptidan va shuning uchun uni quvur liniyasida ishlating;
  • Aqlli yo'lni amalga oshirish tufayli S3 ko'plab fayllarni qayta ishlay oladi;
  • Vaqtni behuda sarflashning asosiy sababi - saqlash usulini muddatidan oldin optimallashtirish;
  • vazifalarni qo'lda optimallashtirishga urinmang, kompyuterga ruxsat bering;
  • Foydalanish qulayligi va moslashuvchanligi uchun API sodda bo'lishi kerak;
  • Agar ma'lumotlaringiz yaxshi tayyorlangan bo'lsa, keshlash oson bo'ladi!

Manba: www.habr.com

a Izoh qo'shish