AWK болон R ашиглан 25TB-г задлан шинжлэх

AWK болон R ашиглан 25TB-г задлан шинжлэх
Энэ нийтлэлийг хэрхэн унших вэ: Текст маш урт, эмх замбараагүй байгаад хүлцэл өчье. Би та бүхний цагийг хэмнэхийн тулд бүлэг бүрийг "Би юу сурсан" гэсэн удиртгалаар эхлүүлж байгаа бөгөөд энэ бүлгийн мөн чанарыг нэг юмуу хоёр өгүүлбэрээр хураангуйлсан болно.

"Зүгээр л надад шийдлийг харуулаач!" Хэрэв та намайг хаанаас ирснийг харахыг хүсч байвал "Илүү бүтээлч болох нь" гэсэн хэсэг рүү очоорой, гэхдээ бүтэлгүйтлийн тухай унших нь илүү сонирхолтой бөгөөд хэрэгтэй гэж би бодож байна.

Би саяхан их хэмжээний түүхий ДНХ-ийн дарааллыг (техникийн хувьд SNP чип) боловсруулах процессыг бий болгох даалгавар авсан. Дараачийн загварчлал болон бусад ажилд зориулж өгөгдсөн генетикийн байршлын (SNP гэж нэрлэдэг) мэдээллийг хурдан олж авах шаардлагатай байв. R болон AWK ашиглан би өгөгдлийг байгалийн аргаар цэвэрлэж, цэгцэлж, асуулгын боловсруулалтыг ихээхэн хурдасгасан. Энэ нь надад амаргүй байсан бөгөөд олон давталт шаардсан. Энэ нийтлэл нь танд миний зарим алдаанаас зайлсхийхэд тусалж, юу хийснийг харуулах болно.

Нэгдүгээрт, зарим танилцуулга тайлбар.

мэдээ

Манай их сургуулийн генетикийн мэдээлэл боловсруулах төвөөс бидэнд 25 TB TSV хэлбэрээр мэдээлэл өгсөн. Би тэдгээрийг Gzip-ээр шахсан 5 багцад хуваасан, тус бүр нь 240 орчим дөрвөн гигабайт файл агуулсан хүлээн авсан. Мөр бүр нь нэг хүний ​​нэг SNP-ийн өгөгдлийг агуулна. Нийтдээ ~2,5 сая SNP болон ~60 мянган хүний ​​мэдээллийг дамжуулсан. SNP мэдээллээс гадна файлууд нь унших эрч хүч, янз бүрийн аллелийн давтамж гэх мэт янз бүрийн шинж чанарыг тусгасан тоо бүхий олон тооны багана агуулсан байв. Нийтдээ өвөрмөц утгатай 30 орчим багана байсан.

Зорилго

Аливаа мэдээллийн менежментийн төслийн нэгэн адил хамгийн чухал зүйл бол өгөгдлийг хэрхэн ашиглахыг тодорхойлох явдал байв. Энэ тохиолдолд бид ихэвчлэн SNP-д суурилсан загвар, ажлын урсгалыг сонгох болно. Өөрөөр хэлбэл, бидэнд нэг удаад зөвхөн нэг SNP-ийн өгөгдөл хэрэгтэй болно. Би 2,5 сая SNP-ийн аль нэгтэй холбоотой бүх бүртгэлийг хэрхэн хялбар, хурдан, хямдаар олж авахыг сурах хэрэгтэй болсон.

Үүнийг яаж хийхгүй байх вэ

Тохиромжтой хэллэгийг иш татахын тулд:

Би мянган удаа бүтэлгүйтсэнгүй, асуулгад ээлтэй форматаар олон тооны өгөгдлийг задлан шинжлэхээс зайлсхийх мянга мянган аргыг л олж мэдсэн.

Эхлээд үзээрэй

Би юу сурсан: Нэг удаад 25 TB-г задлан шинжлэх хямд арга байхгүй.

Вандербилтийн их сургуульд "Том өгөгдөл боловсруулах дэвшилтэт аргууд" курст хамрагдсаны дараа би заль мэх нь уутанд байгаа гэдэгт итгэлтэй байсан. Бүх өгөгдлийг ажиллуулж, үр дүнг мэдээлэхийн тулд Hive серверийг тохируулахад нэг эсвэл хоёр цаг шаардагдах байх. Бидний өгөгдөл AWS S3-д хадгалагддаг тул би уг үйлчилгээг ашигласан Athena, энэ нь танд Hive SQL асуулгыг S3 өгөгдөлд ашиглах боломжийг олгодог. Та Hive кластер үүсгэх/босгох шаардлагагүй бөгөөд зөвхөн хайж буй өгөгдлийнхөө төлбөрийг төлнө.

Би Афина-д өөрийн өгөгдөл болон түүний форматыг үзүүлсний дараа би дараах асуултуудтай тест хийсэн:

select * from intensityData limit 10;

Мөн сайн бүтэцтэй үр дүнг хурдан хүлээн авсан. Бэлэн.

Бид өгөгдлийг ажилдаа ашиглахыг оролдох хүртэл ...

Загварыг туршихын тулд SNP-ийн бүх мэдээллийг гаргаж авахыг надаас хүссэн. Би асуулга явуулсан:


select * from intensityData 
where snp = 'rs123456';

... бас хүлээж эхлэв. Найман минут, 4 ТБ-аас дээш хүссэн өгөгдлийг авсны дараа би үр дүнг хүлээн авлаа. Афина олдсон мэдээллийн хэмжээгээрээ нэг терабайт тутамд 5 доллар авдаг. Тиймээс энэ ганц хүсэлт нь 20 доллар, найман минут хүлээхэд зарцуулагдсан. Загварыг бүх өгөгдөл дээр ажиллуулахын тулд бид 38 жил хүлээж, 50 сая доллар төлөх шаардлагатай болсон.Энэ нь бидэнд тохиромжгүй нь ойлгомжтой.

Паркет ашиглах шаардлагатай байсан ...

Би юу сурсан: Паркетан файлын хэмжээ болон тэдгээрийн зохион байгуулалтад болгоомжтой хандана уу.

Би эхлээд бүх TSV-г хөрвүүлэх замаар нөхцөл байдлыг засах гэж оролдсон Паркетан файлууд. Эдгээр нь том өгөгдлийн багцтай ажиллахад тохиромжтой, учир нь тэдгээрт байгаа мэдээлэл нь багана хэлбэрээр хадгалагддаг: багана бүр нь текст файлуудаас ялгаатай нь өөрийн санах ой/дискний сегментэд байрладаг бөгөөд мөр нь багана бүрийн элементүүдийг агуулдаг. Хэрэв танд ямар нэг зүйл олох шаардлагатай бол шаардлагатай баганыг уншаарай. Нэмж дурдахад, файл бүр тодорхой хэмжээний утгыг баганад хадгалдаг тул хэрэв таны хайж буй утга баганын мужид байхгүй бол Spark файлыг бүхэлд нь скан хийхэд цаг алдахгүй.

Би энгийн даалгавар гүйцэтгэсэн AWS цавуу Манай TSV-г Паркет болгон хөрвүүлж, шинэ файлуудыг Афина руу буулгасан. Энэ нь ойролцоогоор 5 цаг зарцуулсан. Гэхдээ би хүсэлтийг биелүүлэхэд ойролцоогоор ижил хэмжээний цаг хугацаа, арай бага мөнгө зарцуулсан. Баримт нь Spark даалгавраа оновчтой болгохыг хичээж, зүгээр л нэг TSV хэсгийг задалж, өөрийн Паркетин хэсэгт хийсэн. Хэсэг бүр нь олон хүний ​​бүх бичлэгийг багтаах хангалттай том байсан тул файл бүр бүх SNP-г агуулсан байсан тул Spark шаардлагатай мэдээллийг задлахын тулд бүх файлыг нээх шаардлагатай болсон.

Сонирхолтой нь, Паркетын анхдагч (мөн санал болгож буй) шахалтын төрөл, хурдан, хуваагдах боломжгүй юм. Тиймээс гүйцэтгэгч бүр 3,5 ГБ-ын багтаамжтай өгөгдлийн багцыг задлах, татаж авах даалгавар дээр гацсан.

AWK болон R ашиглан 25TB-г задлан шинжлэх

Асуудлыг ойлгоцгооё

Би юу сурсан: Ялангуяа өгөгдөл тараагдсан тохиолдолд эрэмбэлэх нь хэцүү байдаг.

Асуудлын мөн чанарыг одоо л ойлгосон юм шиг санагдсан. Би зөвхөн өгөгдлийг хүнээр бус SNP баганаар эрэмбэлэх шаардлагатай болсон. Дараа нь хэд хэдэн SNP нь тусдаа өгөгдлийн хэсэгт хадгалагдах бөгөөд дараа нь Паркетын "ухаалаг" функц нь "зөвхөн үнэ цэнэ нь хязгаарт байгаа тохиолдолд нээгддэг" бүх алдар суугаараа өөрийгөө харуулах болно. Харамсалтай нь кластерт тархсан хэдэн тэрбум мөрийг эрэмбэлэх нь хэцүү ажил байсан.

AWS нь "Би анхаарал сарниулсан оюутан" гэсэн шалтгааны улмаас буцаан олголт олгохыг хүсэхгүй байгаа нь лавтай. Би Amazon Glue дээр эрэмбэлэн ажиллуулсны дараа 2 хоног ажиллаад гацсан.

Хуваалтын талаар юу хэлэх вэ?

Би юу сурсан: Spark дахь хуваалтууд тэнцвэртэй байх ёстой.

Дараа нь би хромосом дахь өгөгдлийг хуваах санааг олсон. Тэдгээрийн 23 нь (хэрэв та митохондрийн ДНХ болон зураглаагүй бүс нутгийг харгалзан үзвэл хэд хэдэн) байдаг.
Энэ нь өгөгдлийг жижиг хэсгүүдэд хуваах боломжийг танд олгоно. Хэрэв та Glue скрипт дэх Spark экспортын функцэд зөвхөн нэг мөр нэмбэл partition_by = "chr", дараа нь өгөгдлийг хувин болгон хуваах ёстой.

AWK болон R ашиглан 25TB-г задлан шинжлэх
Геном нь хромосом гэж нэрлэгддэг олон тооны хэсгүүдээс бүрдэнэ.

Харамсалтай нь бүтсэнгүй. Хромосомууд нь өөр өөр хэмжээтэй байдаг бөгөөд энэ нь өөр өөр мэдээлэлтэй байдаг. Энэ нь зарим зангилаа эрт дуусч, сул зогсолттой байсан тул Spark-ийн ажилчдад илгээсэн даалгавруудыг тэнцвэржүүлж, удаан гүйцэтгэсэн гэсэн үг юм. Гэсэн хэдий ч даалгавруудыг биелүүлсэн. Гэхдээ нэг SNP асуухад тэнцвэргүй байдал дахин асуудал үүсгэв. Том хромосом дээр (өөрөөр хэлбэл бид мэдээлэл авахыг хүсч байгаа) SNP-ийг боловсруулах зардал ердөө 10 дахин буурсан байна. Маш их, гэхдээ хангалттай биш.

Хэрэв бид үүнийг бүр жижиг хэсгүүдэд хуваавал яах вэ?

Би юу сурсан: 2,5 сая хуваалт хийхийг хэзээ ч бүү оролд.

Би бүх зүйлийг гадагш гаргахаар шийдэж, SNP бүрийг хуваасан. Энэ нь хуваалтууд ижил хэмжээтэй байхыг баталгаажуулсан. МУУ САНАА БАЙСАН. Би цавуу хэрэглэж, гэмгүй мөрийг нэмсэн partition_by = 'snp'. Даалгавраа эхлүүлж, гүйцэтгэж эхлэв. Нэг өдрийн дараа би шалгаж үзээд S3 дээр юу ч бичээгүй байсан тул би даалгавраа алсан. Glue завсрын файлуудыг S3-ийн далд байршилд бичиж байсан бололтой, маш олон файл, магадгүй хэдэн сая. Үүний үр дүнд миний алдаа мянга гаруй долларын үнэтэй байсан бөгөөд миний зөвлөгчийг баярлуулсангүй.

Хуваалт + ангилах

Би юу сурсан: Spark тааруулахтай адил эрэмбэлэх нь хэцүү хэвээр байна.

Миний хамгийн сүүлд хуваах оролдлого нь хромосомуудыг хувааж, дараа нь хуваалт бүрийг ангилах явдал байв. Онолын хувьд энэ нь асуулга бүрийг хурдасгах болно, учир нь хүссэн SNP өгөгдөл нь өгөгдсөн мужид хэдэн Паркетан хэсгүүдэд байх ёстой. Харамсалтай нь бүр хуваасан өгөгдлийг ангилах нь хэцүү ажил болж хувирав. Үүний үр дүнд би захиалгат кластерт зориулж EMR рүү шилжиж, илүү уян хатан ажлын урсгалыг бий болгохын тулд найман хүчирхэг жишээ (C5.4xl) болон Sparklyr ашигласан...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...гэхдээ ажил нь дуусаагүй байсан. Би үүнийг янз бүрийн аргаар тохируулсан: асуулга гүйцэтгэгч бүрийн санах ойн хуваарилалтыг нэмэгдүүлсэн, их хэмжээний санах ойтой зангилаа ашигласан, өргөн нэвтрүүлгийн хувьсагч (өргөн нэвтрүүлгийн хувьсагч) ашигласан, гэхдээ эдгээр нь хагас хэмжигдэхүүн болж, аажмаар гүйцэтгэгчид ажиллаж эхэлсэн. бүх зүйл зогсох хүртэл бүтэлгүйтэх.

Би илүү бүтээлч болж байна

Би юу сурсан: Заримдаа тусгай өгөгдөл нь тусгай шийдлийг шаарддаг.

SNP бүр байрлалын утгатай байдаг. Энэ нь хромосомын дагуух суурийн тоотой тохирч буй тоо юм. Энэ бол бидний өгөгдлийг цэгцлэх сайхан бөгөөд байгалийн арга юм. Эхлээд би хромосом бүрийн бүсээр хуваахыг хүссэн. Жишээлбэл, 1-р байр - 2000, 2001 - 4000 гэх мэт. Гэхдээ асуудал нь SNP нь хромосомуудад жигд тархдаггүй тул бүлгийн хэмжээ ихээхэн ялгаатай байх болно.

AWK болон R ашиглан 25TB-г задлан шинжлэх

Үүний үр дүнд би албан тушаалуудыг ангилал (зэрэглэл) болгон хуваах болсон. Татаж авсан өгөгдлийг ашиглан би өвөрмөц SNP, тэдгээрийн байрлал, хромосомын жагсаалтыг авах хүсэлтийг явуулсан. Дараа нь би хромосом бүрийн доторх өгөгдлийг ангилж, SNP-ийг өгөгдсөн хэмжээтэй бүлэгт (хогийн сав) цуглуулсан. Тус бүр нь 1000 SNP гэж үзье. Энэ нь надад SNP-н бүлэг-хромосомын харьцааг өгсөн.

Эцэст нь би 75 SNP-ийн бүлгүүдийг (хогийн сав) хийсэн бөгөөд шалтгааныг доор тайлбарлах болно.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Эхлээд Spark-г туршиж үзээрэй

Би юу сурсан: Оч нэгтгэх нь хурдан боловч хуваах нь үнэтэй хэвээр байна.

Би энэ жижиг (2,5 сая мөр) өгөгдлийн хүрээг Spark руу уншиж, түүнийг түүхий өгөгдөлтэй нэгтгэж, дараа нь шинээр нэмэгдсэн баганаар хуваахыг хүссэн. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

би хэрэглэсэн sdf_broadcast(), тиймээс Spark өгөгдлийн хүрээг бүх зангилаа руу илгээх ёстойг мэддэг. Хэрэв өгөгдөл нь жижиг хэмжээтэй, бүх ажилд шаардлагатай бол энэ нь ашигтай. Үгүй бол Spark ухаалаг байхыг хичээдэг бөгөөд шаардлагатай бол өгөгдлийг түгээдэг бөгөөд энэ нь удаашрал үүсгэж болзошгүй юм.

Дахин хэлэхэд миний санаа бүтсэнгүй: даалгаварууд хэсэг хугацаанд ажиллаж, эвлэлдэн нэгдэж, дараа нь хуваах замаар эхлүүлсэн гүйцэтгэгчид шиг бүтэлгүйтэж эхлэв.

AWK нэмж байна

Би юу сурсан: Чамд анхан шатны мэдлэгийг зааж байгаа үед бүү унт. 1980-аад онд хэн нэгэн таны асуудлыг шийдэж чадсан нь лавтай.

Энэ хүртэл миний Spark-тай холбоотой бүтэлгүйтлийн шалтгаан нь кластер дахь өгөгдлийн төөрөгдөл байсан. Магадгүй урьдчилсан эмчилгээ хийснээр нөхцөл байдал сайжирч магадгүй юм. Би түүхий текстийн өгөгдлийг хромосомын багана болгон хувааж үзэхээр шийдсэн тул Spark-д "урьдчилан хуваагдсан" өгөгдөл өгнө гэж найдаж байсан.

Би StackOverflow дээр баганын утгуудаар хэрхэн хуваахыг хайж олоод олсон ийм сайхан хариулт байна. AWK-ийн тусламжтайгаар та текст файлыг үр дүнг илгээхийн оронд скриптээр бичиж баганын утгуудаар хувааж болно. stdout.

Би үүнийг туршиж үзэхийн тулд Bash скрипт бичсэн. Савласан TSV-н аль нэгийг татаж аваад дараа нь ашиглан задаллаа gzip болон илгээсэн awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Энэ болчихлоо!

Цөмийг дүүргэх

Би юу сурсан: gnu parallel - Энэ бол ид шидийн зүйл, хүн бүр үүнийг ашиглах ёстой.

салах нь нэлээд удаан байсан бөгөөд би эхэлсэн үед htopхүчирхэг (болон үнэтэй) EC2 жишээний ашиглалтыг шалгахын тулд би зөвхөн нэг цөм, 200 МБ санах ой ашиглаж байсан нь тогтоогдсон. Асуудлыг шийдэж, их хэмжээний мөнгө алдахгүйн тулд бид ажлыг хэрхэн зэрэгцүүлэх талаар бодох хэрэгтэй болсон. Аз болоход үнэхээр гайхалтай номонд Тушаалын мөрөнд өгөгдлийн шинжлэх ухаан Би Жерон Янссенсийн параллель байдлын тухай бүлгийг олсон. Үүнээс би сурсан gnu parallel, Unix дээр multithreading-ийг хэрэгжүүлэх маш уян хатан арга.

AWK болон R ашиглан 25TB-г задлан шинжлэх
Би шинэ процессыг ашиглан хуваалт хийж эхлэхэд бүх зүйл хэвийн байсан, гэхдээ саад бэрхшээл байсаар байсан - S3 объектуудыг диск рүү татаж авах нь тийм ч хурдан биш бөгөөд бүрэн параллель биш байв. Үүнийг засахын тулд би үүнийг хийсэн:

  1. Диск дээрх завсрын хадгалалтыг бүрмөсөн арилгаж, S3 татаж авах үе шатыг дамжуулах хоолойд шууд хэрэгжүүлэх боломжтой гэдгийг би олж мэдсэн. Энэ нь би диск рүү түүхий өгөгдөл бичихээс зайлсхийж, AWS дээр илүү жижиг, тиймээс хямдхан хадгалах боломжтой гэсэн үг юм.
  2. баг aws configure set default.s3.max_concurrent_requests 50 AWS CLI-ийн ашигладаг хэлхээний тоог ихээхэн нэмэгдүүлсэн (анхдагчаар 10 байдаг).
  3. Би нэрэндээ n үсэг бүхий сүлжээний хурдыг оновчтой болгосон EC2 хувилбар руу шилжсэн. n-инстанцуудыг ашиглах үед боловсруулах хүчин чадлын алдагдал нь ачаалах хурдыг нэмэгдүүлснээр нөхөгддөг болохыг би олж мэдсэн. Ихэнх ажлуудад би c5n.4xl ашигласан.
  4. Өөрчлөгдсөн gzip тухай pigz, энэ нь файлуудыг задлах анхдагч зэрэгцээ бус даалгаврыг зэрэгцүүлэхийн тулд гайхалтай зүйлсийг хийж чадах gzip хэрэгсэл юм (энэ нь хамгийн бага тусалсан).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Бүх зүйл маш хурдан ажиллахын тулд эдгээр алхмуудыг бие биетэйгээ хослуулсан. Татаж авах хурдыг нэмэгдүүлж, диск бичихийг арилгаснаар би 5 терабайт багцыг хэдхэн цагийн дотор боловсруулах боломжтой болсон.

Энэ жиргээнд 'TSV' гэж дурдсан байх ёстой. Харамсалтай нь.

Шинээр задалсан өгөгдлийг ашиглаж байна

Би юу сурсан: Spark нь шахагдаагүй өгөгдөлд дуртай бөгөөд хуваалтуудыг нэгтгэх дургүй.

Одоо өгөгдөл S3-д задлагдаагүй (унших: хуваалцсан) болон хагас дараалсан форматтай байсан бөгөөд би дахин Spark руу буцаж очих боломжтой болсон. Намайг гэнэтийн зүйл хүлээж байв: би хүссэн зүйлдээ дахин хүрч чадсангүй! Spark-д өгөгдөл хэрхэн хуваагдсаныг хэлэхэд маш хэцүү байсан. Би үүнийг хийсэн ч гэсэн хэтэрхий олон хуваалтууд (95 мянга) байсан бөгөөд би ашигласан үед coalesce Тэдний тоог боломжийн хязгаар хүртэл бууруулсан нь миний хуваалтыг устгасан. Үүнийг засч залруулж чадна гэдэгт итгэлтэй байна, гэвч хоёр өдрийн турш хайсны эцэст би шийдэл олж чадаагүй. Эцэст нь би Spark дээрх бүх даалгаврыг дуусгасан, гэвч үүнд хэсэг хугацаа шаардагдах бөгөөд миний хуваагдсан Паркетын файлууд тийм ч жижиг биш (~200 KB) байсан. Гэсэн хэдий ч өгөгдөл шаардлагатай газар байсан.

AWK болон R ашиглан 25TB-г задлан шинжлэх
Хэт жижиг, тэгш бус, гайхалтай!

Орон нутгийн Spark асуулгын туршилтыг хийж байна

Би юу сурсан: Энгийн асуудлыг шийдвэрлэхэд Spark хэт их ачаалалтай байдаг.

Өгөгдлийг ухаалаг форматаар татаж авснаар би хурдыг шалгаж чадсан. Орон нутгийн Spark серверийг ажиллуулахын тулд R скриптийг тохируулж, дараа нь заасан Паркетын бүлгийн хадгалах сангаас Spark өгөгдлийн хүрээг ачаална уу. Би бүх өгөгдлийг ачаалах гэж оролдсон боловч Sparklyr-д хуваалтыг таньж чадсангүй.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Цаазаар авах ажиллагаа 29,415 секунд үргэлжилсэн. Илүү сайн, гэхдээ аливаа зүйлийг массаар туршиж үзэхэд тийм ч сайн биш. Нэмж дурдахад би санах ойд өгөгдлийн хүрээг кэш хийх гэж оролдох үед 50-аас бага жинтэй өгөгдлийн багцад 15 ГБ-аас дээш санах ой хуваарилсан ч Spark үргэлж гацаж байсан тул кэш хийх ажлыг хурдасгаж чадаагүй.

AWK-руу буцах

Би юу сурсан: AWK дахь ассоциатив массивууд нь маш үр дүнтэй байдаг.

Би илүү өндөр хурд авч чадна гэдгээ ойлгосон. Би үүнийг гайхалтайгаар санав Брюс Барнеттийн AWK заавар Би "гэж нэрлэдэг гайхалтай функцийн талаар уншсан.ассоциатив массивууд" Үндсэндээ эдгээр нь AWK-д ямар нэг шалтгаанаар өөрөөр нэрлэгддэг байсан түлхүүр-утга хосууд бөгөөд тиймээс би тэдний талаар тийм ч их бодсонгүй. Роман Чепляка "ассоциатив массив" гэсэн нэр томъёо нь "түлхүүр-утга хос" гэсэн нэр томъёоноос хамаагүй эртний гэдгийг дурсав. Та ч гэсэн Google Ngram-аас түлхүүр-утга хайх, та энэ нэр томъёог тэнд харахгүй, гэхдээ та ассоциатив массивуудыг олох болно! Нэмж дурдахад "түлхүүр-утга хос" нь ихэвчлэн мэдээллийн сантай холбоотой байдаг тул үүнийг hasshmap-тай харьцуулах нь илүү утга учиртай юм. Би эдгээр ассоциатив массивуудыг ашиглан Spark ашиглахгүйгээр SNP-ээ бин хүснэгт болон түүхий өгөгдөлтэй холбох боломжтой гэдгээ ойлгосон.

Үүнийг хийхийн тулд AWK скрипт дээр би блок ашигласан BEGIN. Энэ бол өгөгдлийн эхний мөрийг скриптийн үндсэн хэсэгт дамжуулахаас өмнө гүйцэтгэгддэг кодын хэсэг юм.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

баг while(getline...) CSV бүлгийн бүх мөрийг ачаалж, эхний баганыг (SNP нэр) ассоциатив массивын түлхүүр болгон тохируулна уу. bin ба хоёр дахь утгыг (бүлэг) утга гэж үзнэ. Дараа нь блок дотор { }, үндсэн файлын бүх мөрөнд гүйцэтгэгддэг, мөр бүрийг гаралтын файл руу илгээдэг бөгөөд энэ нь бүлэг (бин) -ээс хамааран өвөрмөц нэрийг хүлээн авдаг: ..._bin_"bin[$1]"_....

Хувьсагчид batch_num и chunk_id дамжуулах хоолойн өгсөн өгөгдөлтэй таарч, уралдааны нөхцөл байдлаас зайлсхийж, гүйцэтгэлийн хэлхээ бүрийг ажиллуулж байна parallel, өөрийн өвөрмөц файлд бичсэн.

Би бүх түүхий өгөгдлийг AWK-тэй хийсэн өмнөх туршилтаас үлдсэн хромосомын хавтсанд тараасан тул одоо би өөр Bash скрипт бичиж, нэг хромосомыг нэг нэгээр нь боловсруулж, S3 руу илүү гүнзгий хуваагдсан өгөгдлийг илгээх боломжтой болсон.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Скрипт нь хоёр хэсэгтэй parallel.

Эхний хэсэгт өгөгдлийг хүссэн хромосомын талаархи мэдээллийг агуулсан бүх файлаас уншиж, дараа нь энэ өгөгдлийг урсгалуудаар тарааж, файлуудыг зохих бүлгүүдэд (хогийн сав) хуваарилдаг. Нэг файлд олон хэлхээ бичих үед уралдааны нөхцлөөс зайлсхийхийн тулд AWK өөр өөр газар өгөгдөл бичихийн тулд файлын нэрийг дамжуулдаг. chr_10_bin_52_batch_2_aa.csv. Үүний үр дүнд дискэн дээр олон жижиг файлууд үүсдэг (үүнд би терабайт EBS хэмжээг ашигласан).

Хоёр дахь хэсгийн конвейер parallel бүлгүүдийг (хогийн сав) дамжуулж, тэдгээрийн тусдаа файлуудыг нийтлэг CSV болгон нэгтгэдэг c catдараа нь экспортод илгээдэг.

R хэлээр нэвтрүүлэг хийх үү?

Би юу сурсан: Та холбогдож болно stdin и stdout R скриптээс, тиймээс үүнийг дамжуулах хоолойд ашиглах.

Та Bash скрипт дээрээ энэ мөрийг анзаарсан байх: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Энэ нь бүх холбосон бүлгийн файлуудыг (бин) доорх R скрипт рүү хөрвүүлдэг. {} тусгай техник юм parallel, энэ нь заасан урсгал руу илгээсэн аливаа өгөгдлийг тушаалд шууд оруулдаг. Сонголт {#} өвөрмөц урсгалын ID өгдөг, мөн {%} ажлын үүрний дугаарыг илэрхийлнэ (давтан, гэхдээ хэзээ ч нэгэн зэрэг биш). Бүх сонголтуудын жагсаалтыг эндээс олж болно баримт бичиг.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Хувьсагч үед file("stdin") руу дамжуулсан readr::read_csv, R скрипт рүү хөрвүүлсэн өгөгдлийг фрейм рүү ачаалж, дараа нь хэлбэрт оруулна .rds-файл ашиглана aws.s3 S3 руу шууд бичсэн.

RDS нь чанга яригчийг хадгалах шаардлагагүй, Паркетын бага хувилбартай адил зүйл юм.

Bash скриптийг дуусгасны дараа би багц авсан .rds-S3-д байрлах файлууд нь надад үр ашигтай шахалт болон суурилуулсан төрлүүдийг ашиглах боломжийг олгосон.

Тоормос R ашигласан ч бүх зүйл маш хурдан ажилласан. R-ийн өгөгдөл уншиж, бичих хэсгүүд нь маш оновчтой байдаг нь гайхах зүйл биш юм. Дунд зэргийн хэмжээтэй нэг хромосом дээр туршилт хийсний дараа ажил C5n.4xl дээр хоёр цагийн дотор дууссан.

S3 хязгаарлалт

Би юу сурсан: Ухаалаг замын хэрэгжилтийн ачаар S3 нь олон файлыг удирдах боломжтой.

S3 нь түүн рүү шилжүүлсэн олон файлыг зохицуулж чадах болов уу гэж би санаа зовж байсан. Би файлын нэрийг утга учиртай болгож чадах байсан ч S3 тэдгээрийг хэрхэн хайх вэ?

AWK болон R ашиглан 25TB-г задлан шинжлэх
S3 дахь хавтаснууд нь зөвхөн харуулах зориулалттай, үнэндээ систем нь тэмдэгтийг сонирхдоггүй /. S3 FAQ хуудаснаас.

S3 нь тодорхой файлд хүрэх замыг хэш хүснэгт эсвэл баримт бичигт суурилсан мэдээллийн сан дахь энгийн түлхүүр болгон төлөөлдөг бололтой. Хувинг хүснэгт гэж үзэж болох ба файлуудыг тухайн хүснэгтийн бичлэг гэж үзэж болно.

Амазонд ашиг олохын тулд хурд, үр ашигтай байх нь чухал байдаг тул энэ файлын түлхүүр систем нь гайхалтай оновчтой болсон нь гайхах зүйл биш юм. Би тэнцвэрийг олохыг хичээсэн: ингэснээр би олон хүсэлт гаргах шаардлагагүй, харин хүсэлтийг хурдан гүйцэтгэх болно. 20 мянга орчим бин файл хийх нь хамгийн сайн арга юм. Хэрэв бид үргэлжлүүлэн оновчтой болгох юм бол бид хурдыг нэмэгдүүлэх боломжтой гэж бодож байна (жишээлбэл, зөвхөн өгөгдөлд зориулж тусгай хувин хийх, ингэснээр хайлтын хүснэгтийн хэмжээг багасгах). Гэхдээ цаашдын туршилт хийхэд цаг хугацаа, мөнгө байсангүй.

Хөндлөн нийцтэй байдлын талаар юу хэлэх вэ?

Миний сурсан зүйл: Цагийг дэмий үрэх гол шалтгаан бол хадгалах аргыг хугацаанаас нь өмнө оновчтой болгох явдал юм.

Энэ үед өөрөөсөө "Яагаад хувийн файлын форматыг ашигладаг вэ?" гэж асуух нь маш чухал юм. Үүний шалтгаан нь ачаалах хурд (gzip CSV файлуудыг ачаалахад 7 дахин их хугацаа зарцуулсан) болон бидний ажлын урсгалтай нийцэж байгаа явдал юм. R нь Паркет (эсвэл Arrow) файлуудыг Spark ачаалалгүйгээр хялбархан ачаалж чадах эсэхийг дахин бодож магадгүй. Манай лабораторийн бүх хүмүүс R-г ашигладаг бөгөөд хэрэв би өгөгдлийг өөр формат руу хөрвүүлэх шаардлагатай бол би анхны текст өгөгдөлтэй хэвээр байгаа тул би дамжуулах хоолойг дахин ажиллуулж болно.

Ажлын хуваагдал

Би юу сурсан: Ажлыг гараар оновчтой болгох гэж бүү оролд, үүнийг компьютерт зөвшөөр.

Би нэг хромосомын ажлын урсгалыг дибаг хийсэн тул одоо бусад бүх өгөгдлийг боловсруулах шаардлагатай байна.
Би хөрвүүлэхийн тулд хэд хэдэн EC2 инстанцуудыг босгохыг хүссэн боловч тэр үед янз бүрийн боловсруулалтын ажилд маш тэнцвэргүй ачаалал авахаас айж байсан (Яг Spark тэнцвэргүй хуваалтаас болж зовж шаналж байсан). Нэмж дурдахад би хромосом бүрт нэг инстанц өсгөх сонирхолгүй байсан, учир нь AWS акаунтуудын хувьд 10 тохиолдлын өгөгдмөл хязгаар байдаг.

Дараа нь би боловсруулалтын ажлыг оновчтой болгохын тулд R дээр скрипт бичихээр шийдсэн.

Эхлээд би S3-аас хромосом бүр хэр их зай эзэлдэгийг тооцоолохыг хүссэн.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Дараа нь би нийт хэмжээг авч, хромосомын дарааллыг хольж, тэдгээрийг бүлэгт хуваадаг функц бичсэн. num_jobs мөн бүх боловсруулалтын ажлын хэмжээ хэр өөр болохыг хэлж өгнө.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Дараа нь би purrr ашиглан мянган шаффлай гүйж, хамгийн сайныг нь сонгосон.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Тиймээс би том хэмжээтэй ижил төстэй даалгавартай болсон. Дараа нь миний өмнөх Bash скриптийг том гогцоонд боох л үлдлээ for. Энэ оновчлолыг бичихэд 10 минут зарцуулсан. Энэ нь даалгаврууд тэнцвэргүй байсан бол гараар бүтээхэд зарцуулахаас хамаагүй бага юм. Тиймээс би энэ урьдчилсан оновчлолыг зөв хийсэн гэж бодож байна.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Төгсгөлд нь би унтраах командыг нэмнэ:

sudo shutdown -h now

... тэгээд бүх зүйл бүтсэн! AWS CLI-г ашиглан би сонголтыг ашиглан жишээг босгосон user_data тэдэнд боловсруулах даалгаврынхаа Bash скриптийг өгсөн. Тэд гүйж, автоматаар унтарсан тул би нэмэлт боловсруулалтын хүчин чадал төлөөгүй.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Баглацгаая!

Би юу сурсан: Ашиглахад хялбар, уян хатан байх үүднээс API нь энгийн байх ёстой.

Эцэст нь би өгөгдлийг зөв газар, хэлбэрээр авсан. Хамт ажиллагсдад хялбар болгох үүднээс өгөгдлийг ашиглах үйл явцыг аль болох хялбарчлах л үлдлээ. Би хүсэлт үүсгэх энгийн API хийхийг хүссэн. Ирээдүйд би солихоор шийдсэн бол .rds Паркетан файлууд, тэгвэл энэ нь миний хамтран ажиллагсдад биш харин надад асуудал байх ёстой. Үүний тулд би дотоод R багц хийхээр шийдсэн.

Функцийн эргэн тойронд зохион байгуулагдсан цөөн тооны өгөгдөлд хандах функцийг агуулсан маш энгийн багцыг бүтээж, баримтжуул get_snp. Би бас хамт олондоо зориулж вэб сайт хийсэн pkgdown, ингэснээр тэд жишээ болон баримт бичгийг хялбархан харах боломжтой.

AWK болон R ашиглан 25TB-г задлан шинжлэх

Ухаалаг кэш

Би юу сурсан: Хэрэв таны өгөгдөл сайн бэлтгэгдсэн бол кэш хийхэд хялбар байх болно!

Үндсэн ажлын урсгалуудын нэг нь SNP багцад ижил шинжилгээний загварыг ашигласан тул би биннингийг өөртөө ашигтайгаар ашиглахаар шийдсэн. Өгөгдлийг SNP-ээр дамжуулахдаа бүлгийн (хогийн сав) бүх мэдээллийг буцаасан объектод хавсаргана. Өөрөөр хэлбэл, хуучин асуулга нь (онолын хувьд) шинэ асуулгын боловсруулалтыг хурдасгаж чадна.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Багцыг бүтээхдээ би янз бүрийн аргыг ашиглах үед хурдыг харьцуулахын тулд олон шалгуур үзүүлэлтийг гүйлгэсэн. Заримдаа үр дүн нь гэнэтийн байдаг тул би үүнийг үл тоомсорлож болохгүй гэж зөвлөж байна. Жишээлбэл, dplyr::filter индексжүүлсэн өгөгдлийн хүрээнээс нэг баганыг татаж авах нь индексжүүлэх синтаксийг ашиглахаас хамаагүй хурдан байсан.

объект гэдгийг анхаарна уу prev_snp_results түлхүүрийг агуулдаг snps_in_bin. Энэ нь бүлэг (хогийн сав) дахь бүх өвөрмөц SNP-ийн массив бөгөөд танд өмнөх асуулгын өгөгдөл байгаа эсэхийг хурдан шалгах боломжийг олгоно. Энэ нь мөн бүлэгт (хогийн сав) байгаа бүх SNP-г дараах кодоор эргүүлэхэд хялбар болгодог:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Результаты

Одоо бид өмнө нь ашиглах боломжгүй байсан загвар, хувилбаруудыг ажиллуулах боломжтой (мөн нухацтай ажиллаж эхэлсэн). Хамгийн сайн зүйл бол лабораторийн хамт олон ямар нэгэн хүндрэлийн талаар бодох шаардлагагүй юм. Тэд зүгээр л ажилладаг функцтэй байдаг.

Хэдийгээр багц нь тэдэнд нарийн ширийн зүйлийг хадгалсан ч би өгөгдлийн форматыг хангалттай энгийн болгохыг хичээсэн бөгөөд хэрэв би маргааш гэнэт алга болвол тэд үүнийг ойлгох болно ...

Хурд нь мэдэгдэхүйц нэмэгдсэн. Бид ихэвчлэн функциональ ач холбогдолтой геномын хэсгүүдийг сканнердсан. Өмнө нь бид үүнийг хийж чадахгүй байсан (энэ нь хэтэрхий үнэтэй байсан), харин одоо бүлгийн (хогийн сав) бүтэц, кэшийн ачаар нэг SNP-ийн хүсэлт дунджаар 0,1 секундээс бага хугацаа шаардагддаг бөгөөд өгөгдлийн хэрэглээ маш их байна. S3-ийн зардал нь самрын зардал багатай.

дүгнэлт

Энэ нийтлэл нь гарын авлага огт биш юм. Шийдэл нь хувь хүн болж хувирсан бөгөөд бараг оновчтой биш юм. Харин ч энэ бол аялалын тэмдэглэл юм. Ийм шийдвэрүүд нь толгойдоо бүрэн төлөвшөөгүй, туршилт, алдааны үр дүн гэдгийг бусад хүмүүс ойлгоосой гэж хүсч байна. Түүнчлэн, хэрэв та өгөгдөл судлаач хайж байгаа бол эдгээр хэрэгслийг үр дүнтэй ашиглах нь туршлага шаарддаг бөгөөд туршлага нь мөнгө шаарддаг гэдгийг санаарай. Төлбөр төлөх боломжтой байсандаа баяртай байна, гэхдээ надаас илүү ижил ажлыг хийж чадах бусад олон хүнд мөнгөгүйн улмаас оролдох боломж хэзээ ч олдохгүй.

Том өгөгдлийн хэрэгслүүд нь олон талт байдаг. Хэрэв танд цаг байгаа бол ухаалаг өгөгдөл цэвэрлэх, хадгалах, задлах арга техникийг ашиглан илүү хурдан шийдлийг бичиж чадна. Эцсийн эцэст энэ нь зардал-үр ашгийн шинжилгээнд ордог.

Миний сурсан зүйл:

  • 25 TB-г нэг дор задлах хямд арга байхгүй;
  • Паркетан файлуудын хэмжээ, тэдгээрийн зохион байгуулалтад болгоомжтой хандах;
  • Spark дахь хуваалтууд тэнцвэртэй байх ёстой;
  • Ерөнхийдөө 2,5 сая хуваалт хийхийг хэзээ ч бүү оролдоорой;
  • Spark-ийг тохируулахтай адил эрэмбэлэх нь хэцүү хэвээр байна;
  • заримдаа тусгай өгөгдөл нь тусгай шийдэл шаарддаг;
  • Очыг нэгтгэх нь хурдан боловч хуваах нь үнэтэй хэвээр байна;
  • Тэд танд анхан шатны мэдлэг олгох үед бүү унт, магадгүй хэн нэгэн таны асуудлыг 1980-аад оны үед шийдэж байсан байх;
  • gnu parallel - энэ бол ид шидийн зүйл, хүн бүр үүнийг ашиглах ёстой;
  • Spark нь шахагдаагүй өгөгдөлд дуртай бөгөөд хуваалтуудыг нэгтгэх дургүй;
  • Энгийн асуудлыг шийдэхэд Spark хэт их ачаалалтай байдаг;
  • AWK-ийн ассоциатив массивууд нь маш үр дүнтэй байдаг;
  • холбогдож болно stdin и stdout R скриптээс, тиймээс үүнийг дамжуулах хоолойд ашиглах;
  • Ухаалаг замын хэрэгжилтийн ачаар S3 нь олон файлыг боловсруулах боломжтой;
  • Цаг хугацаа алдах гол шалтгаан нь хадгалах аргыг хугацаанаас нь өмнө оновчтой болгох явдал юм;
  • даалгавруудыг гараар оновчтой болгохыг бүү оролд, компьютерт үүнийг хийхийг зөвшөөр;
  • Ашиглахад хялбар, уян хатан байх үүднээс API нь энгийн байх ёстой;
  • Хэрэв таны өгөгдөл сайн бэлтгэгдсэн бол кэш хийхэд хялбар байх болно!

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх