AWK və R-dən istifadə edərək 25TB təhlili

AWK və R-dən istifadə edərək 25TB təhlili
Bu məqaləni necə oxumaq olar: Mətnin bu qədər uzun və xaotik olmasına görə üzr istəyirəm. Vaxtınıza qənaət etmək üçün hər fəsli bir və ya iki cümlə ilə fəslin mahiyyətini ümumiləşdirən “Öyrəndiklərim” girişi ilə başlayıram.

"Mənə həll yolunu göstər!" Əgər sadəcə haradan gəldiyimi görmək istəyirsinizsə, o zaman “Daha ixtiraçı olmaq” fəslinə keçin, amma məncə, uğursuzluq haqqında oxumaq daha maraqlı və faydalıdır.

Bu yaxınlarda mənə böyük həcmdə xam DNT ardıcıllığının (texniki olaraq SNP çipi) işlənməsi prosesini qurmaq tapşırılıb. Ehtiyac, sonrakı modelləşdirmə və digər vəzifələr üçün verilmiş genetik yer (SNP adlanır) haqqında məlumatları tez əldə etmək idi. R və AWK istifadə edərək, sorğuların işlənməsini xeyli sürətləndirərək məlumatları təbii şəkildə təmizləyə və təşkil edə bildim. Bu, mənim üçün asan olmadı və çoxsaylı təkrarlamalar tələb etdi. Bu məqalə bəzi səhvlərimdən qaçmağınıza kömək edəcək və nə ilə nəticələndiyimi sizə göstərəcək.

Birincisi, bəzi giriş izahatları.

Məlumat

Universitetimizin genetik məlumat emal mərkəzi bizə 25 TB TSV şəklində məlumatlar təqdim etdi. Mən onları Gzip tərəfindən sıxılmış 5 paketə bölünmüş şəkildə aldım, hər birində təxminən 240 dörd gigabaytlıq fayl var. Hər bir cərgədə bir fərdin bir SNP məlumatı var idi. Ümumilikdə ~2,5 milyon SNP və ~60 min insan haqqında məlumatlar ötürülüb. SNP məlumatlarına əlavə olaraq, fayllarda oxu intensivliyi, müxtəlif allellərin tezliyi və s. kimi müxtəlif xüsusiyyətləri əks etdirən nömrələri olan çoxsaylı sütunlar var idi. Ümumilikdə unikal dəyərləri olan təxminən 30 sütun var idi.

Məqsəd

Hər hansı bir məlumat idarəetmə layihəsində olduğu kimi, ən vacib şey məlumatların necə istifadə ediləcəyini müəyyən etmək idi. Bu halda biz əsasən SNP əsasında SNP üçün modellər və iş axınlarını seçəcəyik. Yəni, bir anda yalnız bir SNP haqqında məlumat lazım olacaq. 2,5 milyon SNP-dən biri ilə əlaqəli bütün qeydləri mümkün qədər asanlıqla, tez və ucuz əldə etməyi öyrənməli idim.

Bunu necə etməmək olar

Uyğun bir klişe sitat gətirmək üçün:

Min dəfə uğursuzluğa düçar olmadım, sadəcə olaraq sorğuya uyğun formatda bir dəstə məlumatı təhlil etməməyin minlərlə yolunu kəşf etdim.

İlk cəhd

Nə öyrənmişəm: Bir anda 25 TB təhlil etmək üçün ucuz yol yoxdur.

Vanderbilt Universitetində "Böyük Məlumatların Emalı üçün Qabaqcıl Metodlar" kursunu keçərək, hiylənin çantada olduğuna əmin idim. Yəqin ki, bütün məlumatları nəzərdən keçirmək və nəticəni bildirmək üçün Hive serverini qurmaq bir və ya iki saat çəkəcək. Məlumatlarımız AWS S3-də saxlandığı üçün xidmətdən istifadə etdim Athena, S3 məlumatlarına Hive SQL sorğularını tətbiq etməyə imkan verir. Hive klasterini qurmağa/yükləməyə ehtiyac yoxdur və siz həmçinin yalnız axtardığınız məlumat üçün ödəniş edirsiniz.

Athenaya öz məlumatlarımı və onun formatını göstərdikdən sonra bu kimi sorğularla bəzi testlər keçirdim:

select * from intensityData limit 10;

Və tez yaxşı qurulmuş nəticələr aldı. Hazır.

İşimizdə məlumatlardan istifadə etməyə cəhd edənə qədər...

Məndən modeli sınamaq üçün bütün SNP məlumatlarını çıxarmaq istəndi. Sorğunu icra etdim:


select * from intensityData 
where snp = 'rs123456';

...və gözləməyə başladı. Səkkiz dəqiqədən və 4 TB-dən çox tələb olunan məlumatdan sonra nəticəni aldım. Athena tapılan məlumatların həcminə görə bir terabayt üçün 5 dollar alır. Beləliklə, bu tək sorğu 20 dollara və səkkiz dəqiqə gözləməyə başa gəldi. Modeli bütün məlumatlar üzərində işlətmək üçün 38 il gözləməli və 50 milyon dollar ödəməli olduq.Açığı, bu bizim üçün uyğun deyildi.

Parketdən istifadə etmək lazımdı...

Nə öyrənmişəm: Parket fayllarınızın ölçüsünə və onların təşkilinə diqqət yetirin.

Əvvəlcə bütün TSV-ləri çevirərək vəziyyəti düzəltməyə çalışdım Parket faylları. Onlar böyük verilənlər dəstləri ilə işləmək üçün əlverişlidirlər, çünki onlarda olan məlumatlar sütun şəklində saxlanılır: sətirlərdə hər sütunun elementləri olan mətn fayllarından fərqli olaraq, hər bir sütun öz yaddaş/disk seqmentində yerləşir. Və bir şey tapmaq lazımdırsa, sadəcə tələb olunan sütunu oxuyun. Bundan əlavə, hər bir fayl bir sıra dəyərləri sütunda saxlayır, ona görə də axtardığınız dəyər sütunun diapazonunda deyilsə, Spark bütün faylı skan etmək üçün vaxt itirməyəcək.

Sadə bir tapşırığı yerinə yetirdim AWS Yapışqan TSV-lərimizi Parketə çevirmək və yeni faylları Athenaya atmaq üçün. Təxminən 5 saat çəkdi. Amma sorğunu yerinə yetirəndə onu tamamlamaq üçün təxminən eyni vaxt və bir az az pul lazım oldu. Məsələ burasındadır ki, tapşırığı optimallaşdırmağa çalışan Spark, sadəcə bir TSV parçasını çıxarıb öz Parket parçasına qoydu. Və hər bir parça bir çox insanın bütün qeydlərini ehtiva edəcək qədər böyük olduğundan, hər bir fayl bütün SNP-ləri ehtiva edirdi, ona görə də Spark lazım olan məlumatları çıxarmaq üçün bütün faylları açmalı oldu.

Maraqlıdır ki, Parketin standart (və tövsiyə olunan) sıxılma növü, tez, bölünə bilməz. Buna görə də, hər bir icraçı tam 3,5 GB məlumat dəstini açmaq və yükləmək tapşırığına ilişib qalmışdı.

AWK və R-dən istifadə edərək 25TB təhlili

Problemi anlayaq

Nə öyrənmişəm: Çeşidləmə çətindir, xüsusən də məlumatlar paylanırsa.

Mənə elə gəldi ki, problemin mahiyyətini indi başa düşdüm. Mən yalnız məlumatları insanlara görə deyil, SNP sütununa görə çeşidləməli oldum. Sonra bir neçə SNP ayrı bir məlumat yığınında saxlanacaq və sonra Parketin "ağıllı" funksiyası "yalnız dəyər diapazonda olduqda açılır" bütün şöhrəti ilə özünü göstərəcəkdir. Təəssüf ki, klasterə səpələnmiş milyardlarla cərgə arasında çeşidləmək çətin bir iş oldu.

AWS "Mən diqqəti yayındıran tələbəyəm" səbəbi ilə mütləq pulu qaytarmaq istəmir. Mən Amazon Glue-da çeşidləmə apardıqdan sonra 2 gün işlədi və çökdü.

Bölmə haqqında nə demək olar?

Nə öyrənmişəm: Spark-da arakəsmələr balanslaşdırılmış olmalıdır.

Sonra məlumatları xromosomlarda bölmək ideyası ilə gəldim. Onlardan 23-ü var (və mitoxondrial DNT-ni və xəritələnməmiş bölgələri nəzərə alsanız, daha bir neçəsi).
Bu, məlumatları daha kiçik hissələrə bölməyə imkan verəcəkdir. Glue skriptində Spark ixrac funksiyasına yalnız bir sətir əlavə etsəniz partition_by = "chr", sonra məlumatlar vedrələrə bölünməlidir.

AWK və R-dən istifadə edərək 25TB təhlili
Genom xromosom adlanan çoxsaylı fraqmentlərdən ibarətdir.

Təəssüf ki, alınmadı. Xromosomlar müxtəlif ölçülərə malikdir, bu da müxtəlif miqdarda məlumat deməkdir. Bu o deməkdir ki, Spark-ın işçilərə göndərdiyi tapşırıqlar tarazlaşdırılmayıb və yavaş-yavaş tamamlanıb, çünki bəzi qovşaqlar erkən bitib və boş qalıb. Bununla belə, tapşırıqlar tamamlandı. Ancaq bir SNP tələb edərkən, balanssızlıq yenidən problemlərə səbəb oldu. Daha böyük xromosomlarda (yəni məlumat əldə etmək istədiyimiz yerdə) SNP-lərin işlənməsinin dəyəri təxminən 10 dəfə azalıb. Çox, amma kifayət deyil.

Onu daha kiçik hissələrə bölsək necə olar?

Nə öyrənmişəm: Heç vaxt 2,5 milyon arakəsmə etməyə çalışmayın.

Mən hər şeyi tərk etmək qərarına gəldim və hər bir SNP-ni böldüm. Bu, arakəsmələrin bərabər ölçüdə olmasını təmin etdi. PİS İDEYA OLDU. Mən Glue istifadə etdim və günahsız bir xətt əlavə etdim partition_by = 'snp'. Tapşırıq başladı və icra etməyə başladı. Bir gün sonra yoxladım və gördüm ki, S3-ə hələ heç nə yazılmayıb, ona görə də tapşırığı öldürdüm. Deyəsən, Glue aralıq faylları S3-də gizli yerə, çoxlu fayla, bəlkə də bir neçə milyona yazırdı. Nəticədə mənim səhvim min dollardan baha başa gəldi və mentorumu sevindirmədi.

Bölmə + çeşidləmə

Nə öyrənmişəm: Spark-ı tənzimləmək kimi çeşidləmə hələ də çətindir.

Bölməyə son cəhdim xromosomları bölməkdən və sonra hər bir bölməni çeşidləməkdən ibarət idi. Nəzəriyyə olaraq, bu, hər bir sorğunu sürətləndirəcək, çünki istənilən SNP məlumatı müəyyən bir diapazonda bir neçə Parket parçası içərisində olmalıdır. Təəssüf ki, hətta bölünmüş məlumatların çeşidlənməsi çətin bir iş oldu. Nəticədə, mən fərdi klaster üçün EMR-ə keçdim və daha çevik iş axını yaratmaq üçün səkkiz güclü instansiyadan (C5.4xl) və Sparklyr-dən istifadə etdim...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...lakin tapşırıq hələ də tamamlanmamışdı. Mən bunu müxtəlif yollarla konfiqurasiya etdim: hər bir sorğu icraçısı üçün yaddaşın ayrılmasını artırdım, böyük həcmdə yaddaşa malik qovşaqlardan istifadə etdim, yayım dəyişənlərindən istifadə etdim (yayım dəyişənləri), lakin hər dəfə bunlar yarım ölçülər oldu və tədricən icraçılar başladı. hər şey dayanana qədər uğursuz olmaq.

Mən daha yaradıcı oluram

Nə öyrənmişəm: Bəzən xüsusi məlumatlar xüsusi həllər tələb edir.

Hər bir SNP-nin mövqe dəyəri var. Bu, onun xromosomu boyunca əsasların sayına uyğun gələn rəqəmdir. Bu, məlumatlarımızı təşkil etmək üçün gözəl və təbii bir yoldur. Əvvəlcə hər bir xromosomun bölgələrinə bölmək istədim. Məsələn, mövqelər 1 - 2000, 2001 - 4000 və s. Ancaq problem ondadır ki, SNP-lər xromosomlar arasında bərabər paylanmır, buna görə də qrup ölçüləri çox fərqli olacaq.

AWK və R-dən istifadə edərək 25TB təhlili

Nəticədə mən vəzifələrin kateqoriyalara (rütbələrə) bölünməsinə gəldim. Artıq yüklənmiş məlumatlardan istifadə edərək, unikal SNP-lərin, onların mövqelərinin və xromosomlarının siyahısını əldə etmək üçün sorğu göndərdim. Sonra hər bir xromosom daxilində məlumatları çeşidlədim və SNP-ləri verilmiş ölçüdə qruplara (zibil qutusuna) topladım. Tutaq ki, hər biri 1000 SNP. Bu, mənə SNP-dən qrupa-xromosoma münasibətini verdi.

Sonda 75 SNP-dən ibarət qruplar (zibil) etdim, səbəbi aşağıda izah ediləcəkdir.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Əvvəlcə Spark ilə cəhd edin

Nə öyrənmişəm: Qığılcımların yığılması sürətlidir, lakin bölmə hələ də bahadır.

Mən bu kiçik (2,5 milyon sətir) məlumat çərçivəsini Spark-da oxumaq, onu xam məlumatlarla birləşdirmək və sonra onu yeni əlavə edilmiş sütunla bölmək istəyirdim. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Mən istifadə etdim sdf_broadcast(), buna görə də Spark məlumat çərçivəsini bütün qovşaqlara göndərməli olduğunu bilir. Məlumat kiçik ölçüdə olduqda və bütün tapşırıqlar üçün tələb olunarsa, bu faydalıdır. Əks halda, Spark ağıllı olmağa çalışır və lazım olduqda məlumatları paylayır, bu da yavaşlamalara səbəb ola bilər.

Yenə də fikrim baş tutmadı: tapşırıqlar bir müddət işlədi, ittifaqı başa vurdu, sonra isə bölgü ilə işə salınan icraçılar kimi uğursuzluğa düçar oldu.

AWK əlavə olunur

Nə öyrənmişəm: Sizə əsasları öyrədəndə yatmayın. Şübhəsiz ki, kimsə artıq 1980-ci illərdə probleminizi həll edib.

Bu nöqtəyə qədər Spark ilə bütün uğursuzluqlarımın səbəbi çoxluqdakı məlumatların qarışıqlığı idi. Ola bilsin ki, ilkin müalicə ilə vəziyyəti yaxşılaşdırmaq olar. Xam mətn məlumatlarını xromosomların sütunlarına bölməyə cəhd etmək qərarına gəldim, ona görə də Spark-ı “əvvəlcədən bölünmüş” məlumatlarla təmin etməyə ümid etdim.

StackOverflow-da sütun dəyərlərinə necə bölmək olar deyə axtarış etdim və tapdım belə gözəl cavab. AWK ilə mətn faylını nəticələri göndərməkdənsə, onu skriptdə yazmaqla sütun dəyərlərinə görə bölmək olar. stdout.

Sınamaq üçün bir Bash skripti yazdım. Paketlənmiş TSV-lərdən birini endirdi, sonra istifadə edərək onu çıxartdı gzip və göndərildi awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

İşlədi!

Nüvələrin doldurulması

Nə öyrənmişəm: gnu parallel - sehrli bir şeydir, hamı istifadə etməlidir.

Ayrılıq olduqca yavaş idi və mən başlayanda htopgüclü (və bahalı) EC2 instansiyasının istifadəsini yoxlamaq üçün yalnız bir nüvədən və təxminən 200 MB yaddaşdan istifadə etdiyim məlum oldu. Problemi həll etmək və çox pul itirməmək üçün işi necə paralel aparacağımızı anlamalı olduq. Xoşbəxtlikdən, tamamilə heyrətamiz bir kitabda Komanda Xəttində Məlumat Elmi Jeron Janssensin paralelləşdirmə ilə bağlı fəslini tapdım. Ondan öyrəndim gnu parallel, Unix-də multithreading həyata keçirmək üçün çox çevik üsul.

AWK və R-dən istifadə edərək 25TB təhlili
Yeni prosesdən istifadə edərək bölmələrə başladıqda, hər şey qaydasında idi, lakin hələ də bir darboğaz var idi - S3 obyektlərinin diskə yüklənməsi çox sürətli deyildi və tam paralelləşdirilmədi. Bunu düzəltmək üçün bunu etdim:

  1. Bildim ki, S3 yükləmə mərhələsini diskdə aralıq yaddaşı tamamilə aradan qaldıraraq birbaşa boru kəmərində həyata keçirmək mümkündür. Bu o deməkdir ki, mən diskə xam məlumat yazmaqdan qaça bilərəm və AWS-də daha kiçik və buna görə də daha ucuz yaddaşdan istifadə edə bilərəm.
  2. komanda aws configure set default.s3.max_concurrent_requests 50 AWS CLI-nin istifadə etdiyi mövzuların sayını xeyli artırdı (standart olaraq 10-dur).
  3. Mən adında n hərfi olan şəbəkə sürəti üçün optimallaşdırılmış EC2 nümunəsinə keçdim. Mən n-nümunələrdən istifadə edərkən emal gücünün itirilməsinin yükləmə sürətinin artması ilə kompensasiya olunduğunu aşkar etdim. Əksər tapşırıqlar üçün c5n.4xl istifadə etdim.
  4. Dəyişdi gzip haqqında pigz, bu, faylları açmaq kimi ilkin olaraq paralelləşdirilməmiş tapşırığı paralelləşdirmək üçün gözəl işlər görə bilən bir gzip alətidir (bu, ən az kömək etdi).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Hər şeyin çox tez işləməsi üçün bu addımlar bir-biri ilə birləşdirilir. Yükləmə sürətini artırmaq və disk yazmalarını aradan qaldırmaqla mən indi cəmi bir neçə saat ərzində 5 terabaytlıq paketi emal edə bildim.

Bu tvitdə 'TSV' qeyd edilməli idi. vay.

Yeni təhlil edilmiş məlumatlardan istifadə

Nə öyrənmişəm: Spark sıxılmamış məlumatları sevir və bölmələri birləşdirməyi sevmir.

İndi məlumat S3-də paketdən çıxarılmamış (oxu: paylaşılmış) və yarı sifarişli formatda idi və mən yenidən Spark-a qayıda bildim. Məni sürpriz gözləyirdi: yenə istədiyimə nail ola bilmədim! Spark-a verilənlərin necə bölündüyünü dəqiq söyləmək çox çətin idi. Mən bunu edəndə belə, arakəsmələrin çox olduğu (95 min) və mən istifadə etdiyim zaman ortaya çıxdı coalesce onların sayını ağlabatan həddə qədər azaltdı, bu mənim bölmələrimi məhv etdi. Əminəm ki, bu düzəldilə bilər, lakin bir neçə günlük axtarışdan sonra bir həll tapa bilmədim. Nəhayət, Spark-da bütün tapşırıqları tamamladım, baxmayaraq ki, bu bir az vaxt apardı və mənim bölünmüş Parket fayllarım çox kiçik deyildi (~200 KB). Bununla belə, məlumatlar lazım olan yerdə idi.

AWK və R-dən istifadə edərək 25TB təhlili
Çox kiçik və qeyri-bərabər, gözəl!

Yerli Spark sorğuları sınaqdan keçirilir

Nə öyrənmişəm: Sadə problemləri həll edərkən Spark həddindən artıq yükə malikdir.

Məlumatları ağıllı formatda yükləməklə sürəti yoxlaya bildim. Yerli Spark serverini işə salmaq üçün R skripti qurun və sonra müəyyən edilmiş Parket qrup yaddaşından (zibil qutusundan) Spark məlumat çərçivəsini yükləyin. Mən bütün məlumatları yükləməyə çalışdım, lakin Sparklyr bölməni tanıya bilmədim.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Edam 29,415 saniyə çəkdi. Daha yaxşı, lakin heç bir şeyin kütləvi sınağı üçün çox yaxşı deyil. Bundan əlavə, mən yaddaşda məlumat çərçivəsini keşləmək istəyərkən, çəkisi 50-dən az olan verilənlər bazasına 15 GB-dan çox yaddaş ayırdığımda belə, Spark həmişə qəzaya uğradığı üçün keşləmə ilə işləri sürətləndirə bilmədim.

AWK səhifəsinə qayıt

Nə öyrənmişəm: AWK-da assosiativ massivlər çox səmərəlidir.

Daha yüksək sürətlərə nail ola biləcəyimi başa düşdüm. Mən bunu gözəl bir şəkildə xatırladım Bruce Barnett tərəfindən AWK dərsliyi " adlı gözəl xüsusiyyət haqqında oxudumassosiativ massivlər" Əslində, bunlar AWK-da nədənsə fərqli adlandırılan açar-dəyər cütləridir və buna görə də onlar haqqında çox düşünmədim. Roman Çeplyaka xatırladıb ki, “assosiativ massivlər” termini “açar-dəyər cütü” terminindən xeyli qədimdir. Hətta əgər sən Google Ngram-da açar-dəyəri axtarın, bu termini orada görməyəcəksiniz, lakin assosiativ massivlər tapacaqsınız! Bundan əlavə, "açar-dəyər cütü" ən çox verilənlər bazası ilə əlaqələndirilir, buna görə də onu hasshmap ilə müqayisə etmək daha məntiqlidir. Spark-dan istifadə etmədən SNP-ləri zibil masası və xam məlumatlarla əlaqələndirmək üçün bu assosiativ massivlərdən istifadə edə biləcəyimi başa düşdüm.

Bunu etmək üçün AWK skriptində blokdan istifadə etdim BEGIN. Bu, verilənlərin birinci sətri skriptin əsas gövdəsinə ötürülməzdən əvvəl yerinə yetirilən kod parçasıdır.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Komanda while(getline...) CSV qrupundan (zibil qutusundan) bütün sətirləri yüklədi, birinci sütunu (SNP adı) assosiativ massiv üçün açar kimi təyin edin bin dəyər kimi ikinci dəyər (qrup). Sonra blokda { }, əsas faylın bütün sətirlərində yerinə yetirilir, hər bir sətir öz qrupundan (bin) asılı olaraq unikal ad alan çıxış faylına göndərilir: ..._bin_"bin[$1]"_....

Dəyişənlər batch_num и chunk_id yarış vəziyyətindən qaçaraq boru kəməri tərəfindən verilən məlumatlara uyğun gəldi və hər bir icra ipi qaçdı parallel, öz unikal faylına yazdı.

Bütün xam məlumatları AWK ilə əvvəlki təcrübəmdən qalan xromosomlardakı qovluqlara səpələdiyim üçün indi bir dəfə bir xromosomu emal etmək və S3-ə daha dərin bölünmüş məlumatları göndərmək üçün başqa bir Bash skripti yaza bilərdim.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Ssenari iki hissədən ibarətdir parallel.

Birinci hissədə məlumat istənilən xromosom haqqında məlumatı ehtiva edən bütün fayllardan oxunur, sonra bu məlumatlar faylları müvafiq qruplara (zibil qutusu) paylayan mövzular arasında paylanır. Birdən çox mövzu eyni fayla yazdıqda yarış şərtlərinin qarşısını almaq üçün AWK müxtəlif yerlərə məlumat yazmaq üçün fayl adlarını ötürür, məsələn. chr_10_bin_52_batch_2_aa.csv. Nəticədə diskdə çoxlu kiçik fayllar yaradılır (bunun üçün mən terabayt EBS həcmlərindən istifadə etdim).

İkinci hissədən konveyer parallel qruplardan (zibil) keçir və onların fərdi fayllarını ümumi CSV-də birləşdirir c catvə sonra ixrac üçün göndərir.

R-də yayım?

Nə öyrənmişəm: Əlaqə saxlaya bilərsiniz stdin и stdout R skriptindən və buna görə də onu boru kəmərində istifadə edin.

Bash skriptinizdə bu xətti görmüsünüz: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Bütün birləşdirilmiş qrup fayllarını (zibil) aşağıdakı R skriptinə çevirir. {} xüsusi texnikadır parallel, göstərilən axına göndərdiyi hər hansı məlumatı birbaşa əmrin özünə daxil edir. Seçim {#} unikal mövzu ID təmin edir, və {%} iş yerinin nömrəsini təmsil edir (təkrarlanır, lakin heç vaxt eyni vaxtda deyil). Bütün variantların siyahısını burada tapa bilərsiniz sənədlər.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Dəyişən zaman file("stdin") -ə ötürülür readr::read_csv, R skriptinə tərcümə edilmiş məlumatlar çərçivəyə yüklənir, o, sonra formada olur .rds-fayldan istifadə etməklə aws.s3 birbaşa S3-ə yazılır.

RDS, dinamik saxlama imkanları olmayan Parketin kiçik versiyası kimi bir şeydir.

Bash skriptini bitirdikdən sonra bir paket aldım .rds-S3-də yerləşən fayllar, bu mənə səmərəli sıxılma və daxili növlərdən istifadə etməyə imkan verdi.

Əyləc R-nin istifadəsinə baxmayaraq, hər şey çox tez işlədi. Təəccüblü deyil ki, R-nin məlumatları oxuyan və yazan hissələri yüksək dərəcədə optimallaşdırılıb. Orta ölçülü bir xromosom üzərində sınaqdan sonra iş təxminən iki saat ərzində C5n.4xl nümunəsində tamamlandı.

S3 Məhdudiyyətləri

Nə öyrənmişəm: Ağıllı yol tətbiqi sayəsində S3 bir çox faylları idarə edə bilər.

S3-ün ona ötürülən bir çox faylı idarə edə biləcəyindən narahat idim. Fayl adlarını məntiqli edə bilərdim, lakin S3 onları necə axtaracaq?

AWK və R-dən istifadə edərək 25TB təhlili
S3-dəki qovluqlar sadəcə nümayiş üçündür, əslində sistem simvolla maraqlanmır /. S3 FAQ səhifəsindən.

Görünür ki, S3 müəyyən bir fayla gedən yolu bir növ hash cədvəlində və ya sənəd əsaslı verilənlər bazasında sadə açar kimi təmsil edir. Bir vedrə cədvəl kimi düşünülə bilər və fayllar bu cədvəldəki qeydlər hesab edilə bilər.

Sürət və səmərəlilik Amazon-da qazanc əldə etmək üçün vacib olduğundan, bu fayl yolu kimi açar sisteminin inanılmaz dərəcədə optimallaşdırılması təəccüblü deyil. Mən balans tapmağa çalışdım: çoxlu sorğular etməli deyildim, lakin sorğular tez yerinə yetirildi. Məlum oldu ki, ən yaxşısı təxminən 20 min bin faylı etməkdir. Düşünürəm ki, optimallaşdırmağa davam etsək, sürətin artmasına nail ola bilərik (məsələn, yalnız məlumat üçün xüsusi bir kova hazırlamaq, beləliklə axtarış cədvəlinin ölçüsünü azaltmaq). Lakin sonrakı təcrübələr üçün nə vaxt, nə də pul var idi.

Bəs çarpaz uyğunluq?

Öyrəndiklərim: Vaxt itkisinin bir nömrəli səbəbi saxlama metodunuzu vaxtından əvvəl optimallaşdırmaqdır.

Bu nöqtədə özünüzdən soruşmaq çox vacibdir: "Niyə xüsusi fayl formatından istifadə edin?" Səbəb yükləmə sürətində (gziplənmiş CSV fayllarının yüklənməsi 7 dəfə uzun çəkdi) və iş axınlarımızla uyğunluqdadır. R-nin Spark yükü olmadan Parket (və ya Arrow) fayllarını asanlıqla yükləyə biləcəyini yenidən nəzərdən keçirə bilərəm. Laboratoriyamızdakı hər kəs R-dən istifadə edir və əgər mən məlumatları başqa formata çevirməli olsam, məndə hələ də orijinal mətn məlumatı var, ona görə də boru xəttini yenidən işə sala bilərəm.

İş bölgüsü

Nə öyrənmişəm: İşləri əl ilə optimallaşdırmağa çalışmayın, kompüter bunu etsin.

Bir xromosomda iş prosesini düzəltdim, indi bütün digər məlumatları emal etməliyəm.
Dönüşüm üçün bir neçə EC2 nümunəsini qaldırmaq istədim, lakin eyni zamanda müxtəlif emal işlərində çox balanssız bir yük almaqdan qorxdum (eynilə Spark balanssız arakəsmələrdən əziyyət çəkdiyi kimi). Bundan əlavə, hər bir xromosom üçün bir nümunənin artırılmasında maraqlı deyildim, çünki AWS hesabları üçün standart olaraq 10 instansiya limiti var.

Sonra emal işlərini optimallaşdırmaq üçün R-də skript yazmaq qərarına gəldim.

Əvvəlcə S3-dən hər bir xromosomun nə qədər saxlama yeri tutduğunu hesablamağı xahiş etdim.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Sonra ümumi ölçüsü alan, xromosomların sırasını qarışdıran, onları qruplara bölən bir funksiya yazdım. num_jobs və bütün emal işlərinin ölçülərinin nə qədər fərqli olduğunu sizə xəbər verir.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Sonra purrr-dan istifadə edərək min qarışıqdan keçdim və ən yaxşısını seçdim.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Beləliklə, mən ölçü baxımından çox oxşar olan bir sıra tapşırıqlarla başa çatdım. Sonra qalan yalnız mənim əvvəlki Bash skriptimi böyük bir döngəyə bükmək idi for. Bu optimallaşdırmanın yazılması təxminən 10 dəqiqə çəkdi. Və bu, balanssız olsaydı, əl ilə tapşırıqlar yaratmağa xərcləyəcəyimdən qat-qat azdır. Ona görə də hesab edirəm ki, bu ilkin optimallaşdırma ilə haqlı idim.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Sonda bağlama əmrini əlavə edirəm:

sudo shutdown -h now

... və hər şey alındı! AWS CLI istifadə edərək, seçimdən istifadə edərək nümunələri qaldırdım user_data onlara emal üçün tapşırıqlarının Bash skriptlərini verdi. Onlar qaçdı və avtomatik olaraq bağlandı, ona görə də əlavə emal gücü üçün pul ödəmirdim.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Gəlin qablaşdıraq!

Nə öyrənmişəm: İstifadə rahatlığı və rahatlığı üçün API sadə olmalıdır.

Nəhayət, məlumatları lazımi yerdə və formada əldə etdim. Həmkarlarım üçün asanlaşdırmaq üçün məlumatlardan istifadə prosesini mümkün qədər sadələşdirmək qaldı. Sorğu yaratmaq üçün sadə API yaratmaq istədim. Gələcəkdə mən keçid etmək qərarına gəlsəm .rds Parket fayllarına, onda bu, həmkarlarım üçün deyil, mənim üçün problem olmalıdır. Bunun üçün daxili R paketi hazırlamağa qərar verdim.

Funksiya ətrafında təşkil edilmiş bir neçə məlumat əldə etmək funksiyasından ibarət çox sadə paket qurun və sənədləşdirin get_snp. Həmkarlarım üçün də sayt hazırladım pkgdown, beləliklə onlar nümunələri və sənədləri asanlıqla görə bilərlər.

AWK və R-dən istifadə edərək 25TB təhlili

Ağıllı keşləmə

Nə öyrənmişəm: Məlumatlarınız yaxşı hazırlanmışdırsa, keşləmə asan olacaq!

Əsas iş axınlarından biri eyni analiz modelini SNP paketinə tətbiq etdiyinə görə, öz üstünlüyüm üçün binning istifadə etmək qərarına gəldim. Məlumatları SNP vasitəsilə ötürərkən qrupdan (zibildən) bütün məlumatlar qaytarılan obyektə əlavə olunur. Yəni köhnə sorğular (nəzəri olaraq) yeni sorğuların işlənməsini sürətləndirə bilər.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Paketi qurarkən, müxtəlif üsullardan istifadə edərkən sürəti müqayisə etmək üçün bir çox meyarları yerinə yetirdim. Bunu laqeyd etməməyi tövsiyə edirəm, çünki bəzən nəticələr gözlənilməz olur. Misal üçün, dplyr::filter indeksləşdirməyə əsaslanan süzgəcdən istifadə edərək sətirləri tutmaqdan daha sürətli idi və süzülmüş məlumat çərçivəsindən tək sütunu əldə etmək indeksləşdirmə sintaksisini istifadə etməkdən daha sürətli idi.

Qeyd edək ki, obyekt prev_snp_results açarı ehtiva edir snps_in_bin. Bu, bir qrupdakı (zibil qutusunda) bütün unikal SNP-lərin massividir və əvvəlki sorğudan artıq məlumatınız olub-olmadığını tez yoxlamağa imkan verir. O, həmçinin bu kodla qrupdakı (zibil) bütün SNP-lər arasında dolaşmağı asanlaşdırır:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Tapıntılar

İndi biz əvvəllər bizim üçün əlçatmaz olan modelləri və ssenariləri işlədə bilərik (və ciddi şəkildə etməyə başlamışıq). Ən yaxşısı odur ki, laboratoriya həmkarlarım hər hansı bir fəsad barədə düşünmək məcburiyyətində deyillər. Onların sadəcə işləyən funksiyası var.

Paket onlara təfərrüatları əsirgəməsə də, məlumat formatını o qədər sadə etməyə çalışdım ki, sabah birdən yoxa çıxsam, bunu başa düşə bilsinlər...

Sürət nəzərəçarpacaq dərəcədə artıb. Biz adətən funksional əhəmiyyətli genom fraqmentlərini skan edirik. Əvvəllər biz bunu edə bilmirdik (çox baha olduğu ortaya çıxdı), lakin indi qrup (zibil) strukturu və keşləmə sayəsində bir SNP üçün sorğu orta hesabla 0,1 saniyədən az vaxt aparır və məlumat istifadəsi çox aşağıdır. S3-ün qiyməti fıstıqdır.

Nəticə

Bu məqalə ümumiyyətlə bələdçi deyil. Həll fərdi oldu və demək olar ki, optimal deyil. Daha doğrusu, bu, səyahətnamədir. İstəyirəm ki, başqaları başa düşsünlər ki, belə qərarlar beyində tam formalaşmış görünmür, sınaq və səhvin nəticəsidir. Həmçinin, məlumat alimi axtarırsınızsa, unutmayın ki, bu vasitələrdən səmərəli istifadə təcrübə tələb edir və təcrübə pula başa gəlir. Ödəmək imkanım olduğu üçün xoşbəxtəm, lakin eyni işi məndən daha yaxşı bacaran bir çox başqalarının pul çatışmazlığı səbəbindən heç vaxt cəhd etmək imkanı olmayacaq.

Böyük məlumat vasitələri çox yönlüdür. Vaxtınız varsa, ağıllı məlumatların təmizlənməsi, saxlanması və çıxarılması üsullarından istifadə edərək, demək olar ki, daha sürətli bir həll yaza bilərsiniz. Nəhayət, bu, xərc-fayda təhlilinə gəlir.

Öyrəndiklərim:

  • bir anda 25 TB təhlil etmək üçün ucuz bir yol yoxdur;
  • Parket fayllarınızın ölçüsünə və onların təşkilinə diqqət yetirin;
  • Spark-da arakəsmələr balanslaşdırılmış olmalıdır;
  • Ümumiyyətlə, heç vaxt 2,5 milyon arakəsmə yaratmağa çalışmayın;
  • Spark-ın qurulması kimi çeşidləmə hələ də çətindir;
  • bəzən xüsusi məlumatlar xüsusi həllər tələb edir;
  • Qığılcımların yığılması sürətlidir, lakin bölmələr hələ də bahadır;
  • sizə əsasları öyrədəndə yatmayın, yəqin ki, kimsə probleminizi hələ 1980-ci illərdə həll edib;
  • gnu parallel - bu sehrli bir şeydir, hamı bundan istifadə etməlidir;
  • Spark sıxılmamış məlumatları bəyənir və bölmələri birləşdirməyi sevmir;
  • Sadə problemləri həll edərkən Spark həddindən artıq yükə malikdir;
  • AWK-nin assosiativ massivləri çox səmərəlidir;
  • əlaqə saxlaya bilərsiniz stdin и stdout bir R skriptindən və buna görə də onu boru kəmərində istifadə edin;
  • Ağıllı yol tətbiqi sayəsində S3 bir çox faylları emal edə bilər;
  • Vaxt itkisinin əsas səbəbi saxlama üsulunuzu vaxtından əvvəl optimallaşdırmaqdır;
  • tapşırıqları əl ilə optimallaşdırmağa çalışmayın, kompüterə icazə verin;
  • API istifadənin asanlığı və çevikliyi naminə sadə olmalıdır;
  • Məlumatlarınız yaxşı hazırlanmışdırsa, keşləmə asan olacaq!

Mənbə: www.habr.com

Добавить комментарий