25TB analizatzen AWK eta R erabiliz

25TB analizatzen AWK eta R erabiliz
Nola irakurri artikulu hau: Barkatu testua hain luzea eta kaotikoa izateagatik. Denbora aurrezteko, kapitulu bakoitza "Ikasi dudana" sarrera batekin hasten dut, kapituluaren funtsa esaldi batean edo bitan laburbiltzen duena.

"Erakutsi soluzioa!" Nondik natorren ikusi nahi baduzu, joan zaitez "Asmatzaileagoa bilakatzea" kapitulura, baina porrotaren inguruan irakurtzea interesgarriagoa eta erabilgarriagoa dela uste dut.

Duela gutxi DNA sekuentzia gordinak (teknikoki, SNP txipa) bolumen handi bat prozesatzeko prozesu bat konfiguratzeaz arduratu nintzen. Beharrezkoa zen kokapen genetiko jakin bati buruzko datuak azkar lortzea (SNP izenekoa) ondorengo modelatze eta beste zeregin batzuetarako. R eta AWK erabiliz, datuak modu naturalean garbitu eta antolatzeko gai izan nintzen, kontsulten prozesamendua asko azkartuz. Hau ez zen erraza izan niretzat eta errepikapen ugari behar izan zituen. Artikulu honek nire akats batzuk saihesten lagunduko dizu eta zer lortu dudan erakutsiko dizu.

Hasteko, sarrerako azalpen batzuk.

Datu

Gure unibertsitateko informazio genetikoa prozesatzeko zentroak 25 TBko TSV baten formako datuak eman zizkigun. 5 paketetan banatuta jaso nituen, Gzip-ek konprimituta, eta horietako bakoitzak lau gigabyte inguruko 240 fitxategi zituen. Errenkada bakoitzak banako baten SNP baterako datuak zituen. Guztira, ~2,5 milioi SNP eta ~60 mila pertsonari buruzko datuak transmititu ziren. SNP informazioaz gain, fitxategiek zutabe ugari zituzten hainbat ezaugarri islatzen zituzten zenbakiak, hala nola irakurketa intentsitatea, alelo ezberdinen maiztasuna, etab. Guztira 30 bat zutabe zeuden balio bereziekin.

Helburua

Datuak kudeatzeko edozein proiektutan bezala, garrantzitsuena datuak nola erabiliko ziren zehaztea zen. Kasu honetan gehienbat SNPrako ereduak eta lan-fluxuak hautatuko ditugu SNPn oinarrituta. Hau da, aldi berean SNP baten datuak soilik beharko ditugu. 2,5 milioi SNPetako batekin lotutako erregistro guztiak ahalik eta errazen, azkar eta merkeen berreskuratzen ikasi behar izan nuen.

Nola ez egin hau

Topiko egoki bat aipatzeko:

Ez nuen mila aldiz huts egin, mila modu aurkitu ditut datu mordoa kontsultak egiteko moduko formatuan analizatzea ekiditeko.

Lehenengo proba

Zer ikasi dut: Ez dago aldi berean 25 TB analizatzeko modu merkerik.

Vanderbilt Unibertsitatean "Datu handien tratamendurako metodo aurreratuak" ikastaroa egin ostean, ziur nengoen trikimailua poltsan zegoela. Ziurrenik ordu bat edo bi beharko dira Hive zerbitzaria konfiguratzeko datu guztiak exekutatzeko eta emaitzaren berri emateko. Gure datuak AWS S3-n gordetzen direnez, zerbitzua erabili dut Athena, Hive SQL kontsultak S3 datuetan aplikatzeko aukera ematen duena. Ez duzu Hive kluster bat konfiguratu/goratu beharrik, eta, gainera, bilatzen ari zaren datuengatik bakarrik ordaintzen duzu.

Athenari nire datuak eta bere formatua erakutsi ondoren, proba batzuk egin nituen honelako kontsultekin:

select * from intensityData limit 10;

Eta azkar ondo egituratutako emaitzak jaso zituen. Prest.

Gure lanean datuak erabiltzen saiatu ginen arte...

SNP informazio guztia ateratzeko eskatu zidaten eredua probatzeko. Kontsulta egin nuen:


select * from intensityData 
where snp = 'rs123456';

...eta itxaroten hasi zen. Zortzi minutu eta eskatutako datuen 4 TB baino gehiagoren ostean, emaitza jaso nuen. Athena-k aurkitutako datu-bolumenaren arabera kobratzen du, 5 $ terabyte bakoitzeko. Beraz, eskaera bakar honek 20 $ eta zortzi minutu itxaron behar ditu. Eredua datu guztietan exekutatzeko, 38 urte itxaron eta 50 milioi dolar ordaindu behar izan ditugu.Bistan denez, hau ez zitzaigun egokia.

Beharrezkoa zen parketa erabiltzea...

Zer ikasi dut: Kontuz ibili zure Parquet fitxategien tamainarekin eta haien antolaketarekin.

Lehenik eta behin, egoera konpontzen saiatu nintzen TSV guztiak bihurtuz Parquet fitxategiak. Datu-multzo handiekin lan egiteko erosoak dira, haietako informazioa zutabe forman gordetzen delako: zutabe bakoitza bere memoria/disko-segmentuan dago, testu-fitxategien aldean, zeinetan errenkadak zutabe bakoitzeko elementuak baititu. Eta zerbait aurkitu behar baduzu, irakurri behar den zutabea. Gainera, fitxategi bakoitzak balio sorta bat gordetzen du zutabe batean, beraz, bilatzen ari zaren balioa zutabearen barrutian ez badago, Sparkek ez du denbora galduko fitxategi osoa eskaneatzen.

Zeregin sinple bat exekutatu nuen AWS Kola gure TSVak Parquet bihurtzeko eta fitxategi berriak Athena-ra bota zituen. 5 ordu inguru behar izan zituen. Baina eskaera exekutatu nuenean, denbora kopuru bera eta diru apur bat gutxiago behar izan ziren osatzeko. Kontua da Spark-ek, zeregina optimizatu nahian, TSV zati bat deskonprimitu eta bere Parquet zatian jarri besterik ez zuela. Eta zati bakoitza jende askoren erregistro osoa edukitzeko nahikoa zenez, fitxategi bakoitzak SNP guztiak zituen, beraz Sparkek fitxategi guztiak ireki behar zituen behar zuen informazioa ateratzeko.

Interesgarria da Parquet-en konpresio mota lehenetsia (eta gomendatua), snappy, ez da zatigarria. Hori dela eta, exekutatzaile bakoitza 3,5 GB-ko datu-multzo osoa deskonprimitu eta deskargatzeko zereginean geratu zen.

25TB analizatzen AWK eta R erabiliz

Uler dezagun arazoa

Zer ikasi dut: Zaila da ordenatzea, batez ere datuak banatzen badira.

Orain arazoaren funtsa ulertzen nuela iruditu zitzaidan. Datuak SNP zutabearen arabera ordenatu behar izan ditut, ez pertsonen arabera. Ondoren, hainbat SNP datu-zati bereizi batean gordeko dira, eta gero Parquet-en "adimentsua" funtzioa "balioa barrutian badago soilik irekiko da" bere distira guztian erakutsiko da. Zoritxarrez, kluster batean sakabanatuta dauden milaka milioi errenkadak sailkatzea lan zaila izan zen.

AWS-k ez du itzulketarik egin nahi "Ikasle distraitua naiz" arrazoia dela eta. Amazon Glue-n ordenatzen exekutatu ondoren, 2 egun egin zituen eta huts egin zuen.

Eta zatiketarekin?

Zer ikasi dut: Spark-en partizioak orekatu egin behar dira.

Gero, datuak kromosometan banatzea bururatu zitzaidan. 23 dira (eta beste hainbat DNA mitokondriala eta mapatu gabeko eskualdeak kontuan hartzen badituzu).
Honek datuak zati txikiagoetan banatzeko aukera emango dizu. Spark esportazio funtzioari lerro bakarra gehitzen badiozu Glue script-ean partition_by = "chr", orduan datuak kuboetan banatu behar dira.

25TB analizatzen AWK eta R erabiliz
Genoma kromosoma izeneko zati ugariz osatuta dago.

Zoritxarrez, ez zuen funtzionatu. Kromosomek tamaina desberdinak dituzte, hau da, informazio kantitate desberdina da. Horrek esan nahi du Sparkek langileei bidali zizkien zereginak ez zirela orekatu eta poliki-poliki amaitu, nodo batzuk goiz amaitu zirelako eta inaktibo zeudelako. Dena den, zereginak bete ziren. Baina SNP bat eskatzean, desorekak arazoak sortu zituen berriro. Kromosoma handiagoetan SNPak prozesatzeko kostua (hau da, non lortu nahi ditugun datuak) 10 faktore inguru baino ez da gutxitu. Asko, baina ez nahikoa.

Eta zati txikiagotan banatzen badugu?

Zer ikasi dut: Inoiz ez saiatu 2,5 milioi partizio egiten.

Guztiak ateratzea erabaki nuen eta SNP bakoitza zatitzea. Honek partizioak tamaina berekoak zirela ziurtatzen zuen. IDEIA TXARRA IZAN ZEN. Kola erabili nuen eta lerro errugabe bat gehitu nuen partition_by = 'snp'. Eginkizuna hasi eta exekutatzen hasi zen. Egun bat geroago egiaztatu nuen eta oraindik ez zegoela ezer idatzita S3-n ikusi nuen, beraz, zeregina hil nuen. Badirudi Glue-k tarteko fitxategiak idazten ari zela S3-n ezkutuko kokapen batean, fitxategi asko, agian milioi bat. Ondorioz, nire akatsak mila dolar baino gehiago balio zuen eta ez zuen nire tutorea atsegin izan.

Banaketa + ordenatzea

Zer ikasi dut: Sailkatzea zaila da oraindik, Spark sintonizatzea bezala.

Nire azken saiakerak kromosomak zatitu eta gero partizio bakoitza ordenatu nuen. Teorian, horrek kontsulta bakoitza bizkortuko luke, nahi den SNP datuek tarte jakin baten barruan Parquet zati gutxi batzuen barruan egon behar zutelako. Zoritxarrez, zatitutako datuak ere sailkatzea lan zaila izan zen. Ondorioz, EMRra aldatu nintzen kluster pertsonalizatu baterako eta zortzi instantzia indartsu (C5.4xl) eta Sparklyr erabili nituen lan-fluxu malguagoa sortzeko...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...hala ere, zeregina oraindik amaitu gabe zegoen. Modu ezberdinetan konfiguratu nuen: kontsulta exekutatzaile bakoitzaren memoria esleipena handitu, memoria kopuru handia zuten nodoak erabili, broadcast aldagaiak erabili (difusio aldagaiak), baina aldi bakoitzean hauek erdi-neurriak izaten ziren eta pixkanaka exekutatzaileak hasi ziren. huts egin dena gelditu arte.

Gero eta sortzaileagoa naiz

Zer ikasi dut: Batzuetan datu bereziek irtenbide bereziak behar dituzte.

SNP bakoitzak posizio-balio bat du. Hau bere kromosomako base kopuruari dagokion zenbakia da. Gure datuak antolatzeko modu polita eta naturala da. Hasieran kromosoma bakoitzeko eskualdeen arabera banatu nahi nuen. Adibidez, posizioak 1 - 2000, 2001 - 4000, etab. Baina arazoa da SNPak ez daudela berdin banatzen kromosometan, beraz, taldeen tamainak asko aldatuko dira.

25TB analizatzen AWK eta R erabiliz

Ondorioz, posizioak kategorietan banatzea lortu nuen (maila). Jada deskargatutako datuak erabiliz, eskaera bat egin nuen SNP esklusiboen zerrenda, haien posizioak eta kromosomak lortzeko. Ondoren, kromosoma bakoitzaren barruan datuak ordenatu eta SNPak tamaina jakin bateko taldeetan (binak) bildu nituen. Demagun 1000 SNP bakoitzak. Horrek SNP-tik-talde-kromosoma bakoitzeko erlazioa eman zidan.

Azkenean, 75 SNPko taldeak (binak) egin nituen, arrazoia jarraian azalduko da.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Lehenik eta behin, saiatu Spark-ekin

Zer ikasi dut: Spark agregazioa azkarra da, baina partizioa garestia da oraindik.

Datu-marko txiki hau (2,5 milioi errenkada) Spark-en irakurri nahi nuen, datu gordinarekin konbinatu eta, ondoren, gehitu berri den zutabearen arabera banatu. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

erabili nuen sdf_broadcast(), beraz Spark-ek badaki datu-markoa nodo guztietara bidali behar duela. Hau erabilgarria da datuak tamaina txikikoak badira eta zeregin guztietarako beharrezkoak badira. Bestela, Spark adimentsu izaten saiatzen da eta behar den moduan datuak banatzen ditu, eta horrek moteltzeak eragin ditzake.

Eta, berriro ere, nire ideiak ez zuen funtzionatu: zereginek denbora batez funtzionatu zuten, sindikatua osatu zuten, eta gero, zatiketaz abiarazitako exekutiboak bezala, huts egiten hasi ziren.

AWK gehitzea

Zer ikasi dut: Ez egin lorik oinarriak irakasten dizkizutenean. Segur aski, norbaitek dagoeneko konpondu zuen zure arazoa 1980ko hamarkadan.

Orain arte, Spark-ekin egindako huts guztien arrazoia klusterreko datuen nahastea izan zen. Beharbada, egoera hobetu daiteke aurretratamenduarekin. Testu gordinaren datuak kromosoma zutabeetan banatzen saiatzea erabaki nuen, beraz, Spark-i "aurrez zatitutako" datuak ematea espero nuen.

StackOverflow-en bilatu nuen zutabeen balioen arabera nola zatitu eta aurkitu nuen hain erantzun bikaina. AWK-rekin testu-fitxategi bat zutabeen balioen arabera zati dezakezu script batean idatziz emaitzak bidali beharrean. stdout.

Bash gidoia idatzi nuen probatzeko. Paketatutako TSVetako bat deskargatu eta gero deskonpaktatu erabiliz gzip eta bidali awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Funtzionatu zuen!

Nukleoak betetzea

Zer ikasi dut: gnu parallel - gauza magikoa da, denek erabili beharko lukete.

Banaketa nahiko motela izan zen eta hasi nintzenean htopEC2 instantzia indartsu (eta garesti) baten erabilera egiaztatzeko, nukleo bakarra eta 200 MB inguru memoria erabiltzen ari nintzen. Arazoa konpontzeko eta diru asko ez galtzeko, lana nola paralelizatu asmatu behar izan dugu. Zorionez, liburu guztiz harrigarrian Datu-zientzia komando lerroan Paralelizazioari buruzko Jeron Janssens-en kapitulu bat aurkitu dut. Hortik ikasi nuen gnu parallel, Unix-en multithreading ezartzeko metodo oso malgua.

25TB analizatzen AWK eta R erabiliz
Prozesu berria erabiliz partizionatzen hasi nintzenean, dena ondo zegoen, baina oraindik ere botila-lepoa zegoen - S3 objektuak diskora deskargatzea ez zen oso azkarra eta ez zen guztiz paralelizatu. Hau konpontzeko, hau egin nuen:

  1. S3 deskarga-fasea zuzenean kanalizazioan ezartzea posible dela jakin nuen, diskoan bitarteko biltegiratzea erabat ezabatuz. Horrek esan nahi du datu gordinak diskoan idaztea saihestu dezakedala eta AWS-en biltegiratze are txikiagoa eta, beraz, merkeagoa erabiltzea.
  2. taldea aws configure set default.s3.max_concurrent_requests 50 AWS CLI-k erabiltzen dituen hari kopurua asko handitu du (lehenespenez 10 dira).
  3. Sareko abiadurarako optimizatutako EC2 instantzia batera aldatu nintzen, izenan n hizkiarekin. n instantziak erabiltzean prozesatzeko ahalmenaren galera karga-abiaduraren igoerarekin baino gehiago konpentsatzen dela aurkitu dut. Zeregin gehienetarako c5n.4xl erabili dut.
  4. Aldatua gzip on pigz, fitxategiak deskonprimitzeko hasiera batean paralelizatu gabeko zeregina paralelizatzeko gauza politak egin ditzake gzip tresna bat da (horrek gutxien lagundu zuen).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Urrats hauek elkarren artean konbinatzen dira dena oso azkar funtziona dezan. Deskarga-abiadura areagotuz eta disko-idazketak ezabatuz, orain 5 terabyteko pakete bat prozesatu nezake ordu gutxitan.

Txio honek 'TSV' aipatu behar zuen. Ai!

Analizatu berri diren datuak erabiliz

Zer ikasi dut: Spark-i konprimitu gabeko datuak gustatzen zaizkio eta ez zaio gustatzen partizioak konbinatzea.

Orain datuak S3-n zeuden formatu gabeko (irakurri: partekatua) eta erdi-ordenatutako formatuan, eta Sparkera itzuli nezakeen berriro. Sorpresa bat itxaroten nuen: berriro ez nuen lortu nahi nuena! Oso zaila zen Spark-i datuak nola banatu ziren zehatz-mehatz esatea. Eta hau egin nuenean ere, partizio gehiegi zeudela (95 mila), eta noiz erabili nuen coalesce haien kopurua arrazoizko mugetara murriztu zuten, horrek nire zatiketa suntsitu zuen. Ziur nago hau konpondu daitekeela, baina pare bat egun bilatu ondoren ezin izan dut irtenbiderik aurkitu. Azkenean Spark-eko zeregin guztiak amaitu nituen, nahiz eta denbora pixka bat behar izan eta nire zatitutako Parquet fitxategiak ez ziren oso txikiak (~200 KB). Hala ere, datuak behar ziren lekuan zeuden.

25TB analizatzen AWK eta R erabiliz
Txikiegia eta irregularra, zoragarria!

Spark lokaleko kontsultak probatzen

Zer ikasi dut: Spark-ek gainkostu handiegia du arazo sinpleak ebazteko orduan.

Datuak formatu adimentsuan deskargatuz, abiadura probatu ahal izan nuen. Konfiguratu R script bat Spark zerbitzari lokal bat exekutatzeko, eta, ondoren, Spark datu-markoa kargatu zehaztutako Parquet taldeko biltegiratzetik (ontzi). Datu guztiak kargatzen saiatu nintzen, baina ezin izan nuen Sparklyr-ek partizioa ezagutzea.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Exekuzioak 29,415 segundo behar izan zituen. Askoz hobeto, baina ez oso ona ezer proba masiborako. Gainera, ezin izan nituen gauzak azkartu cachean, datu-marko bat memorian cacheatzen saiatzen nintzenean, Spark-ek beti huts egiten zuen, nahiz eta 50 GB baino gehiago memoria esleitu nituenean 15 baino gutxiagoko datu multzo bati.

Itzuli AWK-ra

Zer ikasi dut: AWK-ko matrize asoziatiboak oso eraginkorrak dira.

Abiadura handiagoak lor nezakeela konturatu nintzen. Zoragarri batean gogoratu nuen hori Bruce Barnett-en AWK tutoriala "" izeneko funtzio polit bati buruz irakurri nuenarray elkartuak" Funtsean, gako-balio bikoteak dira, arrazoiren batengatik AWK-n modu ezberdinean deitzen direnak, eta, beraz, nolabait ez nuen horietaz asko pentsatu. Roman Cheplyaka gogoratu du "matrize elkartuak" terminoa "gako-balio bikotea" terminoa baino askoz zaharragoa dela. Zuk bada ere bilatu gako-balioa Google Ngram-en, ez duzu termino hau hor ikusiko, baina array elkartuak aurkituko dituzu! Gainera, "gako-balio bikotea" datu-baseekin lotzen da gehienetan, beraz, askoz ere zentzuzkoa da hashmap batekin alderatzea. Konturatu nintzen array elkartu hauek erabil nezakeela nire SNPak bin taula eta datu gordinarekin lotzeko Spark erabili gabe.

Horretarako, AWK scriptean blokea erabili dut BEGIN. Datuen lehen lerroa script-aren gorputz nagusira pasatu aurretik exekutatzen den kode zati bat da.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Team while(getline...) CSV taldeko errenkada guztiak kargatu ditu (bin), lehen zutabea (SNP izena) ezarri matrize elkartuaren gako gisa bin eta bigarren balioa (taldea) balio gisa. Gero blokean { }, fitxategi nagusiko lerro guztietan exekutatzen dena, lerro bakoitza irteerako fitxategira bidaltzen da, zeinak izen esklusibo bat jasotzen du bere taldearen (bin) arabera: ..._bin_"bin[$1]"_....

aldagai batch_num ΠΈ chunk_id kanalizazioak emandako datuekin bat egin zuen, lasterketa-baldintza bat saihestuz eta exekuzio-hari bakoitza martxan jarriz parallel, bere fitxategi esklusiboan idatzi zuen.

Datu gordina guztiak AWK-rekin egindako aurreko esperimentuan geratutako kromosometako karpetetan sakabanatu nituenez, orain beste Bash script bat idatz nezake kromosoma bat aldi berean prozesatzeko eta S3-ra zatitutako datu sakonagoak bidaltzeko.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Gidoiak bi atal ditu parallel.

Lehenengo atalean, nahi den kromosomari buruzko informazioa duten fitxategi guztietatik datuak irakurtzen dira, ondoren datu horiek harietan banatuko dira, fitxategiak talde egokietan banatzen dituztenak (bin). Hainbat hari fitxategi berean idazten direnean lasterketa-baldintzak saihesteko, AWK-k fitxategien izenak pasatzen ditu datuak leku ezberdinetara idazteko, adibidez. chr_10_bin_52_batch_2_aa.csv. Ondorioz, fitxategi txiki asko sortzen dira diskoan (horretarako terabyte EBS bolumenak erabili ditut).

Bigarren ataleko garraiatzailea parallel taldeetatik (bin) igarotzen da eta haien fitxategi indibidualak CSV c komun batean konbinatzen ditu cateta gero esportatzeko bidaltzen ditu.

R-n emititzen?

Zer ikasi dut: Jar zaitez harremanetan stdin ΠΈ stdout R script batetik, eta, beraz, erabili kanalizazioan.

Baliteke zure Bash script-ean lerro hau nabaritzea: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Kateatuta dauden talde-fitxategi guztiak (bin) beheko R script-era itzultzen ditu. {} teknika berezi bat da parallel, zehaztutako korronteari bidaltzen dituen datuak zuzenean komandoan bertan txertatzen dituena. Aukera {#} hari ID bakarra eskaintzen du, eta {%} lanaren zirrikituaren zenbakia adierazten du (errepikatua, baina inoiz ez aldi berean). Aukera guztien zerrenda hemen aurki daiteke dokumentazioa.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Aldagai bat denean file("stdin") igorria readr::read_csv, R script-era itzultzen diren datuak marko batean kargatzen dira, gero forman dagoena .rds-fitxategia erabiliz aws.s3 S3-ra zuzenean idatzita.

RDS Parquet-en junior bertsioaren antzeko zerbait da, bozgorailuaren biltegiratze apurrik gabe.

Bash gidoia amaitu ondoren sorta bat lortu nuen .rds-S3-n kokatutako fitxategiak, konpresio eraginkorra eta barne-motak erabiltzeko aukera eman zidan.

R balazta erabili arren, dena oso azkar funtzionatu zuen. Ez da harritzekoa datuak irakurtzen eta idazten dituzten R-ren zatiak oso optimizatuta egotea. Tamaina ertaineko kromosoma batean probatu ondoren, lana C5n.4xl instantzia batean burutu zen bi ordu gutxi gorabehera.

S3 Mugak

Zer ikasi dut: Bide adimendunen ezarpenari esker, S3-k fitxategi asko kudea ditzake.

Kezkatuta nengoen S3-k bertara transferitutako fitxategi asko kudeatzeko gai izango ote zen. Fitxategi-izenei zentzua eman nezake, baina nola bilatuko lituzke S3-k?

25TB analizatzen AWK eta R erabiliz
S3-ko karpetak erakusteko besterik ez dira, izan ere sistemari ez zaio ikurra interesatzen /. S3 FAQ orrialdetik.

Badirudi S3-k fitxategi jakin baterako bidea gako soil gisa adierazten duela hash taula edo dokumentuetan oinarritutako datu-base moduko batean. Ontzi bat taula gisa har daiteke, eta fitxategiak taula horretako erregistrotzat har daitezke.

Abiadura eta eraginkortasuna Amazonen irabaziak lortzeko garrantzitsuak direnez, ez da harritzekoa gako gisa fitxategi-bide-sistema hau izugarri optimizatuta egotea. Oreka bat bilatzen saiatu nintzen: get eskaera asko egin beharrik ez izateko, baina eskaerak azkar gauzatu zitezen. Agertu zen hobe dela 20 mila bin fitxategi inguru egitea. Uste dut optimizatzen jarraitzen badugu, abiadura handitzea lor dezakegula (adibidez, datuetarako soilik ontzi berezi bat egitea, horrela bilaketa-taularen tamaina murriztuz). Baina ez zegoen denbora edo dirurik esperimentu gehiago egiteko.

Zer gertatzen da bateragarritasun gurutzatua?

Ikasi dudana: denbora alferrik galtzearen arrazoi nagusia biltegiratze-metodoa lehenago optimizatzea da.

Une honetan, oso garrantzitsua da zure buruari galdetzea: "Zergatik erabili fitxategi formatu jabeduna?" Arrazoia kargatzeko abiaduran (gzipped CSV fitxategiak 7 aldiz gehiago behar izan ziren kargatzeko) eta gure lan-fluxuekin bateragarritasunean datza. Berrazter dezaket R-k Parquet (edo Arrow) fitxategiak erraz karga ditzakeen Spark kargarik gabe. Gure laborategiko guztiek R erabiltzen dute, eta datuak beste formatu batera bihurtu behar baditut, jatorrizko testuaren datuak ditut oraindik, beraz, kanalizazioa berriro exekutatu ahal izango dut.

Lan banaketa

Zer ikasi dut: Ez saiatu lanak eskuz optimizatzen, utzi ordenagailuak egiten.

Lan-fluxua kromosoma batean arazketa egin dut, orain beste datu guztiak prozesatu behar ditut.
Hainbat EC2 instantzia planteatu nahi nituen bihurtzeko, baina, aldi berean, prozesatzeko lan ezberdinetan karga oso desorekatua lortzeko beldur nintzen (Sparkek partizio desorekatuak jasaten zituen bezala). Horrez gain, ez zitzaidan interesatzen kromosoma bakoitzeko instantzia bat igotzea, AWS kontuetarako 10 instantziako muga lehenetsia baitago.

Orduan erabaki nuen script bat idaztea R-n prozesatzeko lanak optimizatzeko.

Lehenik eta behin, kromosoma bakoitzak zenbat biltegiratze espazio hartzen zuen kalkulatzeko eskatu nion S3ri.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Gero, tamaina osoa hartzen duen funtzio bat idatzi nuen, kromosomen ordena nahasten duena, taldeetan banatzen duena. num_jobs eta prozesatzeko lan guztien tamaina zein desberdina den esaten dizu.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 Γ— 3]>

Gero, mila nahasketa egin nituen purrr erabiliz eta onena aukeratu nuen.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Beraz, tamainaz oso antzekoak ziren zeregin multzo batekin amaitu nuen. Orduan falta zen nire aurreko Bash gidoia begizta handi batean biltzea izan zen for. Optimizazio honek 10 minutu inguru behar izan zituen idazteko. Eta desorekatuta egongo balira zereginak eskuz sortzeko gastatuko nukeena baino askoz ere txikiagoa da. Horregatik, uste dut arrazoia nuela aurretiazko optimizazio honekin.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Amaieran itzaltzeko komandoa gehitzen dut:

sudo shutdown -h now

... eta dena ondo atera zen! AWS CLI erabiliz, instantziak planteatu nituen aukera erabiliz user_data prozesatzeko beren zereginen Bash script-ak eman zizkieten. Automatikoki exekutatu eta itzali ziren, beraz, ez nuen prozesatzeko potentzia gehigarririk ordaintzen.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Egin dezagun paketea!

Zer ikasi dut: APIa sinplea izan behar da erabilera erraztasunaren eta malgutasunaren mesedetan.

Azkenik datuak leku eta forma egokian lortu ditut. Besterik gabe geratu zen datuak erabiltzeko prozesua ahalik eta gehien erraztea nire lankideei errazago egiteko. Eskaerak sortzeko API sinple bat egin nahi nuen. Etorkizunean hortik aldatzea erabakitzen badut .rds Parquet fitxategietara, orduan honek arazo bat izan beharko luke niretzat, ez nire lankideentzat. Horretarako barne R pakete bat egitea erabaki nuen.

Eraiki eta dokumentatu oso pakete sinple bat, funtzio baten inguruan antolatutako datuetara sartzeko funtzio gutxi batzuk dituena get_snp. Nire lankideentzako webgune bat ere egin nuen pkgdown, adibideak eta dokumentazioa erraz ikus ditzaten.

25TB analizatzen AWK eta R erabiliz

Cache adimenduna

Zer ikasi dut: Zure datuak ondo prestatuta badaude, cachean gordetzea erraza izango da!

Lan-fluxu nagusietako batek SNP paketeari analisi-eredu bera aplikatu zuenez, binning-a nire onurarako erabiltzea erabaki nuen. Datuak SNP bidez igortzen direnean, taldeko (bin) informazio guztia itzultzen den objektuari eransten zaio. Hau da, kontsulta zaharrek (teorian) kontsulta berrien prozesamendua bizkortu dezakete.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Paketea eraikitzean, erreferentzia asko exekutatu nituen abiadura alderatzeko metodo desberdinak erabiltzean. Hau ez alde batera uztea gomendatzen dut, batzuetan emaitzak ustekabekoak direlako. Adibidez, dplyr::filter indexazioan oinarritutako iragazkia erabiliz errenkadak harrapatzea baino askoz azkarragoa zen, eta iragazitako datu-marko batetik zutabe bakarra berreskuratzea indexazio-sintaxia erabiltzea baino askoz azkarragoa zen.

Kontuan izan objektua dela prev_snp_results gakoa dauka snps_in_bin. Hau talde bateko SNP esklusibo guztien multzoa da (ontzia), aurreko kontsulta bateko datuak lehendik dituzun ala ez egiaztatzeko aukera ematen duena. Gainera, talde bateko SNP guztiak (ontzia) kode honekin errazago errepasatzea errazten du:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Findings

Orain, lehen eskuraezinak ziren ereduak eta eszenatokiak exekutatu ditzakegu (eta serio hasi gara). Onena da nire laborategiko lankideek ez dutela konplikaziorik pentsatu behar. Funtzio bat besterik ez dute.

Eta paketeak xehetasunak gordetzen dituen arren, datu-formatua nahikoa sinple egiten saiatu nintzen bihar bat-batean desagertuko banintz asma zezaten...

Abiadura nabarmen handitu da. Normalean funtzionalki esanguratsuak diren genoma zatiak eskaneatzen ditugu. Lehen, ezin genuen hau egin (garestia suertatu zen), baina orain, taldearen (bin) egiturari eta cacheari esker, SNP baten eskaerak batez beste 0,1 segundo baino gutxiago irauten du, eta datuen erabilera hain da. baxua S3-ren kostuak kakahueteak direla.

Ondorioa

Artikulu hau ez da batere gida bat. Irtenbidea indibiduala izan zen, eta ia ziur ez da optimoa. Baizik eta, bidaia-koadernoa da. Besteek ulertzea nahi dut horrelako erabakiak ez direla buruan guztiz osatuta agertzen, saiakeraren ondorio direla. Gainera, datu-zientzialari baten bila bazabiltza, kontuan izan tresna hauek eraginkortasunez erabiltzeak esperientzia eskatzen duela eta esperientziak dirua balio duela. Pozik nago ordaintzeko baliabideak izan ditudalako, baina ni baino lan bera hobeto egin dezaketen beste askok ez dute inoiz aukerarik izango diru faltagatik probatzeko ere.

Big data tresnak polifazetikoak dira. Denbora badaukazu, ziur asko irtenbide azkarrago bat idatzi dezakezu datuen garbiketa, biltegiratze eta erauzketa teknika adimendunak erabiliz. Azken finean, kostu-onuraren azterketara dator.

Ikasi dudana:

  • ez dago aldi berean 25 TB analizatzeko modu merkerik;
  • kontuz ibili zure Parquet fitxategien tamainarekin eta haien antolaketarekin;
  • Spark-en partizioak orekatuak izan behar dira;
  • Oro har, inoiz ez saiatu 2,5 milioi partizio egiten;
  • Sailkatzea zaila da oraindik, Spark konfiguratzea bezala;
  • batzuetan datu bereziek irtenbide bereziak behar dituzte;
  • Spark agregazioa azkarra da, baina partizioa garestia da oraindik;
  • ez egin lorik oinarriak irakasten dizkizutenean, ziurrenik norbaitek dagoeneko konpondu zuen zure arazoa 1980ko hamarkadan;
  • gnu parallel - Hau gauza magikoa da, denek erabili beharko lukete;
  • Spark-i konprimitu gabeko datuak gustatzen zaizkio eta ez zaio gustatzen partizioak konbinatzea;
  • Spark-ek gainkostu gehiegi du arazo sinpleak ebazten;
  • AWK-ren matrize elkartuak oso eraginkorrak dira;
  • harremanetan jar zaitezke stdin ΠΈ stdout R script batetik, eta, beraz, erabili kanalizazioan;
  • Bide adimendunen ezarpenari esker, S3-k fitxategi asko prozesatu ditzake;
  • Denbora galtzearen arrazoi nagusia biltegiratze metodoa goiztiago optimizatzea da;
  • ez saiatu zereginak eskuz optimizatzen, utzi ordenagailuak egiten;
  • APIak sinplea izan behar du erabilera erraztasunaren eta malgutasunaren mesedetan;
  • Zure datuak ondo prestatuta badaude, erraza izango da cachean gordetzea!

Iturria: www.habr.com

Gehitu iruzkin berria