25 TB parsÄ“Å”ana, izmantojot AWK un R

25 TB parsÄ“Å”ana, izmantojot AWK un R
Kā lasÄ«t Å”o rakstu: Es atvainojos, ka teksts ir tik garÅ” un haotisks. Lai ietaupÄ«tu jÅ«su laiku, es katru nodaļu sāku ar ievadu ā€œKo es uzzinājuā€, kas vienā vai divos teikumos apkopo nodaļas bÅ«tÄ«bu.

"VienkārÅ”i parādiet man risinājumu!" Ja vēlaties tikai redzēt, no kurienes esmu nācis, pārejiet uz nodaļu ā€œKļūstot izgudrojoŔākamā€, taču, manuprāt, ir interesantāk un noderÄ«gāk lasÄ«t par neveiksmēm.

Man nesen tika uzdots izveidot procesu liela apjoma neapstrādātu DNS sekvenču (tehniski SNP mikroshēmas) apstrādei. NepiecieÅ”amÄ«ba bija ātri iegÅ«t datus par noteiktu Ä£enētisko atraÅ”anās vietu (sauktu par SNP) turpmākai modelÄ“Å”anai un citiem uzdevumiem. Izmantojot R un AWK, es varēju tÄ«rÄ«t un sakārtot datus dabiskā veidā, ievērojami paātrinot vaicājumu apstrādi. Tas man nebija viegli un prasÄ«ja daudzus atkārtojumus. Å is raksts palÄ«dzēs jums izvairÄ«ties no dažām manām kļūdām un parādÄ«s, ar ko es beidzu.

Pirmkārt, daži ievada skaidrojumi.

Dati

MÅ«su universitātes Ä£enētiskās informācijas apstrādes centrs mums sniedza datus 25 TB TSV formātā. Es saņēmu tos sadalÄ«tus 5 pakotnēs, saspiestas ar Gzip, katrā no kurām bija aptuveni 240 četru gigabaitu faili. Katrā rindā bija dati par vienu SNP no vienas personas. Kopumā tika pārsÅ«tÄ«ti dati par ~2,5 miljoniem SNP un ~60 tÅ«kstoÅ”iem cilvēku. Papildus SNP informācijai failos bija daudzas kolonnas ar cipariem, kas atspoguļo dažādas Ä«paŔības, piemēram, lasÄ«Å”anas intensitāti, dažādu alēļu biežumu utt. Kopumā bija aptuveni 30 kolonnas ar unikālām vērtÄ«bām.

mērķis

Tāpat kā jebkurā datu pārvaldÄ«bas projektā, vissvarÄ«gākais bija noteikt, kā dati tiks izmantoti. Å ajā gadÄ«jumā mēs galvenokārt atlasÄ«sim modeļus un darbplÅ«smas SNP, pamatojoties uz SNP. Tas nozÄ«mē, ka mums vienlaikus bÅ«s nepiecieÅ”ami dati tikai par vienu SNP. Man bija jāiemācās pēc iespējas vienkārŔāk, ātrāk un lētāk izgÅ«t visus ierakstus, kas saistÄ«ti ar kādu no 2,5 miljoniem SNP.

Kā to nedarīt

Citējot piemērotu kliÅ”eju:

Es nekļūdÄ«jos tÅ«kstoÅ” reižu, es vienkārÅ”i atklāju tÅ«kstoÅ” veidu, kā izvairÄ«ties no datu kopas parsÄ“Å”anas vaicājumam draudzÄ«gā formātā.

Pirmais mēģinājums

Ko esmu iemācÄ«jies: Nav lēta veida, kā vienā reizē parsēt 25 TB.

ApgÅ«stot kursu ā€œLielo datu apstrādes uzlabotas metodesā€ Vanderbiltas Universitātē, es biju pārliecināts, ka triks ir somā. Visticamāk, bÅ«s nepiecieÅ”ama stunda vai divas, lai iestatÄ«tu Hive serveri, lai tas palaistu visus datus un ziņotu par rezultātu. Tā kā mÅ«su dati tiek glabāti AWS S3, es izmantoju pakalpojumu Athena, kas ļauj lietot Hive SQL vaicājumus S3 datiem. Jums nav jāiestata/jāizveido Hive klasteris, un jÅ«s arÄ« maksājat tikai par meklētajiem datiem.

Kad es parādīju Athena savus datus un to formātu, es veicu dažus testus ar Ŕādiem vaicājumiem:

select * from intensityData limit 10;

Un ātri saņēma labi strukturētus rezultātus. Gatavs.

Līdz brīdim, kad mēģinājām izmantot datus savā darbā...

Man tika lūgts izņemt visu SNP informāciju, lai pārbaudītu modeli. Es izpildīju vaicājumu:


select * from intensityData 
where snp = 'rs123456';

...un sāka gaidÄ«t. Pēc astoņām minÅ«tēm un vairāk nekā 4 TB pieprasÄ«to datu es saņēmu rezultātu. Athena iekasē maksu pēc atrasto datu apjoma, 5 USD par terabaitu. Tātad Å”is vienÄ«gais pieprasÄ«jums maksāja 20 USD un astoņas minÅ«tes gaidÄ«Å”anas. Lai modeli darbinātu ar visiem datiem, mums bija jāgaida 38 gadi un jāmaksā 50 miljoni ASV dolāru.AcÄ«mredzot, tas mums nebija piemērots.

Vajadzēja izmantot Parketu...

Ko esmu iemācījies: Esiet piesardzīgs attiecībā uz savu parketa failu izmēru un to organizāciju.

Vispirms mēģināju labot situāciju, pārvērÅ”ot visus TSV uz Parketa vÄ«les. Tie ir ērti darbam ar lielām datu kopām, jo ā€‹ā€‹tajos esoŔā informācija tiek glabāta kolonnu formā: katra kolonna atrodas savā atmiņas/diska segmentā, atŔķirÄ«bā no teksta failiem, kuros rindās ir katras kolonnas elementi. Un, ja jums ir kaut kas jāatrod, tad vienkārÅ”i izlasiet nepiecieÅ”amo kolonnu. Turklāt katrs fails kolonnā saglabā vērtÄ«bu diapazonu, tādēļ, ja meklētā vērtÄ«ba nav kolonnas diapazonā, Spark netērēs laiku visa faila skenÄ“Å”anai.

Es izpildÄ«ju vienkārÅ”u uzdevumu AWS lÄ«me lai pārvērstu mÅ«su TSV parketa formātā, un jaunos failus ievietoja programmā Athena. Pagāja apmēram 5 stundas. Bet, izpildot pieprasÄ«jumu, tā aizpildÄ«Å”ana prasÄ«ja aptuveni tikpat daudz laika un nedaudz mazāk naudas. Fakts ir tāds, ka Spark, mēģinot optimizēt uzdevumu, vienkārÅ”i izsaiņoja vienu TSV gabalu un ievietoja to savā parketa daļā. Un tā kā katrs fragments bija pietiekami liels, lai saturētu visus daudzu cilvēku ierakstus, katrs fails saturēja visus SNP, tāpēc Spark bija jāatver visi faili, lai iegÅ«tu nepiecieÅ”amo informāciju.

Interesanti, ka Parketa noklusējuma (un ieteicamais) kompresijas veids, snappy, nav sadalāms. Tāpēc katrs izpildītājs bija iestrēdzis pie uzdevuma izpakot un lejupielādēt visu 3,5 GB datu kopu.

25 TB parsÄ“Å”ana, izmantojot AWK un R

Sapratīsim problēmu

Ko esmu iemācījies: kārtoŔana ir sarežģīta, it īpaŔi, ja dati tiek izplatīti.

Man Ŕķita, ka tagad es sapratu problēmas bÅ«tÄ«bu. Man vajadzēja kārtot datus tikai pēc SNP kolonnas, nevis pēc cilvēkiem. Pēc tam vairāki SNP tiks saglabāti atseviŔķā datu gabalā, un pēc tam Parketa funkcija "viedā" "atvērta tikai tad, ja vērtÄ«ba ir diapazonā" parādÄ«sies visā savā krāŔņumā. Diemžēl kārtoÅ”ana starp miljardiem rindu, kas izkaisÄ«tas klasterÄ«, izrādÄ«jās grÅ«ts uzdevums.

AWS noteikti nevēlas izsniegt atmaksu iemesla dēļ ā€œEs esmu izklaidÄ«gs studentsā€. Pēc tam, kad es kārtoju Amazon Glue, tas darbojās 2 dienas un avarēja.

Kā ar sadalīŔanu?

Ko esmu iemācījies: Spark starpsienām jābūt līdzsvarotām.

Tad man radās ideja sadalīt datus hromosomās. Ir 23 no tiem (un vēl vairāki, ja ņem vērā mitohondriju DNS un nekartētos reģionus).
Tas ļaus jums sadalÄ«t datus mazākos gabalos. Ja skriptā Glue Spark eksportÄ“Å”anas funkcijai pievienojat tikai vienu rindiņu partition_by = "chr", tad dati jāsadala segmentos.

25 TB parsÄ“Å”ana, izmantojot AWK un R
Genoms sastāv no daudziem fragmentiem, ko sauc par hromosomām.

Diemžēl tas neizdevās. Hromosomām ir dažādi izmēri, kas nozÄ«mē dažādu informācijas apjomu. Tas nozÄ«mē, ka uzdevumi, ko Spark nosÅ«tÄ«ja darbiniekiem, nebija lÄ«dzsvaroti un tika pabeigti lēni, jo daži mezgli pabeidza agri un bija dÄ«kstāvē. Tomēr uzdevumi tika izpildÄ«ti. Bet, prasot vienu SNP, nelÄ«dzsvarotÄ«ba atkal radÄ«ja problēmas. SNP apstrādes izmaksas lielākās hromosomās (tas ir, kur mēs vēlamies iegÅ«t datus) ir samazinājuŔās tikai aptuveni 10 reizes. Daudz, bet nepietiekami.

Ko darīt, ja mēs to sadalām vēl mazākās daļās?

Ko esmu iemācījies: Nekad nemēģiniet izveidot 2,5 miljonus nodalījumu.

Es nolēmu pilnÄ«bā rÄ«koties un sadalÄ«ju katru SNP. Tas nodroÅ”ināja, ka starpsienas bija vienāda izmēra. TĀ BIJA SLIKTA IDEJA. Es izmantoju lÄ«mi un pievienoju nevainÄ«gu lÄ«niju partition_by = 'snp'. Uzdevums sākās un sāka izpildÄ«t. Dienu vēlāk es pārbaudÄ«ju un redzēju, ka S3 joprojām nekas nav rakstÄ«ts, tāpēc es nogalināju uzdevumu. Å Ä·iet, ka Glue ierakstÄ«ja starpposma failus slēptā vietā S3, daudz failu, varbÅ«t pāris miljonus. Rezultātā mana kļūda maksāja vairāk nekā tÅ«kstoti dolāru un neiepriecināja manu mentoru.

SadalīŔana + ŔķiroŔana

Ko esmu iemācÄ«jies: Å Ä·iroÅ”ana joprojām ir sarežģīta, tāpat kā Spark noregulÄ“Å”ana.

Mans pēdējais sadalÄ«Å”anas mēģinājums bija saistÄ«ts ar hromosomu sadalÄ«Å”anu un pēc tam katra nodalÄ«juma ŔķiroÅ”anu. Teorētiski tas paātrinātu katru vaicājumu, jo vēlamajiem SNP datiem bija jāatrodas dažos parketa gabalos noteiktā diapazonā. Diemžēl pat sadalÄ«to datu ŔķiroÅ”ana izrādÄ«jās grÅ«ts uzdevums. Rezultātā es pārgāju uz EMR pielāgotam klasterim un izmantoju astoņus jaudÄ«gus gadÄ«jumus (C5.4xl) un Sparklyr, lai izveidotu elastÄ«gāku darbplÅ«smu...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...tomēr uzdevums joprojām nebija izpildīts. Es to konfigurēju dažādos veidos: palielināju atmiņas sadalījumu katram vaicājuma izpildītājam, izmantoju mezglus ar lielu atmiņas apjomu, izmantoju apraides mainīgos (apraides mainīgos), bet katru reizi tie izrādījās puspasākumi, un pamazām sāka izpildītāji. izgāzties, līdz viss apstāsies.

Es kļūstu radoŔāka

Ko esmu iemācījies: Dažreiz īpaŔiem datiem ir nepiecieŔami īpaŔi risinājumi.

Katram SNP ir pozÄ«cijas vērtÄ«ba. Tas ir skaitlis, kas atbilst bāzu skaitam gar tā hromosomu. Tas ir jauks un dabisks veids, kā sakārtot mÅ«su datus. Sākumā es gribēju sadalÄ«t pa katras hromosomas reÄ£ioniem. Piemēram, pozÄ«cijas 1 - 2000, 2001 - 4000 utt. Bet problēma ir tā, ka SNP nav vienmērÄ«gi sadalÄ«ti pa hromosomām, tāpēc grupu lielumi bÅ«s ļoti atŔķirÄ«gi.

25 TB parsÄ“Å”ana, izmantojot AWK un R

Rezultātā es nonācu pie pozÄ«ciju sadalÄ«juma kategorijās (rangs). Izmantojot jau lejupielādētos datus, es izpildÄ«ju pieprasÄ«jumu iegÅ«t unikālo SNP, to pozÄ«ciju un hromosomu sarakstu. Pēc tam es sakārtoju datus katrā hromosomā un savācu SNP noteiktā lieluma grupās (tvertnē). Pieņemsim, ka katrs 1000 SNP. Tas man radÄ«ja SNP attiecÄ«bas starp grupām uz hromosomu.

Galu galā es izveidoju grupas (bin) no 75 SNP, iemesls tiks paskaidrots tālāk.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Vispirms mēģiniet ar Spark

Ko esmu iemācījies: Dzirksteļu apkopoŔana ir ātra, taču sadalīŔana joprojām ir dārga.

Es gribēju nolasÄ«t Å”o mazo (2,5 miljoni rindu) datu rāmi pakalpojumā Spark, apvienot to ar neapstrādātajiem datiem un pēc tam sadalÄ«t to ar tikko pievienoto kolonnu. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

ES izmantoju sdf_broadcast(), tāpēc Spark zina, ka tai jānosÅ«ta datu rāmis uz visiem mezgliem. Tas ir noderÄ«gi, ja dati ir mazi un nepiecieÅ”ami visiem uzdevumiem. Pretējā gadÄ«jumā Spark cenÅ”as bÅ«t gudrs un pēc vajadzÄ«bas izplata datus, kas var izraisÄ«t palēninājumus.

Un atkal mana ideja neizdevās: kādu laiku uzdevumi strādāja, pabeidza savienību, un tad, tāpat kā izpildītāji, kas tika uzsākti sadalot, tie sāka neizdoties.

AWK pievienoŔana

Ko esmu iemācījies: Neguli, kad tev māca pamatus. Noteikti kāds jau ir atrisinājis jūsu problēmu astoņdesmitajos gados.

LÄ«dz Å”im visu manu neveiksmju ar Spark iemesls bija datu juceklis klasterÄ«. Iespējams, situāciju var uzlabot ar iepriekŔēju apstrādi. Es nolēmu mēģināt sadalÄ«t neapstrādātos teksta datus hromosomu kolonnās, tāpēc es cerēju nodroÅ”ināt Spark ar ā€œiepriekÅ” sadalÄ«tiemā€ datiem.

Es meklēju vietnē StackOverflow, kā sadalīt pēc kolonnu vērtībām, un atradu tik lieliska atbilde. Izmantojot AWK, varat sadalīt teksta failu pēc kolonnu vērtībām, ierakstot to skriptā, nevis nosūtot rezultātus uz stdout.

Es uzrakstÄ«ju BaÅ”a skriptu, lai to izmēģinātu. Lejupielādēja vienu no iepakotajiem TSV un pēc tam izsaiņoja to, izmantojot gzip un nosÅ«tÄ«ts uz awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Tas izdevās!

Serdeņu aizpildÄ«Å”ana

Ko esmu iemācījies: gnu parallel - tā ir maģiska lieta, ikvienam vajadzētu to izmantot.

AtdalÄ«Å”anās bija diezgan lēna un kad es sāku htoplai pārbaudÄ«tu jaudÄ«gas (un dārgas) EC2 instances izmantoÅ”anu, izrādÄ«jās, ka izmantoju tikai vienu kodolu un apmēram 200 MB atmiņas. Lai atrisinātu problēmu un nezaudētu daudz naudas, mums bija jāizdomā, kā paralēli veikt darbu. Par laimi, absolÅ«ti pārsteidzoŔā grāmatā Datu zinātne komandrindā Es atradu Džerona Jansensa nodaļu par paralēlizāciju. No tā es uzzināju par gnu parallel, ļoti elastÄ«ga metode daudzpavedienu ievieÅ”anai Unix sistēmā.

25 TB parsÄ“Å”ana, izmantojot AWK un R
Kad es sāku sadalÄ«Å”anu, izmantojot jauno procesu, viss bija kārtÄ«bā, bet joprojām bija vājÅ” kakls - S3 objektu lejupielāde diskā nebija ļoti ātra un nebija pilnÄ«bā paralēla. Lai to labotu, es rÄ«kojos Ŕādi:

  1. Es uzzināju, ka ir iespējams ieviest S3 lejupielādes stadiju tieÅ”i konveijerā, pilnÄ«bā novērÅ”ot starpglabāŔanu diskā. Tas nozÄ«mē, ka varu izvairÄ«ties no neapstrādātu datu ierakstÄ«Å”anas diskā un izmantot vēl mazāku un lÄ«dz ar to lētāku krātuvi AWS.
  2. komanda aws configure set default.s3.max_concurrent_requests 50 ievērojami palielināja AWS CLI izmantoto pavedienu skaitu (pēc noklusējuma ir 10).
  3. Es pārgāju uz EC2 gadÄ«jumu, kas optimizēts tÄ«kla ātrumam un kura nosaukumā ir burts n. Es atklāju, ka apstrādes jaudas zudumu, izmantojot n-gadÄ«jumus, vairāk nekā kompensē ielādes ātruma palielināŔanās. Lielākajai daļai uzdevumu es izmantoju c5n.4xl.
  4. MainÄ«ts gzip par pigz, Å”is ir gzip rÄ«ks, kas var darÄ«t lieliskas lietas, lai paralēli sākotnēji neparalēlētajam failu atspieÅ”anas uzdevumam (tas palÄ«dzēja vismazāk).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Å Ä«s darbÄ«bas ir apvienotas viena ar otru, lai viss darbotos ļoti ātri. Palielinot lejupielādes ātrumu un novērÅ”ot ierakstÄ«Å”anu diskā, tagad es varētu apstrādāt 5 terabaitu pakotni tikai dažās stundās.

Šajā tvītā vajadzēja pieminēt "TSV". Diemžēl.

Tiek izmantoti tikko parsētie dati

Ko esmu iemācījies: Spark patīk nesaspiesti dati un nepatīk apvienot nodalījumus.

Tagad dati bija S3 neizpakotā (lasÄ«t: koplietotā) un daļēji pasÅ«tÄ«tā formātā, un es varēju vēlreiz atgriezties Sparkā. Mani gaidÄ«ja pārsteigums: man atkal neizdevās sasniegt to, ko gribēju! Bija ļoti grÅ«ti Spark precÄ«zi pateikt, kā dati tika sadalÄ«ti. Un pat tad, kad es to izdarÄ«ju, izrādÄ«jās, ka starpsienu bija pārāk daudz (95 tÅ«kstoÅ”i), un kad es to izmantoju coalesce samazināja to skaitu lÄ«dz saprātÄ«gām robežām, tas iznÄ«cināja manu nodalÄ«jumu. Esmu pārliecināts, ka to var labot, taču pēc pāris dienu meklÄ“Å”anas nevarēju atrast risinājumu. Es beidzot pabeidzu visus uzdevumus Sparkā, lai gan tas prasÄ«ja kādu laiku un mani sadalÄ«tie Parquet faili nebija ļoti mazi (~200 KB). Tomēr dati bija tur, kur tie bija vajadzÄ«gi.

25 TB parsÄ“Å”ana, izmantojot AWK un R
Par mazu un nelīdzenu, brīniŔķīgi!

Vietējo Spark vaicājumu pārbaude

Ko esmu iemācÄ«jies: Spark ir pārāk lielas izmaksas, risinot vienkārÅ”as problēmas.

Lejupielādējot datus gudrā formātā, varēju pārbaudīt ātrumu. Iestatiet R skriptu, lai palaistu lokālo Spark serveri, un pēc tam ielādēja Spark datu rāmi no norādītās Parquet grupas krātuves (bin). Es mēģināju ielādēt visus datus, bet nevarēju panākt, lai Sparklyr atpazītu nodalījumu.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Izpilde ilga 29,415 sekundes. Daudz labāk, bet ne pārāk labi kaut ko masveida testÄ“Å”anai. Turklāt es nevarēju paātrināt darbu ar keÅ”atmiņu, jo, mēģinot keÅ”atmiņā saglabāt datu rāmi, Spark vienmēr avarēja, pat ja datu kopai, kas sver mazāk par 50, pieŔķīru vairāk nekā 15 GB atmiņas.

Atgriezties uz AWK

Ko esmu iemācījies: AWK asociatīvie masīvi ir ļoti efektīvi.

Sapratu, ka varu sasniegt lielāku ātrumu. Es to atcerējos brÄ«niŔķīgā BrÅ«sa Bārneta AWK apmācÄ«ba Es lasÄ«ju par lielisku funkciju ar nosaukumu "asociatÄ«vie masÄ«vi" BÅ«tÄ«bā tie ir atslēgu-vērtÄ«bu pāri, kas nez kāpēc AWK tika saukti atŔķirÄ«gi, un tāpēc es kaut kā par tiem daudz nedomāju. Romāns Čepļaka atgādināja, ka termins ā€œasociatÄ«vie masÄ«viā€ ir daudz vecāks par terminu ā€œatslēgas vērtÄ«bu pārisā€. Pat ja jÅ«s meklējiet atslēgas vērtÄ«bu pakalpojumā Google Ngram, jÅ«s tur neredzēsit Å”o terminu, bet jÅ«s atradÄ«sit asociatÄ«vus masÄ«vus! Turklāt ā€œatslēgas-vērtÄ«bas pārisā€ visbiežāk tiek saistÄ«ts ar datu bāzēm, tāpēc ir daudz saprātÄ«gāk to salÄ«dzināt ar hashmap. Es sapratu, ka varu izmantot Å”os asociatÄ«vos masÄ«vus, lai saistÄ«tu savus SNP ar bin tabulu un neapstrādātiem datiem, neizmantojot Spark.

Lai to izdarītu, AWK skriptā es izmantoju bloku BEGIN. Šis ir koda fragments, kas tiek izpildīts, pirms pirmā datu rinda tiek nodota skripta galvenajai daļai.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Komanda while(getline...) ielādēja visas rindas no CSV grupas (bin), iestatiet pirmo kolonnu (SNP nosaukumu) kā asociatīvā masīva atslēgu bin un otrā vērtība (grupa) kā vērtība. Pēc tam blokā { }, kas tiek izpildīts visās galvenā faila rindās, katra rinda tiek nosūtīta uz izvades failu, kas saņem unikālu nosaukumu atkarībā no grupas (bin): ..._bin_"bin[$1]"_....

MainÄ«gie batch_num Šø chunk_id sakrita ar konveijera sniegtajiem datiem, izvairoties no sacensÄ«bu stāvokļa un katra izpildes pavediena palaiÅ”anas parallel, rakstÄ«ja savā unikālajā failā.

Tā kā es izkaisÄ«ju visus neapstrādātos datus mapēs hromosomās, kas palika pāri no mana iepriekŔējā eksperimenta ar AWK, tagad es varētu uzrakstÄ«t citu Bash skriptu, lai apstrādātu vienu hromosomu vienlaikus un nosÅ«tÄ«tu dziļākus sadalÄ«tos datus uz S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Skriptam ir divas sadaļas parallel.

Pirmajā sadaļā dati tiek nolasÄ«ti no visiem failiem, kas satur informāciju par vēlamo hromosomu, pēc tam Å”ie dati tiek sadalÄ«ti pa pavedieniem, kas sadala failus atbilstoŔās grupās (bin). Lai izvairÄ«tos no sacensÄ«bu apstākļiem, kad vienā failā raksta vairāki pavedieni, AWK nodod failu nosaukumus, lai rakstÄ«tu datus dažādās vietās, piemēram, chr_10_bin_52_batch_2_aa.csv. Rezultātā diskā tiek izveidoti daudzi mazi faili (Å”im nolÅ«kam izmantoju terabaitu EBS apjomus).

Konveijers no otrās sekcijas parallel iziet cauri grupām (bin) un apvieno to atseviŔķos failus kopējā CSV c catun pēc tam nosÅ«ta tos eksportam.

Apraide R?

Ko esmu iemācÄ«jies: JÅ«s varat sazināties stdin Šø stdout no R skripta, un tāpēc izmantojiet to konveijerā.

JÅ«s, iespējams, pamanÄ«jāt Å”o rindiņu savā Bash skriptā: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Tas pārvērÅ” visus sasaistÄ«tos grupu failus (bin) tālāk esoÅ”ajā R skriptā. {} ir Ä«paÅ”a tehnika parallel, kas visus datus, ko tā sÅ«ta uz norādÄ«to straumi, ievieto tieÅ”i paŔā komandā. Opcija {#} nodroÅ”ina unikālu pavediena ID, un {%} apzÄ«mē darba vietas numuru (atkārtoti, bet nekad vienlaikus). Visu opciju sarakstu var atrast dokumentācija.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kad mainÄ«gais file("stdin") pārsÅ«tÄ«ts uz readr::read_csv, dati, kas tulkoti R skriptā, tiek ielādēti rāmÄ«, kas pēc tam atrodas formā .rds- failu izmantoÅ”ana aws.s3 rakstÄ«ts tieÅ”i uz S3.

RDS ir kaut kas lÄ«dzÄ«gs jaunākajai Parketa versijai bez skaļruņu uzglabāŔanas iespējām.

Pēc Bash skripta pabeigÅ”anas es saņēmu komplektu .rds-faili, kas atrodas S3, kas ļāva man izmantot efektÄ«vu saspieÅ”anu un iebÅ«vētos veidus.

Neskatoties uz bremžu R izmantoÅ”anu, viss nostrādāja ļoti ātri. Nav pārsteidzoÅ”i, ka R daļas, kas lasa un raksta datus, ir ļoti optimizētas. Pēc pārbaudes vienā vidēja izmēra hromosomā C5n.4xl instancē darbs tika pabeigts aptuveni divās stundās.

S3 Ierobežojumi

Ko esmu iemācījies: Pateicoties viedā ceļa ievieŔanai, S3 var apstrādāt daudzus failus.

Es uztraucos, vai S3 spēs apstrādāt daudzos failus, kas uz to tika pārsūtīti. Es varētu padarīt failu nosaukumus saprotamus, bet kā S3 tos meklētu?

25 TB parsÄ“Å”ana, izmantojot AWK un R
S3 mapes ir paredzētas tikai demonstrÄ“Å”anai, patiesÄ«bā sistēma neinteresē simbolu /. No S3 FAQ lapas.

Å Ä·iet, ka S3 apzÄ«mē ceļu uz konkrētu failu kā vienkārÅ”u atslēgu sava veida hash tabulā vai uz dokumentu balstÄ«tā datu bāzē. Spaini var uzskatÄ«t par tabulu, un failus var uzskatÄ«t par ierakstiem Å”ajā tabulā.

Tā kā ātrums un efektivitāte ir svarÄ«ga peļņas gÅ«Å”anai Amazon, nav pārsteigums, ka Ŕī atslēgas kā faila ceļa sistēma ir ārkārtÄ«gi optimizēta. Mēģināju atrast lÄ«dzsvaru: lai man nebÅ«tu jāiesniedz daudz pieprasÄ«jumu, bet lai pieprasÄ«jumi tiktu izpildÄ«ti ātri. IzrādÄ«jās, ka vislabāk ir izgatavot apmēram 20 tÅ«kstoÅ”us bin failu. Domāju, ka, ja turpināsim optimizēt, varam panākt ātruma pieaugumu (piemēram, izveidojot speciālu spaini tikai datiem, tādējādi samazinot uzmeklÄ“Å”anas tabulas izmēru). Taču turpmākiem eksperimentiem nebija ne laika, ne naudas.

Kā ir ar savstarpējo saderību?

Ko es uzzināju: galvenais izŔķērdēta laika iemesls ir priekÅ”laicÄ«ga uzglabāŔanas metodes optimizÄ“Å”ana.

Å ajā brÄ«dÄ« ir ļoti svarÄ«gi sev uzdot jautājumu: "Kāpēc izmantot patentētu faila formātu?" Iemesls ir ielādes ātrums (gzip CSV failu ielāde prasÄ«ja 7 reizes ilgāku laiku) un saderÄ«ba ar mÅ«su darbplÅ«smām. Es varu pārskatÄ«t, vai R var viegli ielādēt Parketa (vai Arrow) failus bez Spark slodzes. Ikviens mÅ«su laboratorijā izmanto R, un, ja man ir nepiecieÅ”ams konvertēt datus citā formātā, man joprojām ir oriÄ£inālie teksta dati, lai es varētu vienkārÅ”i palaist konveijeru vēlreiz.

Darba sadale

Ko esmu iemācījies: Nemēģiniet optimizēt darbus manuāli, ļaujiet to darīt datoram.

Esmu atkļūdojis darbplūsmu vienā hromosomā, tagad man ir jāapstrādā visi pārējie dati.
Es gribēju izvirzÄ«t vairākus EC2 gadÄ«jumus konvertÄ“Å”anai, taču tajā paŔā laikā es baidÄ«jos iegÅ«t ļoti nelÄ«dzsvarotu slodzi dažādos apstrādes darbos (tāpat kā Spark cieta no nelÄ«dzsvarotām starpsienām). Turklāt es neinteresējos palielināt vienu gadÄ«jumu katrā hromosomā, jo AWS kontiem noklusējuma ierobežojums ir 10 gadÄ«jumi.

Tad es nolēmu uzrakstīt skriptu R valodā, lai optimizētu apstrādes darbus.

Pirmkārt, es palūdzu S3 aprēķināt, cik daudz vietas katra hromosoma aizņēma.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# ā€¦ with 17 more rows

Tad es uzrakstīju funkciju, kas ņem kopējo izmēru, sajauc hromosomu secību, sadala tās grupās num_jobs un norāda, cik dažādi ir visu apstrādes darbu izmēri.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 Ɨ 3]>

Tad es skrēju cauri tÅ«kstoÅ” jaukÅ”anas reizēm, izmantojot purrr, un izvēlējos labāko.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Tāpēc es nonācu pie uzdevumu kopuma, kas bija ļoti lÄ«dzÄ«ga izmēra. Tad atlika tikai ietÄ«t manu iepriekŔējo BaÅ”a skriptu lielā cilpā for. Å Ä«s optimizācijas rakstÄ«Å”ana aizņēma apmēram 10 minÅ«tes. Un tas ir daudz mazāk, nekā es tērētu manuālai uzdevumu izveidei, ja tie bÅ«tu nelÄ«dzsvaroti. Tāpēc es domāju, ka man bija taisnÄ«ba ar Å”o sākotnējo optimizāciju.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Beigās pievienoju izslēgÅ”anas komandu:

sudo shutdown -h now

... un viss izdevās! Izmantojot AWS CLI, es izvirzīju gadījumus, izmantojot opciju user_data iedeva viņiem apstrādei savu uzdevumu Bash skriptus. Tie darbojās un izslēdzās automātiski, tāpēc es nemaksāju par papildu apstrādes jaudu.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Iepakosim!

Ko esmu iemācījies: API ir jābūt vienkārŔai izmantoŔanas viegluma un elastības labad.

Beidzot es saņēmu datus pareizajā vietā un formā. Atlika tikai pēc iespējas vienkārÅ”ot datu izmantoÅ”anas procesu, lai kolēģiem bÅ«tu vieglāk. Es gribēju izveidot vienkārÅ”u API pieprasÄ«jumu izveidei. Ja nākotnē es izlemÅ”u pāriet no .rds uz Parketa failiem, tad Å”ai problēmai vajadzētu bÅ«t man, nevis maniem kolēģiem. Å im nolÅ«kam es nolēmu izveidot iekŔējo R paketi.

Izveidojiet un dokumentējiet ļoti vienkārÅ”u pakotni, kurā ir tikai dažas datu piekļuves funkcijas, kas sakārtotas ap funkciju get_snp. Es arÄ« izveidoju vietni saviem kolēģiem pkgdown, lai viņi varētu viegli skatÄ«t piemērus un dokumentāciju.

25 TB parsÄ“Å”ana, izmantojot AWK un R

Viedā keÅ”atmiņa

Ko esmu iemācÄ«jies: Ja jÅ«su dati ir labi sagatavoti, saglabāŔana keÅ”atmiņā bÅ«s vienkārÅ”a!

Tā kā viena no galvenajām darbplÅ«smām izmantoja to paÅ”u analÄ«zes modeli SNP pakotnei, es nolēmu izmantot binning savā labā. PārsÅ«tot datus, izmantojot SNP, visa informācija no grupas (bin) tiek pievienota atgrieztajam objektam. Tas nozÄ«mē, ka vecie vaicājumi var (teorētiski) paātrināt jaunu vaicājumu apstrādi.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Veidojot pakotni, es izmantoju daudzus etalonus, lai salÄ«dzinātu ātrumu, izmantojot dažādas metodes. Iesaku to neatstāt novārtā, jo dažkārt rezultāti ir negaidÄ«ti. Piemēram, dplyr::filter bija daudz ātrāk nekā rindu tverÅ”ana, izmantojot uz indeksÄ“Å”anu balstÄ«tu filtrÄ“Å”anu, un vienas kolonnas izgÅ«Å”ana no filtrēta datu rāmja bija daudz ātrāka nekā indeksÄ“Å”anas sintakses izmantoÅ”ana.

LÅ«dzu, ņemiet vērā, ka objekts prev_snp_results satur atslēgu snps_in_bin. Å is ir visu unikālo SNP masÄ«vs grupā (bin), kas ļauj ātri pārbaudÄ«t, vai jums jau ir dati no iepriekŔējā vaicājuma. Tas arÄ« atvieglo visu grupas (bin) SNP cilpu, izmantojot Å”o kodu:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

rezultātus

Tagad mēs varam (un esam sākuÅ”i nopietni) palaist modeļus un scenārijus, kas mums iepriekÅ” nebija pieejami. Labākais ir tas, ka maniem laboratorijas kolēģiem nav jādomā par sarežģījumiem. Viņiem vienkārÅ”i ir funkcija, kas darbojas.

Un, lai gan iepakojums viņiem aiztaupa detaļas, es mēģināju padarÄ«t datu formātu pietiekami vienkārÅ”u, lai viņi to varētu saprast, ja es rÄ«t pēkŔņi pazustu...

Ātrums manāmi pieaudzis. Mēs parasti skenējam funkcionāli nozÄ«mÄ«gus genoma fragmentus. IepriekÅ” mēs to nevarējām izdarÄ«t (tas izrādÄ«jās pārāk dārgi), bet tagad, pateicoties grupu (bin) struktÅ«rai un keÅ”atmiņai, viena SNP pieprasÄ«jums vidēji aizņem mazāk nekā 0,1 sekundi, un datu lietojums ir tik liels. zemas, ka S3 izmaksas ir zemesrieksti.

Secinājums

Å is raksts nepavisam nav ceļvedis. Risinājums izrādÄ«jās individuāls un gandrÄ«z noteikti nav optimāls. DrÄ«zāk tas ir ceļojuma apraksts. Es vēlos, lai citi saprastu, ka Ŕādi lēmumi nerodas lÄ«dz galam, tie ir izmēģinājumu un kļūdu rezultāts. Turklāt, ja meklējat datu zinātnieku, ņemiet vērā, ka Å”o rÄ«ku efektÄ«vai lietoÅ”anai ir nepiecieÅ”ama pieredze un pieredze maksā naudu. Esmu laimÄ«ga, ka man bija iespējas maksāt, bet daudziem citiem, kas to paÅ”u darbu var izdarÄ«t labāk par mani, naudas trÅ«kuma dēļ nekad nebÅ«s iespējas pat mēģināt.

Lielo datu rīki ir daudzpusīgi. Ja jums ir laiks, gandrīz noteikti varat uzrakstīt ātrāku risinājumu, izmantojot viedas datu tīrīŔanas, uzglabāŔanas un ieguves metodes. Galu galā tas ir saistīts ar izmaksu un ieguvumu analīzi.

Ko es uzzināju:

  • nav lēta veida, kā vienā reizē parsēt 25 TB;
  • esiet piesardzÄ«gs ar savu Parketa failu izmēru un to organizāciju;
  • Spark starpsienām jābÅ«t lÄ«dzsvarotām;
  • Kopumā nekad nemēģiniet izveidot 2,5 miljonus nodalÄ«jumu;
  • Å Ä·iroÅ”ana joprojām ir sarežģīta, tāpat kā Spark iestatÄ«Å”ana;
  • dažreiz Ä«paÅ”iem datiem ir nepiecieÅ”ami Ä«paÅ”i risinājumi;
  • Dzirksteļu apkopoÅ”ana ir ātra, taču sadalÄ«Å”ana joprojām ir dārga;
  • neguli, kad tev māca pamatus, iespējams, kāds jau ir atrisinājis tavu problēmu astoņdesmitajos gados;
  • gnu parallel - tā ir maÄ£iska lieta, tā jāizmanto ikvienam;
  • Spark patÄ«k nesaspiesti dati un nepatÄ«k apvienot nodalÄ«jumus;
  • Spark ir pārāk lielas izmaksas, risinot vienkārÅ”as problēmas;
  • AWK asociatÄ«vie masÄ«vi ir ļoti efektÄ«vi;
  • jÅ«s varat sazināties stdin Šø stdout no R skripta un tāpēc izmantojiet to konveijerā;
  • Pateicoties viedā ceļa ievieÅ”anai, S3 var apstrādāt daudzus failus;
  • Galvenais laika tērÄ“Å”anas iemesls ir priekÅ”laicÄ«ga uzglabāŔanas metodes optimizÄ“Å”ana;
  • nemēģiniet optimizēt uzdevumus manuāli, ļaujiet to darÄ«t datoram;
  • API ir jābÅ«t vienkārÅ”ai izmantoÅ”anas viegluma un elastÄ«bas labad;
  • Ja jÅ«su dati ir labi sagatavoti, keÅ”atmiņas saglabāŔana bÅ«s vienkārÅ”a!

Avots: www.habr.com

Pievieno komentāru