AWK жана R аркылуу 25TB талдоо

AWK жана R аркылуу 25TB талдоо
Бул макаланы кантип окуу керек: Текст ушунчалык узун жана башаламан болгону үчүн кечирим сурайм. Убакытты үнөмдөө үчүн мен ар бир бөлүмдү “Мен эмнени үйрөндүм?” деген кириш сөз менен баштайм, анда бөлүмдүн маңызы бир же эки сүйлөм менен кыскача баяндалат.

"Мага чечүү жолун көрсөт!" Эгерде сиз жөн гана менин кайдан келгенимди көргүңүз келсе, анда "Көбүрөөк ойлоп табуучулук" бөлүмүнө өтүңүз, бирок ийгиликсиздик жөнүндө окуу кызыктуу жана пайдалуу деп ойлойм.

Мага жакында чийки ДНК ырааттуулугунун чоң көлөмүн (техникалык жактан SNP чип) иштетүү процессин орнотуу тапшырмасы берилди. Кийинки моделдөө жана башка тапшырмалар үчүн берилген генетикалык жер (SNP деп аталат) жөнүндө маалыматтарды тез алуу зарыл болгон. R жана AWKди колдонуу менен мен маалыматтарды табигый жол менен тазалап, иреттей алдым, бул суроону иштетүүнү тездетти. Бул мен үчүн оңой болгон жок жана көптөгөн кайталоолорду талап кылды. Бул макала менин кээ бир каталарымдан качууга жардам берет жана мен эмне менен аяктаганымды көрсөтөт.

Биринчиден, кээ бир кириш түшүндүрмөлөр.

маалымат

Биздин университеттин генетикалык маалыматты иштетүү борбору бизге 25 ТБ TSV түрүндөгү маалыматтарды берди. Мен аларды Gzip тарабынан кысылган 5 пакетке бөлүп алдым, алардын ар биринде 240 төрт гигабайттык файлдар бар. Ар бир сап бир адамдын бир SNP маалыматтарын камтыган. Жалпысынан ~ 2,5 миллион SNP жана ~ 60 миң адам жөнүндө маалыматтар берилди. SNP маалыматынан тышкары, файлдар окуу интенсивдүүлүгү, ар кандай аллельдердин жыштыгы ж. Жалпысынан уникалдуу маанилери бар 30га жакын тилке бар болчу.

максат

Бардык маалыматтарды башкаруу долбоору сыяктуу эле, эң негизгиси бул маалыматтар кандайча колдонулаарын аныктоо болгон. Бул учурда биз негизинен SNP негизинде SNP үчүн моделдерди жана иш процесстерин тандайбыз. Башкача айтканда, бизге бир эле учурда бир SNP боюнча маалыматтар керек болот. Мен 2,5 миллион SNPдин бири менен байланышкан бардык жазууларды мүмкүн болушунча оңой, тез жана арзан алууну үйрөнүшүм керек болчу.

Муну кантип кылбаш керек

Ылайыктуу клише цитата кылуу үчүн:

Мен миң жолу ийгиликсиз болгон жокмун, мен жөн гана суроо-талапка ылайыктуу форматта бир топ маалыматтарды талдоодон качуунун миң жолдорун таптым.

Биринчи аракет

Мен эмнени үйрөндүм: Бир убакта 25 ТБ талдоо үчүн арзан жол жок.

Вандербилт университетинде "Чоң маалыматтарды иштетүүнүн өркүндөтүлгөн ыкмалары" курсунан өтүп, мен айла-амал сумкада экенине ишенгем. Уюк серверин орнотуу үчүн бир же эки саат талап кылынышы мүмкүн. Биздин маалыматтар AWS S3 ичинде сакталгандыктан, мен кызматты колдондум Афина, бул сизге Hive SQL сурамдарын S3 берилиштерине колдонууга мүмкүндүк берет. Сиз Hive кластерин орнотуу/көбөйтүүнүн кереги жок, ошондой эле сиз издеп жаткан маалыматтар үчүн гана төлөйсүз.

Мен Афинага маалыматтарымды жана анын форматын көрсөткөндөн кийин, мен төмөнкүдөй суроолор менен бир нече тесттерди өткөрдүм:

select * from intensityData limit 10;

Жана тез эле жакшы структуралаштырылган натыйжаларды алды. Даяр.

Биз өз ишибизде маалыматтарды колдонууга аракет кылганга чейин...

Менден моделди сынап көрүү үчүн бардык SNP маалыматын алып чыгууну суранышты. Мен суроону иштеттим:


select * from intensityData 
where snp = 'rs123456';

...жана күтө баштады. Сегиз мүнөттөн жана 4 ТБдан ашык суралган маалыматтан кийин мен жыйынтыкты алдым. Athena табылган маалыматтардын көлөмү боюнча төлөйт, терабайт үчүн 5 доллар. Ошентип, бул бир өтүнүч $ 20 жана күтүү сегиз мүнөт турат. Моделди бардык маалыматтар боюнча иштетүү үчүн биз 38 жыл күтүп, 50 миллион доллар төлөшүбүз керек болчу.Албетте, бул бизге ылайыктуу эмес.

Паркетти колдонуш керек болчу...

Мен эмнени үйрөндүм: Паркет файлдарыңыздын өлчөмүнө жана аларды уюштурууга этият болуңуз.

Мен адегенде бардык TSV'лерди өзгөртүү менен кырдаалды оңдоого аракет кылдым Паркет файлдары. Алар чоң маалымат топтомдору менен иштөө үчүн ыңгайлуу, анткени алардагы маалымат мамычалык формада сакталат: ар бир тилке тексттик файлдардан айырмаланып, өз эстутум/диск сегментинде жатат, алардын саптары ар бир мамычанын элементтерин камтыйт. Эгер сиз бир нерсе табышыңыз керек болсо, анда керектүү тилкени окуп чыгыңыз. Кошумчалай кетсек, ар бир файл бир катар маанилерди тилкеде сактайт, андыктан сиз издеп жаткан маани тилкенин диапазонунда болбосо, Spark бүт файлды сканерлөө үчүн убакытты текке кетирбейт.

Мен жөнөкөй тапшырманы аткардым AWS клей биздин TSV'лерди Паркетке айландыруу үчүн жана жаңы файлдарды Афинага таштады. Бул 5 саатка жакын убакытты алды. Бирок мен өтүнүчтү аткарганда, аны аткаруу үчүн бирдей убакыт жана бир аз азыраак акча талап кылынды. Чындыгында, Spark тапшырманы оптималдаштырууга аракет кылып, жөн гана TSV бир бөлүгүн таңгактан чыгарып, аны өзүнүн Паркет бөлүгүнө салды. Жана ар бир бөлчөк көптөгөн адамдардын бардык жазууларын камтый тургандай чоң болгондуктан, ар бир файл бардык SNPлерди камтыган, ошондуктан Spark керектүү маалыматты алуу үчүн бардык файлдарды ачууга туура келди.

Кызыгы, Паркеттин демейки (жана сунуш кылынган) кысуу түрү, тез, бөлүү мүмкүн эмес. Ошондуктан, ар бир аткаруучу 3,5 ГБ толук маалымат топтомун ачуу жана жүктөп алуу тапшырмасына тыгылып калган.

AWK жана R аркылуу 25TB талдоо

Келгиле, көйгөйдү түшүнөлү

Мен эмнени үйрөндүм: Сорттоо кыйын, айрыкча, маалыматтар бөлүштүрүлгөн болсо.

Мага маселенин маңызын эми түшүнгөндөй болдум. Мага маалыматтарды адамдар боюнча эмес, SNP тилкеси боюнча сорттоо керек болчу. Андан кийин бир нече SNP өзүнчө маалымат бөлүгүндө сакталат, андан кийин Паркеттин "акылдуу" функциясы "маани диапазондо болсо гана ачылат" бардык даңкы менен өзүн көрсөтөт. Тилекке каршы, кластер боюнча чачыраган миллиарддаган саптарды иреттөө кыйын иш болуп чыкты.

AWS "Мен алаксыган студентмин" деген себеп менен акчаны кайтарууну каалабайт. Мен Amazon клейде сорттоп жүргөндөн кийин, ал 2 күн иштеп, бузулуп калды.

Бөлүү жөнүндө эмне айтууга болот?

Мен эмнени үйрөндүм: Spark ичиндеги бөлүктөр тең салмактуу болушу керек.

Андан кийин мен хромосомадагы маалыматтарды бөлүү идеясына келдим. Алардын 23ү бар (жана дагы бир нечеси, эгерде митохондриялык ДНКны жана карталанбаган аймактарды эске алсаңыз).
Бул маалыматты майда бөлүктөргө бөлүүгө мүмкүндүк берет. Эгерде сиз Clue скриптиндеги Spark экспорттоо функциясына бир эле сап кошсоңуз partition_by = "chr", анда маалыматтар чакаларга бөлүнүшү керек.

AWK жана R аркылуу 25TB талдоо
Геном хромосома деп аталган көптөгөн фрагменттерден турат.

Тилекке каршы, ал ишке ашкан жок. Хромосомалар ар кандай өлчөмдөргө ээ, бул ар кандай маалымат көлөмүн билдирет. Бул Spark жумушчуларга жиберген тапшырмалар тең салмактуу эмес жана жай аткарылганын билдирет, анткени кээ бир түйүндөр эрте бүтүп, бош туруп калган. Бирок, тапшырмалар аткарылды. Бирок бир SNP сураганда, дисбаланс кайрадан көйгөйлөрдү жаратты. Чоңураак хромосомаларда (башкача айтканда, биз маалыматтарды алууну каалаган жерде) SNPлерди иштетүүнүн баасы 10 эсеге гана азайды. Көп, бирок жетишсиз.

Андан да кичине бөлүктөргө бөлсөк эмне болот?

Мен эмнени үйрөндүм: Эч качан 2,5 миллион бөлүктү жасоого аракет кылбаңыз.

Мен бардыгын чечип, ар бир SNPди бөлдүм. Бул бөлүктөр бирдей өлчөмдө болушун камсыз кылды. БУЛ ЖАМАН ИДЕЯ БОЛГОН. Мен Клейди колдонуп, бейкүнөө сызыкты коштум partition_by = 'snp'. Тапшырма башталып, аткарыла баштады. Бир күндөн кийин мен текшерип көрдүм жана S3ке дагы деле эч нерсе жазылбаганын көрдүм, ошондуктан мен тапшырманы өлтүрдүм. Клей S3деги жашыруун жерге ортоңку файлдарды жазып жаткан окшойт, көптөгөн файлдар, балким, бир нече миллион. Жыйынтыгында менин катам миң доллардан ашып, устатыма жаккан жок.

Бөлүү + сорттоо

Мен эмнени үйрөндүм: Спаркты жөндөө сыяктуу эле сорттоо дагы деле кыйын.

Бөлүү боюнча менин акыркы аракетим хромосомаларды бөлүп, андан кийин ар бир бөлүүнү сорттоо менен камтылган. Теориялык жактан алганда, бул ар бир суроону тездетет, анткени каалаган SNP маалыматтары берилген диапазондо бир нече Паркет бөлүктөрүндө болушу керек болчу. Тилекке каршы, ал тургай бөлүнгөн маалыматтарды сорттоо кыйын иш болуп чыкты. Натыйжада, мен ыңгайлаштырылган кластер үчүн EMRге өтүп, ийкемдүү иш процессин түзүү үчүн сегиз күчтүү инстанцияны (C5.4xl) жана Sparklyr колдондум...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...бирок тапшырма дагы эле аткарыла элек. Мен аны ар кандай жолдор менен конфигурацияладым: ар бир суроону аткаруучу үчүн эстутум бөлүштүрүүнү көбөйттү, эстутумдун чоң көлөмү менен түйүндөрдү колдондум, уктуруу өзгөрмөлөрүн колдондум (берүү өзгөрмөлөрү), бирок ар бир жолу булар жарым өлчөм болуп чыкты, акырындык менен аткаруучулар иштей баштады. баары токтогонго чейин ийгиликсиз болуу.

Чыгармачыл болуп баратам

Мен эмнени үйрөндүм: Кээде атайын маалыматтар атайын чечимдерди талап кылат.

Ар бир SNP позициянын маанисине ээ. Бул анын хромосомасынын боюндагы базалардын санына туура келген сан. Бул биздин маалыматтарды уюштуруунун жакшы жана табигый жолу. Адегенде мен ар бир хромосоманын аймактарына бөлгүм келди. Мисалы, позициялар 1 - 2000, 2001 - 4000 ж.б. Бирок маселе SNPs хромосомалар боюнча бирдей бөлүштүрүлгөн эмес, ошондуктан топтун өлчөмдөрү абдан ар түрдүү болот.

AWK жана R аркылуу 25TB талдоо

Натыйжада мен кызмат орундарын категорияларга (ранам) бөлүштүрүүгө келдим. Жүктөлүп алынган маалыматтарды колдонуп, мен уникалдуу SNPтердин, алардын позицияларынын жана хромосомаларынын тизмесин алуу өтүнүчүн аткардым. Андан кийин мен ар бир хромосоманын ичиндеги маалыматтарды сорттоп, SNPдерди берилген өлчөмдөгү топторго (бин) чогулттум. Ар бири 1000 SNP дейли. Бул мага SNP-то-группа-ар-хромосома мамилесин берди.

Акыр-аягы, мен 75 SNP топторун (бин) жасадым, себеби төмөндө түшүндүрүлөт.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Алгач Spark менен аракет кылыңыз

Мен эмнени үйрөндүм: Учкун топтоо тез, бирок бөлүү дагы деле кымбат.

Мен бул кичинекей (2,5 миллион сап) маалымат алкагын Sparkка окуп, аны чийки маалыматтар менен бириктирип, анан жаңы кошулган тилкеге ​​бөлгүм келди bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Мен колдондум sdf_broadcast(), ошондуктан Spark маалымат алкагын бардык түйүндөргө жөнөтүшү керек экенин билет. Бул маалымат кичинекей жана бардык тапшырмалар үчүн талап кылынса пайдалуу. Болбосо, Spark акылдуу болууга аракет кылат жана керек болсо маалыматтарды таркатат, бул жайлоолорго алып келиши мүмкүн.

Анан дагы менин оюм ишке ашкан жок: тапшырмалар бир аз убакыт иштеп, союзду бүтүрүп, анан бөлүү жолу менен ишке киргизген аткаруучулардай болуп, ишке ашпай калды.

AWK кошуу

Мен эмнени үйрөндүм: Негизги нерселерди үйрөтүп жатканда уктабаңыз. Албетте, кимдир бирөө сиздин көйгөйүңүздү 1980-жылдары чечкен.

Ушул убакка чейин менин Spark менен болгон бардык кемчиликтеримдин себеби кластердеги маалыматтардын башаламандыгы болду. Балким, абалды алдын ала дарылоо менен жакшыртууга болот. Мен чийки тексттик маалыматтарды хромосомалардын мамычаларына бөлүүнү чечтим, ошондуктан Sparkка "алдын ала бөлүнгөн" маалыматтарды берем деп үмүттөнгөм.

Мен StackOverflow'тан тилке маанилери боюнча кантип бөлүү керектигин издеп таап, таптым ушундай сонун жооп. AWK менен сиз текст файлын натыйжаларды жөнөтпөй, аны скриптке жазып, мамычанын маанилери боюнча бөлсөңүз болот. stdout.

Мен аны сынап көрүү үчүн Bash сценарийин жаздым. Пакеттелген TSV'лердин бирин жүктөп алып, андан кийин аны колдонуу менен ачты gzip жана жөнөтүлдү awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Ал иштеген!

Өзөктөрдү толтуруу

Мен эмнени үйрөндүм: gnu parallel - бул сыйкырдуу нерсе, аны баары колдонушу керек.

Ажырашуу абдан жай жана мен баштаганда болду htopкүчтүү (жана кымбат) EC2 инстанциясын колдонууну текшерүү үчүн, мен бир гана өзөктү жана 200 МБ эстутумду колдонуп жаткан экенмин. Маселени чечүү жана көп акчаны жоготпоо үчүн биз ишти кантип параллелдештирүүнү чечишибиз керек болчу. Бактыга жараша, таптакыр укмуштуудай китепте Буйрук сабында маалымат илими Мен Жерон Янссенстин параллелизация боюнча бөлүмүн таптым. Андан үйрөндүм gnu parallel, Unixте multithreadingди ишке ашыруу үчүн абдан ийкемдүү ыкма.

AWK жана R аркылуу 25TB талдоо
Мен жаңы процессти колдонуу менен бөлүүнү баштаганда, баары жакшы болду, бирок дагы эле кыйынчылык бар болчу - S3 объекттерин дискке жүктөө өтө тез эмес жана толук параллелдүү эмес. Муну оңдоо үчүн мен муну жасадым:

  1. Мен S3 жүктөө стадиясын түздөн-түз куурда ишке ашырууга болорун билдим, ал дискте аралык сактагычты толугу менен жок кылат. Бул мен дискке чийки маалыматтарды жазуудан качып, AWSде андан да кичине, демек, арзаныраак сактоону колдоно аларымды билдирет.
  2. команда aws configure set default.s3.max_concurrent_requests 50 AWS CLI колдонгон жиптердин санын бир топ көбөйттү (демейки боюнча 10 бар).
  3. Мен аталышында n тамгасы менен тармактын ылдамдыгы үчүн оптималдаштырылган EC2 инстанциясына өттүм. Мен n-инстанцияларды колдонууда иштетүү күчүн жоготуу жүктөө ылдамдыгынын өсүшүнө караганда компенсацияланарын байкадым. Көпчүлүк тапшырмалар үчүн c5n.4xl колдондум.
  4. Өзгөрүлдү gzip боюнча pigz, бул файлдарды ачуу боюнча параллелдүү эмес тапшырманы параллелдөө үчүн сонун нерселерди жасай турган gzip куралы (бул эң аз жардам берди).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Бул кадамдар бири-бири менен айкалышып, бардыгынын тез иштешин камсыз кылат. Жүктөп алуу ылдамдыгын жогорулатуу жана дискке жазууларды жок кылуу менен, мен азыр бир нече сааттын ичинде 5 терабайт пакетти иштете алмакмын.

Бул твитте "TSV" деген сөз болушу керек. Аттиң.

Жаңы талданган маалыматтарды колдонуу

Мен эмнени үйрөндүм: Spark кысылбаган маалыматтарды жактырат жана бөлүмдөрдү бириктирүүнү жактырбайт.

Азыр маалыматтар S3 форматында ачылбаган (окуу: бөлүшүлгөн) жана жарым иреттелген форматта болгон жана мен Sparkка кайра кайтып келе алам. Мени сюрприз күтүп турду: мен дагы каалаган нерсеме жете албадым! Sparkка маалыматтар кандайча бөлүнгөнүн так айтуу абдан кыйын болду. Мен муну жасаганымда да, өтө көп бөлүмдөр (95 миң) болуп чыкты жана мен колдонгондо coalesce алардын санын акылга сыярлык чектерге чейин кыскартты, бул менин бөлүнүшүмдү жок кылды. Мен муну оңдоого болоруна ишенем, бирок бир-эки күн издегенден кийин мен чечим таба алган жокмун. Акыры, мен Sparkтын бардык тапшырмаларын бүтүрдүм, бирок ал бир аз убакытты талап кылды жана менин бөлүнгөн Паркет файлдарым анчалык деле кичинекей эмес (~200 КБ). Бирок, маалыматтар керектүү жерде болгон.

AWK жана R аркылуу 25TB талдоо
Өтө кичинекей жана тегиз эмес, сонун!

Жергиликтүү Spark сурамдары сыналууда

Мен эмнени үйрөндүм: Жөнөкөй маселелерди чечүүдө Spark өтө көп чыгымдарды талап кылат.

Маалыматты акылдуу форматта жүктөп алуу менен мен ылдамдыкты сынай алдым. Жергиликтүү Spark серверин иштетүү үчүн R скриптин орнотуңуз, андан кийин көрсөтүлгөн Паркет тобунун сактагычынан Spark маалымат алкагын жүктөңүз. Мен бардык маалыматтарды жүктөөгө аракет кылдым, бирок Sparklyr бөлүүнү тааный алган жок.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Аткаруу 29,415 секундга созулган. Бир топ жакшы, бирок бир нерсени массалык түрдө сыноо үчүн өтө жакшы эмес. Кошумчалай кетсек, мен кэштөө менен ишти тездете алган жокмун, анткени мен эстутумдагы маалымат алкагын кэштоого аракет кылганымда, Spark 50тен аз салмактагы маалымат топтомуна 15 ГБдан ашык эстутумду бөлгөнүмдө да дайыма бузулуп калат.

AWK барагына кайтуу

Мен эмнени үйрөндүм: AWKдагы ассоциативдик массивдер абдан эффективдүү.

Мен жогорку ылдамдыкка жетише аларымды түшүндүм. Мен ошону эстедим керемет Брюс Барнетт тарабынан AWK окуу куралы Мен "деген сонун функция жөнүндө окудум.ассоциативдик массивдер" Негизи, булар AWKде кандайдыр бир себептерден улам башкача аталып калган ачкыч-нарк жуптары, ошондуктан мен алар жөнүндө көп ойлонгон жокмун. Роман Чепляка "ассоциативдик массивдер" термини "ачкыч-маанилик жуп" терминине караганда бир топ эски экенин эске салды. Сен болсо да Google Ngramдагы ачкыч-маанисин изде, сиз ал жерде бул терминди көрбөйсүз, бирок сиз ассоциативдик массивдерди табасыз! Мындан тышкары, "ачкыч-баа жуп" көбүнчө маалымат базалары менен байланышкан, ошондуктан аны хэшмап менен салыштыруу алда канча мааниси бар. Мен бул ассоциативдик массивдерди Spark'ты колдонбостон, SNPлерди бин таблицасы жана чийки маалыматтар менен байланыштыруу үчүн колдоно аларымды түшүндүм.

Бул үчүн, AWK скриптинде мен блокту колдондум BEGIN. Бул маалыматтардын биринчи сабы скрипттин негизги бөлүгүнө өткөнгө чейин аткарылуучу коддун бир бөлүгү.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

команда while(getline...) CSV тобунан бардык саптарды жүктөгөн (бин), биринчи тилкени (SNP аталышын) ассоциативдик массивдин ачкычы катары коюңуз bin жана экинчи маани (топ) маани катары. Андан кийин блокто { }, негизги файлдын бардык саптарында аткарылат, ар бир сап өзүнүн тобуна (бин) жараша уникалдуу аталышты алган чыгаруу файлына жөнөтүлөт: ..._bin_"bin[$1]"_....

өзгөрмөлөр batch_num и chunk_id түтүк тарабынан берилген маалыматтарга дал келип, жарыш абалынан качуу жана ар бир аткарылуучу жип чуркоо parallel, өзүнүн уникалдуу файлына жазган.

Мен бардык чийки маалыматтарды AWK менен болгон мурунку экспериментимден калган хромосомалардын папкаларына чачыратып салгандыктан, эми мен бир эле учурда бир хромосоманы иштеп чыгуу үчүн дагы бир Bash скриптин жазып, S3ге тереңирээк бөлүнгөн маалыматтарды жөнөтө алам.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Сценарий эки бөлүктөн турат parallel.

Биринчи бөлүмдө маалыматтар керектүү хромосома боюнча маалыматты камтыган бардык файлдардан окулат, андан кийин бул маалыматтар жиптер боюнча бөлүштүрүлөт, алар файлдарды тиешелүү топторго (бин) бөлүштүрүшөт. Бир эле файлга бир нече жип жазганда жарыш шарттарын болтурбоо үчүн, AWK ар кайсы жерлерге маалыматтарды жазуу үчүн файл атын өткөрүп берет, мис. chr_10_bin_52_batch_2_aa.csv. Натыйжада, дискте көптөгөн майда файлдар түзүлөт (бул үчүн мен терабайт EBS томдорун колдондум).

Экинчи участоктон конвейер parallel топтор (бин) аркылуу өтүп, алардын жеке файлдарын жалпы CSV c catанан аларды экспортко жиберет.

R тилинде берүү?

Мен эмнени үйрөндүм: Байланышса болот stdin и stdout R скриптинен, ошондуктан аны түтүктө колдонуңуз.

Сиз Bash скриптиңизде бул сапты байкаган болушуңуз мүмкүн: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Ал бардык бириктирилген топ файлдарын (бин) төмөндөгү R скриптине которот. {} атайын техника болуп саналат parallel, ал көрсөтүлгөн агымга жөнөткөн маалыматтарды түздөн-түз буйруктун өзүнө киргизет. Опция {#} уникалдуу жип ID камсыз кылат, жана {%} жумуш уясынын номерин билдирет (кайталануучу, бирок эч качан бир убакта эмес). Бардык варианттардын тизмесин бул жерден тапса болот документтер.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Качан өзгөрмө file("stdin") га берилет readr::read_csv, R скриптине которулган маалыматтар кадрга жүктөлөт, ал андан кийин формада болот .rds- файлды колдонуу aws.s3 түздөн-түз S3 жазылган.

RDS - бул паркеттин кенже версиясы сыяктуу, динамиктердин сактагычы жок.

Баш сценарийин аяктагандан кийин мен бир таңгак алдым .rds-Files S3 жайгашкан, бул мага натыйжалуу кысуу жана камтылган түрлөрүн колдонууга мүмкүндүк берди.

тормоз R колдонууга карабастан, баары абдан тез иштеген. Таң калыштуу эмес, Rдын маалыматтарды окуган жана жазган бөлүктөрү абдан оптималдаштырылган. Орто чоңдуктагы бир хромосоманы сынап көргөндөн кийин, жумуш C5n.4xl инстанциясында эки сааттын ичинде бүттү.

S3 Чектөөлөр

Мен эмнени үйрөндүм: Акылдуу жолду ишке ашыруунун аркасында S3 көптөгөн файлдарды иштете алат.

Мен S3 ага өткөрүлүп берилген көптөгөн файлдарды иштете алабы деп чочулап жаттым. Мен файлдын атын түшүнө алмакмын, бирок S3 аларды кантип издейт?

AWK жана R аркылуу 25TB талдоо
S3 ичиндеги папкалар жөн гана көрсөтүү үчүн, чындыгында система символго кызыкдар эмес /. S3 FAQ баракчасынан.

Көрүнүп тургандай, S3 белгилүү бир файлга жолду хэш таблицасында же документке негизделген маалымат базасында жөнөкөй ачкыч катары көрсөтөт. Чаканы таблица катары, ал эми файлдарды ошол таблицадагы жазуулар катары кароого болот.

Амазонкадан киреше алуу үчүн ылдамдык жана натыйжалуулук маанилүү болгондуктан, бул ачкыч-файл-жол системасы укмуш оптималдаштырылганы таң калыштуу эмес. Мен тең салмактуулукту табууга аракет кылдым: көп суроо-талаптарды кабыл албоо үчүн, бирок сурамдар тез аткарылышы үчүн. 20 миңге жакын бин файлдарды жасоо эң жакшы экени белгилүү болду. Менин оюмча, биз оптималдаштырууну уланта берсек, ылдамдыктын өсүшүнө жетише алабыз (мисалы, маалыматтар үчүн гана атайын чака жасап, ошону менен издөө таблицасынын көлөмүн азайтабыз). Бирок андан аркы эксперименттерге убакыт да, акча да жок болчу.

Кайчылаш шайкештик жөнүндө эмне айтууга болот?

Мен эмнени билдим: Убакытты текке кетирүүнүн биринчи себеби - сактоо ыкмасын мөөнөтүнөн мурда оптималдаштыруу.

Бул учурда, өзүңүзгө суроо берүү абдан маанилүү: "Эмне үчүн менчик файл форматын колдоносуз?" Мунун себеби жүктөө ылдамдыгында (gzip CSV файлдарын жүктөөгө 7 эсе көп убакыт талап кылынды) жана иш процесстерибизге шайкеш келет. R Spark жүгү жок Parket (же Жебе) файлдарын оңой жүктөй алабы, мен кайра карап чыгышым мүмкүн. Биздин лабораторияда бардыгы R колдонушат, эгер мен маалыматтарды башка форматка айландыруу керек болсо, менде дагы эле баштапкы текст маалыматтары бар, ошондуктан мен кайра куурду иштете алам.

Иш бөлүштүрүү

Мен эмнени үйрөндүм: Жумуштарды кол менен оптималдаштырууга аракет кылбаңыз, аны компьютер жасай берсин.

Мен бир хромосомадагы иш процессин оңдодум, эми калган бардык маалыматтарды иштеп чыгышым керек.
Мен конвертациялоо үчүн бир нече EC2 инстанцияларын көтөргүм келди, бирок ошол эле учурда мен ар кандай иштетүү жумуштарында өтө тең салмаксыз жүктү алуудан корктум (Spark тең салмаксыз бөлүмдөрдөн жапа чеккен сыяктуу). Мындан тышкары, мен бир хромосома үчүн бир инстанцияны көтөрүүгө кызыккан жокмун, анткени AWS каттоо эсептери үчүн демейки 10 инстанция чеги бар.

Андан кийин кайра иштетүү жумуштарын оптималдаштыруу үчүн R тилинде сценарий жазууну чечтим.

Биринчиден, мен S3тен ар бир хромосома канча сактагыч мейкиндигин ээлегенин эсептөөнү сурандым.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Анан мен жалпы өлчөмүн алып, хромосомалардын тартибин аралаштырып, аларды топторго бөлүүчү функцияны жаздым. num_jobs жана бардык иштетүү жумуштарынын өлчөмдөрү кандайча ар түрдүү экенин айтып берет.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Анан мен purrr аркылуу миң аралаштыруудан өтүп, эң жакшысын тандап алдым.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Ошентип, мен көлөмү жагынан абдан окшош болгон бир катар тапшырмаларды аткардым. Андан кийин менин мурунку Баш сценарийимди чоң циклге ороп коюу гана калды for. Бул оптималдаштыруу жазууга 10 мүнөткө жакын убакытты алды. Жана бул менин тапшырмаларды кол менен түзүүгө жумшагандан алда канча аз, эгерде алар тең салмактуу болбосо. Ошондуктан, мен бул алдын ала оптималдаштыруу менен туура болду деп ойлойм.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Аягында мен өчүрүү буйругун кошом:

sudo shutdown -h now

... жана баары ойдогудай болду! AWS CLI колдонуп, мен вариантты колдонуп инстанцияларды көтөрдүм user_data иштеп чыгуу үчүн алардын тапшырмаларынын Баш сценарийлерин берди. Алар чуркап, автоматтык түрдө өчүп калышты, ошондуктан мен кошумча иштетүү үчүн акча төлөгөн жокмун.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Кел, таңгактайлы!

Мен эмнени үйрөндүм: API колдонуунун жеңил жана ийкемдүүлүгү үчүн жөнөкөй болушу керек.

Акыры мен керектүү жерде жана формада маалыматтарды алдым. Болгону менин кесиптештериме жеңилдетүү үчүн маалыматтарды колдонуу процессин мүмкүн болушунча жөнөкөйлөштүрүү гана калды. Сурамдарды түзүү үчүн жөнөкөй API жасагым келди. Келечекте мен андан өтүүнү чечсем .rds Паркет файлдарына, анда бул менин кесиптештерим үчүн эмес, мен үчүн көйгөй болушу керек. Бул үчүн мен ички R пакетин жасоону чечтим.

Функциянын айланасында уюштурулган бир нече маалыматка жетүү функцияларын камтыган өтө жөнөкөй пакетти түзүп, документтештириңиз get_snp. Кесиптештерим үчүн да сайт жасадым pkgdown, ошондуктан алар мисалдарды жана документтерди оңой көрө алышат.

AWK жана R аркылуу 25TB талдоо

Акылдуу кэштөө

Мен эмнени үйрөндүм: Эгер маалыматыңыз жакшы даярдалган болсо, кэштөө оңой болот!

Негизги иш процесстеринин бири SNP пакетине бир эле талдоо моделин колдонгондуктан, мен биннингди өз пайдам үчүн колдонууну чечтим. SNP аркылуу берилиштерди өткөрүп жатканда, топтун (бин) бардык маалыматы кайтарылган объектке тиркелет. Башкача айтканда, эски сурамдар (теорияда) жаңы сурамдарды иштетүүнү тездете алат.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Пакетти курууда мен ар кандай ыкмаларды колдонууда ылдамдыкты салыштыруу үчүн көптөгөн эталондорду иштеттим. Мен муну кайдыгер калтырбоону сунуштайм, анткени кээде натыйжалар күтүүсүз болот. Мисалы, dplyr::filter индекстөөнүн негизиндеги чыпкалоо аркылуу саптарды басып алуудан алда канча тезирээк болгон жана чыпкаланган маалымат алкагынан бир тилкени алуу индекстөө синтаксисин колдонууга караганда алда канча тезирээк болгон.

Объект экенин эске алыңыз prev_snp_results ачкычты камтыйт snps_in_bin. Бул топтун бардык уникалдуу SNP массиви (бин), сизде мурунку сурамдагы маалыматтарыңыз бар-жогун тез текшерүүгө мүмкүндүк берет. Бул ошондой эле бул код менен бир топтун (бин) бардык SNPs аркылуу айланууну жеңилдетет:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

натыйжалары

Эми биз мурда бизге жеткиликсиз болгон моделдерди жана сценарийлерди иштете алабыз (жана олуттуу түрдө иштей баштадык). Эң жакшысы менин лабораториядагы кесиптештерим эч кандай татаалдыктар жөнүндө ойлонбосо керек. Алардын жөн гана иштеген функциясы бар.

Пакет аларга майда-чүйдөсүнө чейин сактаса да, мен эртең күтүлбөгөн жерден жоголуп кетсем, алар аны түшүнө тургандай, маалымат форматын жөнөкөй кылууга аракет кылдым...

Ылдамдыгы байкаларлык өстү. Биз, адатта, функционалдык жактан маанилүү геномдун фрагменттерин сканерлейбиз. Мурда биз муну кыла алмак эмеспиз (бул өтө кымбат болуп чыкты), бирок азыр, топтун (бин) түзүмү жана кэштин аркасында, бир SNPге суроо-талап орто эсеп менен 0,1 секунддан аз убакытты талап кылат жана маалыматтарды колдонуу ушунчалык көп. S3 үчүн чыгымдар жержаңгак болуп саналат.

жыйынтыктоо

Бул макала такыр эле колдонмо эмес. Чечим жеке болуп чыкты жана дээрлик оптималдуу эмес. Тескерисинче, бул саякат баяны. Башкалардын түшүнүшүн каалайм, мындай чечимдер башында толук калыптанбагандай көрүнбөйт, алар сыноо жана катанын натыйжасы. Ошондой эле, эгер сиз маалымат таануучу издеп жатсаңыз, бул куралдарды натыйжалуу колдонуу тажрыйбаны талап кылаарын жана тажрыйба акча талап кыларын унутпаңыз. Мен төлөөгө каражатым болгонуна кубанычтамын, бирок менден жакшыраак бир ишти жасай алган башка көптөгөн адамдар акчанын жетишсиздигинен улам эч качан аракет кыла алышпайт.

Чоң маалымат куралдары ар тараптуу. Эгерде сизде убакыт болсо, анда сиз акылдуу маалыматтарды тазалоо, сактоо жана алуу ыкмаларын колдонуу менен тезирээк чечим жаза аласыз. Акыр-аягы, бул чыгаша-пайда талдоосуна келет.

Мен эмнени үйрөндүм:

  • бир убакта 25 ТБ талдоо үчүн эч кандай арзан жолу жок;
  • Паркет файлдарыңыздын өлчөмүнө жана аларды уюштурууга этият болуңуз;
  • Spark ичиндеги бөлүктөр тең салмактуу болушу керек;
  • Жалпысынан алганда, эч качан 2,5 миллион бөлүктөрүн жасоого аракет кылбайт;
  • Спаркты орнотуу сыяктуу сорттоо дагы деле кыйын;
  • кээде атайын маалыматтар атайын чечимдерди талап кылат;
  • Учкунду бириктирүү тез, бирок бөлүү дагы деле кымбат;
  • Алар сизге негиздерин үйрөткөндө уктабаңыз, балким, кимдир бирөө сиздин көйгөйүңүздү 1980-жылдары чечкен;
  • gnu parallel - бул сыйкырдуу нерсе, аны баары колдонушу керек;
  • Spark кысылбаган маалыматтарды жактырат жана бөлүмдөрдү бириктирүүнү жактырбайт;
  • Жөнөкөй маселелерди чечүүдө Spark өтө көп чыгымга ээ;
  • AWKнын ассоциативдик массивдери абдан эффективдүү;
  • байланышсаңыз болот stdin и stdout R скриптинен, ошондуктан аны түтүктө колдонуңуз;
  • Акылдуу жолду ишке ашыруунун аркасында S3 көптөгөн файлдарды иштете алат;
  • Убакытты текке кетирүүнүн негизги себеби - сактоо ыкмасын мөөнөтүнөн мурда оптималдаштыруу;
  • тапшырмаларды кол менен оптималдаштырууга аракет кылбаңыз, аны компьютер аткарсын;
  • API колдонуунун оңойлугу жана ийкемдүүлүгү үчүн жөнөкөй болушу керек;
  • Эгер маалыматыңыз жакшы даярдалса, кэштөө оңой болот!

Source: www.habr.com

Комментарий кошуу