Jäsentää 25 Tt AWK:n ja R:n avulla

Jäsentää 25 Tt AWK:n ja R:n avulla
Kuinka lukea tämä artikkeli: Pahoittelen, että teksti on niin pitkä ja kaoottinen. Ajan säästämiseksi aloitan jokaisen luvun "Mitä opin" johdannossa, joka tiivistää luvun olemuksen yhdellä tai kahdella virkkeellä.

"Näytä minulle ratkaisu!" Jos haluat vain nähdä, mistä olen kotoisin, siirry lukuun "Kekseliäisemmäksi tuleminen", mutta mielestäni on mielenkiintoisempaa ja hyödyllisempää lukea epäonnistumisesta.

Minulle annettiin äskettäin tehtäväksi perustaa prosessi suuren määrän raaka-DNA-sekvenssejä (teknisesti SNP-sirua) käsittelemiseksi. Tarve oli saada nopeasti tietoa tietystä geneettisestä sijainnista (kutsutaan SNP:ksi) myöhempää mallinnusta ja muita tehtäviä varten. R:n ja AWK:n avulla pystyin puhdistamaan ja järjestämään tiedot luonnollisella tavalla, mikä nopeutti huomattavasti kyselyjen käsittelyä. Tämä ei ollut minulle helppoa ja vaati useita iteraatioita. Tämä artikkeli auttaa sinua välttämään joitain virheitäni ja näyttämään, mihin päädyin.

Ensinnäkin muutama johdantoselvitys.

Tiedot

Yliopistomme geneettisen tiedonkäsittelykeskuksemme toimitti meille tiedot 25 TB TSV:n muodossa. Sain ne jaettuna viiteen Gzip-pakkaukseen, joista jokainen sisälsi noin 5 neljän gigatavun tiedostoa. Jokainen rivi sisälsi tietoja yhdeltä yksilöltä yhdelle SNP:lle. Yhteensä lähetettiin tietoja ~240 miljoonasta SNP:stä ja ~2,5 tuhannesta ihmisestä. SNP-tietojen lisäksi tiedostot sisälsivät lukuisia sarakkeita numeroilla, jotka kuvastivat erilaisia ​​ominaisuuksia, kuten lukuintensiteettiä, eri alleelien esiintymistiheyttä jne. Yhteensä oli noin 60 saraketta yksilöllisillä arvoilla.

Tavoite

Kuten kaikissa tiedonhallintaprojekteissa, tärkeintä oli määrittää, miten dataa käytetään. Tässä tapauksessa valitsemme enimmäkseen SNP:n mallit ja työnkulkut SNP:n perusteella. Eli tarvitsemme tietoja vain yhdestä SNP:stä kerrallaan. Minun piti opetella hakemaan kaikki tietueet, jotka liittyvät yhteen 2,5 miljoonasta SNP:stä mahdollisimman helposti, nopeasti ja halvalla.

Kuinka ei tehdä tätä

Lainatakseni sopivaa klisettä:

En epäonnistunut tuhat kertaa, olen vain löytänyt tuhat tapaa välttää tietojen jäsentämisen kyselyystävällisessä muodossa.

Yritä ensin

Mitä olen oppinut: Ei ole halpaa tapaa jäsentää 25 Tt kerrallaan.

Käytyäni kurssin "Advanced Methods for Big Data Processing" Vanderbiltin yliopistossa olin varma, että temppu oli pussissa. Hive-palvelimen asettaminen käymään läpi kaikki tiedot ja raportoimaan tuloksen kestää luultavasti tunnin tai kaksi. Koska tietomme on tallennettu AWS S3:een, käytin palvelua Athena, jonka avulla voit käyttää Hive SQL -kyselyjä S3-tietoihin. Sinun ei tarvitse perustaa/nostaa Hive-klusteria, ja maksat myös vain etsimistäsi tiedoista.

Kun näytin Athenalle tietoni ja sen muodon, suoritin joitain testejä seuraavilla kyselyillä:

select * from intensityData limit 10;

Ja sai nopeasti hyvin jäsenneltyjä tuloksia. Valmis.

Kunnes yritimme käyttää tietoja työssämme...

Minua pyydettiin vetämään kaikki SNP-tiedot mallin testaamiseksi. Tein kyselyn:


select * from intensityData 
where snp = 'rs123456';

...ja alkoi odottaa. Kahdeksan minuutin ja yli 4 Tt pyydettyjen tietojen jälkeen sain tuloksen. Athena veloittaa löydetyn tiedon määrän mukaan, 5 dollaria teratavua kohden. Joten tämä yksittäinen pyyntö maksoi 20 dollaria ja kahdeksan minuuttia odotusta. Jotta mallia voitaisiin käyttää kaikilla tiedoilla, jouduimme odottamaan 38 vuotta ja maksamaan 50 miljoonaa dollaria, mikä ei tietenkään sopinut meille.

Oli pakko käyttää parkettia...

Mitä olen oppinut: Ole varovainen parkettitiedostojesi koon ja niiden järjestyksen suhteen.

Yritin ensin korjata tilanteen muuntamalla kaikki TSV:t muotoon Parkettitiedostot. Ne ovat käteviä työskennellä suurten tietojoukkojen kanssa, koska niissä olevat tiedot on tallennettu sarakemuotoon: jokainen sarake on omassa muisti-/levysegmentissään, toisin kuin tekstitiedostot, joissa rivit sisältävät kunkin sarakkeen elementtejä. Ja jos sinun on löydettävä jotain, lue vain vaadittu sarake. Lisäksi jokainen tiedosto tallentaa useita arvoja sarakkeeseen, joten jos etsimäsi arvo ei ole sarakkeen alueella, Spark ei tuhlaa aikaa koko tiedoston skannaamiseen.

Suoritin yksinkertaisen tehtävän AWS-liima muuntaaksemme TSV:mme parketiksi ja pudotimme uudet tiedostot Athenaan. Kesti noin 5 tuntia. Mutta kun suoritin pyynnön, sen täyttämiseen kului suunnilleen saman verran aikaa ja vähän vähemmän rahaa. Tosiasia on, että Spark, joka yritti optimoida tehtävän, yksinkertaisesti purki yhden TSV-palan ja laittoi sen omaan parkettipalaansa. Ja koska jokainen pala oli tarpeeksi suuri sisältämään monien ihmisten kokonaiset tietueet, jokainen tiedosto sisälsi kaikki SNP:t, joten Spark joutui avaamaan kaikki tiedostot poimiakseen tarvitsemansa tiedot.

Mielenkiintoista on, että Parquetin oletus (ja suositeltu) pakkaustyyppi, snappy, ei ole jaettavissa. Siksi jokainen toteuttaja oli jumissa tehtävässä purkaa ja ladata koko 3,5 Gt:n tietojoukko.

Jäsentää 25 Tt AWK:n ja R:n avulla

Ymmärretään ongelma

Mitä olen oppinut: Lajittelu on vaikeaa, varsinkin jos tiedot on jaettu.

Minusta tuntui, että nyt ymmärrän ongelman olemuksen. Minun piti lajitella tiedot vain SNP-sarakkeen, ei ihmisten mukaan. Sitten useat SNP:t tallennetaan erilliseen tietopakettiin, ja sitten Parquetin "älykäs"-toiminto "avoinna vain, jos arvo on alueella" näyttää itsensä kaikessa loistossaan. Valitettavasti klusterin poikki hajallaan olevien miljardien rivien lajittelu osoittautui vaikeaksi tehtäväksi.

AWS ei todellakaan halua myöntää hyvitystä "Olen hajamielinen opiskelija" -syyn vuoksi. Kun suoritin lajittelun Amazon Gluella, se toimi 2 päivää ja kaatui.

Entä osiointi?

Mitä olen oppinut: Sparkin väliseinät on tasapainotettava.

Sitten keksin idean jakaa tiedot kromosomeihin. Niitä on 23 (ja useita muita, jos otetaan huomioon mitokondrioiden DNA ja kartoittamattomat alueet).
Näin voit jakaa tiedot pienempiin osiin. Jos lisäät vain yhden rivin Spark-vientitoimintoon Glue-skriptissä partition_by = "chr", tiedot tulee jakaa ryhmiin.

Jäsentää 25 Tt AWK:n ja R:n avulla
Genomi koostuu lukuisista fragmenteista, joita kutsutaan kromosomeiksi.

Valitettavasti se ei toiminut. Kromosomit ovat erikokoisia, mikä tarkoittaa eri määriä tietoa. Tämä tarkoittaa, että Sparkin työntekijöille lähettämät tehtävät eivät olleet tasapainossa ja valmistuivat hitaasti, koska jotkut solmut valmistuivat aikaisin ja olivat käyttämättömänä. Tehtävät saatiin kuitenkin tehtyä. Mutta kun pyydettiin yhtä SNP:tä, epätasapaino aiheutti jälleen ongelmia. SNP:iden käsittelykustannukset suuremmissa kromosomeissa (eli missä haluamme saada tietoja) ovat laskeneet vain noin 10-kertaiseksi. Paljon, mutta ei tarpeeksi.

Entä jos jaamme sen vielä pienempiin osiin?

Mitä olen oppinut: Älä koskaan yritä tehdä 2,5 miljoonaa osiota ollenkaan.

Päätin tehdä kaiken ja osioin jokaisen SNP:n. Tämä varmisti, että väliseinät olivat samankokoisia. SE OLI HUONA IDEA. Käytin liimaa ja lisäsin viattoman viivan partition_by = 'snp'. Tehtävä alkoi ja alkoi toteuttaa. Päivää myöhemmin tarkistin ja huomasin, että S3:een ei vieläkään ollut kirjoitettu mitään, joten lopetin tehtävän. Näyttää siltä, ​​että Glue kirjoitti välitiedostoja piilotettuun paikkaan S3:ssa, paljon tiedostoja, ehkä pari miljoonaa. Tämän seurauksena virheeni maksoi yli tuhat dollaria, eikä se miellyttänyt mentoriani.

Osiointi + lajittelu

Mitä olen oppinut: Lajittelu on edelleen vaikeaa, kuten myös Spark-viritys.

Viimeinen osiointiyritykseni sisälsi kromosomien osioinnin ja kunkin osion lajittelun. Teoriassa tämä nopeuttaisi jokaista kyselyä, koska haluttujen SNP-tietojen piti olla muutaman Parquet-palan sisällä tietyllä alueella. Valitettavasti jopa osioitujen tietojen lajittelu osoittautui vaikeaksi tehtäväksi. Tämän seurauksena vaihdoin EMR:ään mukautettua klusteria varten ja käytin kahdeksaa tehokasta esiintymää (C5.4xl) ja Sparklyria joustavamman työnkulun luomiseksi...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...tehtävää ei kuitenkaan vieläkään suoritettu. Konfiguroin sen eri tavoilla: lisäsin muistin varausta jokaiselle kyselyn suorittajalle, käytin solmuja, joissa oli paljon muistia, käytin yleislähetysmuuttujia (lähetysmuuttujia), mutta joka kerta nämä osoittautuivat puolimitoiksi, ja vähitellen suorittajat alkoivat epäonnistua, kunnes kaikki pysähtyy.

Minusta tulee luovempi

Mitä olen oppinut: Joskus erikoistiedot vaativat erikoisratkaisuja.

Jokaisella SNP:llä on sijaintiarvo. Tämä on luku, joka vastaa emästen määrää sen kromosomissa. Tämä on mukava ja luonnollinen tapa järjestää tietomme. Aluksi halusin jakaa kunkin kromosomin alueiden mukaan. Esimerkiksi paikat 1 - 2000, 2001 - 4000 jne. Mutta ongelmana on, että SNP:t eivät ole jakautuneet tasaisesti kromosomeihin, joten ryhmien koot vaihtelevat suuresti.

Jäsentää 25 Tt AWK:n ja R:n avulla

Tuloksena päädyin tehtävien jakautumiseen luokkiin (sijoitus). Käyttämällä jo ladattuja tietoja suoritin pyynnön saada luettelo ainutlaatuisista SNP:istä, niiden sijainnista ja kromosomeista. Sitten lajittelin tiedot kussakin kromosomissa ja keräsin SNP:t tietyn kokoisiin ryhmiin (bin). Oletetaan 1000 SNP:tä kukin. Tämä antoi minulle SNP-to-group-per-kromosomi -suhteen.

Lopulta tein 75 SNP:n ryhmät (bin), syy selitetään alla.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Kokeile ensin Sparkilla

Mitä olen oppinut: Spark-aggregointi on nopeaa, mutta osiointi on silti kallista.

Halusin lukea tämän pienen (2,5 miljoonaa riviä) tietokehyksen Sparkiin, yhdistää sen raakadataan ja sitten osioida sen äskettäin lisätyllä sarakkeella bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

käytin sdf_broadcast(), joten Spark tietää, että sen pitäisi lähettää datakehys kaikille solmuille. Tämä on hyödyllistä, jos tiedot ovat kooltaan pieniä ja niitä tarvitaan kaikkiin tehtäviin. Muuten Spark yrittää olla älykäs ja jakaa tietoja tarpeen mukaan, mikä voi aiheuttaa hidastuksia.

Ja taas, ajatukseni ei toiminut: tehtävät toimivat jonkin aikaa, muodostivat liiton, ja sitten ne alkoivat epäonnistua, kuten osioinnin käynnistämät toimeenpanijat.

Lisätään AWK

Mitä olen oppinut: Älä nuku, kun sinulle opetetaan perusasiat. Varmasti joku ratkaisi ongelmasi jo 1980-luvulla.

Tähän asti syynä kaikkiin epäonnistumiseeni Sparkissa oli klusterin tietojen sekamelska. Ehkä tilannetta voidaan parantaa esikäsittelyllä. Päätin yrittää jakaa raakatekstidatan kromosomisarakkeisiin, joten toivoin voivani tarjota Sparkille "esiosoitettua" dataa.

Etsin StackOverflowsta kuinka jakaa sarakearvojen mukaan ja löysin niin mahtava vastaus. AWK:lla voit jakaa tekstitiedoston sarakearvojen mukaan kirjoittamalla sen skriptillä sen sijaan, että lähetät tulokset stdout.

Kirjoitin Bash-skriptin kokeillakseni sitä. Latasi yhden pakatuista TSV:istä ja puristi sen sitten käyttämällä gzip ja lähetetty osoitteeseen awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Se toimi!

Sydänten täyttäminen

Mitä olen oppinut: gnu parallel - Se on maaginen asia, kaikkien pitäisi käyttää sitä.

Ero oli melko hidasta ja kun aloitin htoptehokkaan (ja kalliin) EC2-instanssin käytön tarkistamiseksi kävi ilmi, että käytin vain yhtä ydintä ja noin 200 Mt muistia. Ratkaiseksemme ongelman ja emme menettäisi paljon rahaa, meidän oli keksittävä, kuinka työ rinnastetaan. Onneksi aivan uskomattomassa kirjassa Tietotiede komentorivillä Löysin Jeron Janssensin luvun rinnastamisesta. Siitä opin gnu parallel, erittäin joustava menetelmä monisäikeistyksen toteuttamiseen Unixissa.

Jäsentää 25 Tt AWK:n ja R:n avulla
Kun aloitin osioinnin uudella prosessilla, kaikki oli hyvin, mutta pullonkaula oli silti olemassa - S3-objektien lataaminen levylle ei ollut kovin nopeaa eikä täysin rinnastettu. Korjatakseni tämän tein näin:

  1. Huomasin, että S3-latausvaihe on mahdollista toteuttaa suoraan liukuhihnassa, jolloin välitallennus levylle jää kokonaan pois. Tämä tarkoittaa, että voin välttää raakadatan kirjoittamisen levylle ja käyttää vielä pienempää ja siten halvempaa tallennustilaa AWS:ssä.
  2. tiimi aws configure set default.s3.max_concurrent_requests 50 lisäsi huomattavasti AWS CLI:n käyttämien säikeiden määrää (oletusarvoisesti niitä on 10).
  3. Vaihdoin verkon nopeudelle optimoituun EC2-instanssiin, jonka nimessä on kirjain n. Olen havainnut, että käsittelytehon menetys käytettäessä n-instanssia on enemmän kuin kompensoitu latausnopeuden kasvulla. Useimpiin tehtäviin käytin c5n.4xl.
  4. Muutettu gzip päälle pigz, tämä on gzip-työkalu, joka voi tehdä hienoja asioita rinnakkaistakseen alun perin ei-rinnakkaisen tiedostojen purkamisen (tämä auttoi vähiten).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Nämä vaiheet yhdistetään toisiinsa, jotta kaikki toimisi erittäin nopeasti. Lisäämällä latausnopeuksia ja eliminoimalla levyn kirjoitusten, pystyin nyt käsittelemään 5 teratavun paketin vain muutamassa tunnissa.

Tässä twiitissä olisi pitänyt mainita "TSV". Valitettavasti.

Hiljattain jäsennetyn tiedon käyttäminen

Mitä olen oppinut: Spark pitää pakkaamattomasta tiedosta eikä osioiden yhdistämisestä.

Nyt tiedot olivat S3:ssa pakkaamattomassa (lue: jaetussa) ja puolitilatussa muodossa, ja voisin palata Sparkiin uudelleen. Minua odotti yllätys: en taaskaan onnistunut saavuttamaan haluamaani! Sparkille oli erittäin vaikeaa kertoa tarkasti, kuinka tiedot osioitiin. Ja jopa kun tein tämän, kävi ilmi, että osioita oli liikaa (95 tuhatta), ja kun käytin coalesce vähensi niiden määrää kohtuullisiin rajoihin, tämä tuhosi osiointini. Olen varma, että tämä voidaan korjata, mutta muutaman päivän etsinnän jälkeen en löytänyt ratkaisua. Lopulta sain kaikki tehtävät valmiiksi Sparkissa, vaikka se kestikin ja jaetut Parquet-tiedostoni eivät olleet kovin pieniä (~200 KB). Data oli kuitenkin siellä, missä sitä tarvittiin.

Jäsentää 25 Tt AWK:n ja R:n avulla
Liian pieni ja epätasainen, ihana!

Testataan paikallisia Spark-kyselyitä

Mitä olen oppinut: Sparkilla on liikaa kustannuksia yksinkertaisten ongelmien ratkaisemisessa.

Lataamalla tiedot älykkäässä muodossa, pystyin testaamaan nopeutta. Määritä R-komentosarja suorittamaan paikallista Spark-palvelinta ja lataa sitten Spark-tietokehys määritetystä Parquet-ryhmämuistista (lokero). Yritin ladata kaikki tiedot, mutta en saanut Sparklyria tunnistamaan osiointia.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Toteutus kesti 29,415 sekuntia. Paljon parempi, mutta ei liian hyvä minkään massatestaukseen. Lisäksi en voinut nopeuttaa asioita välimuistiin tallentamalla, koska kun yritin tallentaa datakehyksen välimuistiin, Spark kaatui aina, vaikka varasin yli 50 Gt muistia alle 15:n painoiselle tietojoukolle.

Palaa AWK:hen

Mitä olen oppinut: AWK:n assosiatiiviset taulukot ovat erittäin tehokkaita.

Tajusin, että pystyn saavuttamaan suurempia nopeuksia. Muistin sen upealla tavalla Bruce Barnettin AWK-opetusohjelma Luin hienosta ominaisuudesta nimeltä "assosiatiiviset taulukot" Pohjimmiltaan nämä ovat avain-arvo-pareja, joita jostain syystä kutsuttiin eri tavalla AWK:ssa, ja siksi en jotenkin ajatellut niitä paljon. Roman Cheplyaka muistutti, että termi "assosiatiiviset taulukot" on paljon vanhempi kuin termi "avainarvopari". Vaikka sinä etsi avainarvo Google Ngramista, et näe tätä termiä siellä, mutta löydät assosiatiivisia taulukoita! Lisäksi "avain-arvo-pari" liittyy useimmiten tietokantoihin, joten on paljon järkevämpää verrata sitä hashmappiin. Ymmärsin, että voisin käyttää näitä assosiatiivisia taulukoita yhdistääkseni SNP:ni bin-taulukkoon ja raakadataan ilman Sparkia.

Tätä varten käytin AWK-skriptissä lohkoa BEGIN. Tämä on koodinpätkä, joka suoritetaan ennen kuin ensimmäinen tietorivi välitetään komentosarjan pääosaan.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Joukkue while(getline...) ladannut kaikki rivit CSV-ryhmästä (bin), aseta ensimmäinen sarake (SNP-nimi) assosiatiivisen taulukon avaimeksi bin ja toinen arvo (ryhmä) arvona. Siis lohkossa { }, joka suoritetaan päätiedoston kaikilla riveillä, jokainen rivi lähetetään tulostiedostoon, joka saa yksilöllisen nimen ryhmästään (bin): ..._bin_"bin[$1]"_....

muuttujat batch_num и chunk_id täsmäsi liukuhihnan antamien tietojen kanssa välttäen kilpailutilanteen ja jokaisen suoritussäikeen käynnissä parallel, kirjoitti omaan ainutlaatuiseen tiedostoonsa.

Koska hajotin kaiken raakadatan kansioihin kromosomeihin, jotka jäivät edellisestä AWK-kokeilustani, nyt voin kirjoittaa toisen Bash-skriptin käsittelemään kromosomi kerrallaan ja lähettämään syvemmälle osioitua dataa S3:lle.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Käsikirjoituksessa on kaksi osaa parallel.

Ensimmäisessä osiossa tiedot luetaan kaikista tiedostoista, jotka sisältävät tietoa halutusta kromosomista, sitten nämä tiedot jaetaan säikeiden kesken, jotka jakavat tiedostot sopiviin ryhmiin (bin). Välttääkseen kilpailuolosuhteet, kun useat säikeet kirjoittavat samaan tiedostoon, AWK välittää tiedostojen nimet kirjoittaakseen tietoja eri paikkoihin, esim. chr_10_bin_52_batch_2_aa.csv. Tämän seurauksena levylle luodaan monia pieniä tiedostoja (tätä varten käytin teratavun EBS-taltioita).

Kuljetin toisesta osasta parallel käy ryhmien (bin) läpi ja yhdistää niiden yksittäiset tiedostot yhteiseksi CSV-tiedostoksi c catja lähettää ne sitten vientiin.

Lähetys R:ssä?

Mitä olen oppinut: Voit ottaa yhteyttä stdin и stdout R-komentosarjasta, ja siksi käytä sitä valmisteilla.

Olet ehkä huomannut tämän rivin Bash-skriptissäsi: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Se kääntää kaikki ketjutetut ryhmätiedostot (bin) alla olevaan R-komentosarjaan. {} on erityinen tekniikka parallel, joka lisää kaikki määritettyyn tietovirtaan lähettämänsä tiedot suoraan itse komentoon. Vaihtoehto {#} tarjoaa ainutlaatuisen säikeen tunnuksen ja {%} edustaa työpaikan numeroa (toistuu, mutta ei koskaan samanaikaisesti). Luettelo kaikista vaihtoehdoista löytyy osoitteesta dokumentointi.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kun muuttuja file("stdin") lähetetty readr::read_csv, R-skriptiksi käännetyt tiedot ladataan kehykseen, joka on sitten muodossa .rds- tiedostoa käyttämällä aws.s3 kirjoitetaan suoraan S3:een.

RDS on jotain parquetin junioriversiota, ilman kaiuttimien säilytyksen kekseliäisyyttä.

Kun Bash-skripti oli valmis, sain paketin .rds-tiedostot, jotka sijaitsevat S3:ssa, minkä ansiosta pystyin käyttämään tehokasta pakkausta ja sisäänrakennettuja tyyppejä.

R-jarrun käytöstä huolimatta kaikki toimi erittäin nopeasti. Ei ole yllättävää, että R:n osat, jotka lukevat ja kirjoittavat dataa, ovat erittäin optimoituja. Yhden keskikokoisen kromosomin testauksen jälkeen työ valmistui C5n.4xl-esiintymässä noin kahdessa tunnissa.

S3:n rajoitukset

Mitä olen oppinut: Älykkään polun toteutuksen ansiosta S3 pystyy käsittelemään monia tiedostoja.

Olin huolissani, pystyykö S3 käsittelemään monia siihen siirrettyjä tiedostoja. Voisin tehdä tiedostonimistä järkeviä, mutta miten S3 etsisi niitä?

Jäsentää 25 Tt AWK:n ja R:n avulla
S3:n kansiot ovat vain esittelyä varten, itse asiassa järjestelmä ei ole kiinnostunut symbolista /. S3:n UKK-sivulta.

Näyttää siltä, ​​että S3 edustaa polkua tiettyyn tiedostoon yksinkertaisena avaimena eräänlaisessa hash-taulukossa tai asiakirjapohjaisessa tietokannassa. Ämpäri voidaan ajatella taulukkona, ja tiedostoja voidaan pitää taulukon tietueina.

Koska nopeus ja tehokkuus ovat tärkeitä voiton saamiseksi Amazonissa, ei ole yllätys, että tämä avaintiedostopolkujärjestelmä on hullunkurisesti optimoitu. Yritin löytää tasapainoa: niin, ettei minun tarvinnut tehdä paljon get-pyyntöjä, vaan että pyynnöt toteutettiin nopeasti. Kävi ilmi, että on parasta tehdä noin 20 tuhatta roskakoritiedostoa. Uskon, että jos jatkamme optimointia, voimme saavuttaa nopeuden lisäämisen (esimerkiksi tekemällä erityisen kauhan vain tiedoille, mikä pienentää hakutaulukon kokoa). Mutta ei ollut aikaa eikä rahaa lisäkokeille.

Entä ristiinsopivuus?

Mitä opin: Suurin syy ajanhukkaan on varastointimenetelmäsi ennenaikainen optimointi.

Tässä vaiheessa on erittäin tärkeää kysyä itseltäsi: "Miksi käyttää patentoitua tiedostomuotoa?" Syynä on latausnopeus (gzipattujen CSV-tiedostojen lataaminen kesti 7 kertaa kauemmin) ja yhteensopivuus työnkulkujemme kanssa. Voin harkita uudelleen, pystyykö R lataamaan Parquet (tai Arrow) -tiedostoja helposti ilman Spark-kuormitusta. Kaikki laboratoriossamme käyttävät R:tä, ja jos minun on muutettava tiedot toiseen muotoon, minulla on edelleen alkuperäinen tekstidata, joten voin vain ajaa liukuhihnan uudelleen.

Työnjako

Mitä olen oppinut: Älä yritä optimoida töitä manuaalisesti, anna tietokoneen tehdä se.

Olen tehnyt virheenkorjauksen yhden kromosomin työnkulussa, nyt minun on käsiteltävä kaikki muut tiedot.
Halusin nostaa useita EC2-instansseja muuntamista varten, mutta samalla pelkäsin saavani erittäin epätasapainoisen kuormituksen eri prosessointitöihin (kuten Spark kärsi epätasapainoisista osioista). Lisäksi en ollut kiinnostunut nostamaan yhtä esiintymää kromosomia kohti, koska AWS-tileillä on oletusraja 10 esiintymää.

Sitten päätin kirjoittaa käsikirjoituksen R-kielellä prosessointitöiden optimoimiseksi.

Ensin pyysin S3:a laskemaan, kuinka paljon tallennustilaa kukin kromosomi vei.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Sitten kirjoitin funktion, joka ottaa kokonaiskoon, sekoittaa kromosomien järjestyksen, jakaa ne ryhmiin num_jobs ja kertoo kuinka erilaisia ​​kaikkien käsittelytöiden koot ovat.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Sitten juoksin läpi tuhat sekoitusta purrrilla ja valitsin parhaan.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Joten päädyin joukkoon tehtäviä, jotka olivat kooltaan hyvin samanlaisia. Sitten jäi vain kääriä edellinen Bash-käsikirjoitukseni isoon silmukkaan for. Tämän optimoinnin kirjoittaminen kesti noin 10 minuuttia. Ja tämä on paljon vähemmän kuin kuluttaisin tehtävien luomiseen manuaalisesti, jos ne olisivat epätasapainossa. Siksi uskon, että olin oikeassa tämän alustavan optimoinnin kanssa.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Lisään lopuksi shutdown-komennon:

sudo shutdown -h now

...ja kaikki sujui! AWS CLI:n avulla esitin esiintymiä käyttämällä vaihtoehtoa user_data antoi heille Bash-skriptit tehtävistään käsittelyä varten. Ne juoksivat ja sammuivat automaattisesti, joten en maksanut ylimääräisestä prosessointitehosta.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Pakkaamme!

Mitä olen oppinut: API:n tulee olla yksinkertainen käytön helppouden ja joustavuuden vuoksi.

Lopulta sain tiedot oikeaan paikkaan ja muotoon. Jäljelle jäi vain tietojen käytön yksinkertaistaminen mahdollisimman paljon, jotta se olisi helpompaa kollegoilleni. Halusin tehdä yksinkertaisen API:n pyyntöjen luomiseen. Jos tulevaisuudessa päätän vaihtaa .rds parkettiviiloille, tämän pitäisi olla ongelma minulle, ei kollegoilleni. Tätä varten päätin tehdä sisäisen R-paketin.

Rakenna ja dokumentoi hyvin yksinkertainen paketti, joka sisältää vain muutaman funktion ympärille järjestetyn datan käyttötoiminnon get_snp. Tein myös nettisivut kollegoilleni pkgdown, jotta he voivat helposti nähdä esimerkkejä ja dokumentteja.

Jäsentää 25 Tt AWK:n ja R:n avulla

Älykäs välimuisti

Mitä olen oppinut: Jos tietosi on hyvin valmisteltu, välimuistin tallentaminen on helppoa!

Koska yksi päätyönkuluista sovelsi samaa analyysimallia SNP-pakettiin, päätin käyttää binningiä hyödykseni. Kun dataa lähetetään SNP:n kautta, kaikki tiedot ryhmästä (bin) liitetään palautettuun objektiin. Eli vanhat kyselyt voivat (teoriassa) nopeuttaa uusien kyselyjen käsittelyä.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Pakettia rakentaessani suoritin monia benchmarkeja vertaillakseni nopeutta eri menetelmiä käytettäessä. Suosittelen, että et jätä tätä huomiotta, koska joskus tulokset ovat odottamattomia. Esimerkiksi, dplyr::filter oli paljon nopeampi kuin rivien sieppaus indeksointipohjaisella suodatuksella, ja yhden sarakkeen hakeminen suodatetusta datakehyksestä oli paljon nopeampaa kuin indeksointisyntaksin käyttäminen.

Huomaa, että esine prev_snp_results sisältää avaimen snps_in_bin. Tämä on joukko kaikkia yksilöllisiä SNP:itä ryhmässä (bin), jonka avulla voit tarkistaa nopeasti, onko sinulla jo tietoja edellisestä kyselystä. Se tekee myös helpoksi silmukan kaikkien ryhmän (bin) SNP:iden läpi tällä koodilla:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Tulokset

Nyt voimme (ja olemme alkaneet tosissaan) ajaa malleja ja skenaarioita, joihin emme aiemmin olleet saatavilla. Parasta on, että laboratoriokollegoideni ei tarvitse ajatella mitään komplikaatioita. Niillä on vain toiminto, joka toimii.

Ja vaikka paketti säästää heille yksityiskohtia, yritin tehdä tietomuodosta riittävän yksinkertaisen, jotta he ymmärtäisivät sen, jos katoaisin yhtäkkiä huomenna...

Nopeus on lisääntynyt huomattavasti. Skannaamme yleensä toiminnallisesti merkittäviä genomifragmentteja. Aiemmin emme voineet tehdä tätä (se osoittautui liian kalliiksi), mutta nyt ryhmä (bin) rakenteen ja välimuistin ansiosta yhden SNP:n pyyntö kestää keskimäärin alle 0,1 sekuntia ja datan käyttö on niin alhainen, että S3:n kustannukset ovat maapähkinöitä.

Johtopäätös

Tämä artikkeli ei ole ollenkaan opas. Ratkaisu osoittautui yksilölliseksi, eikä läheskään varmasti optimaalinen. Pikemminkin se on matkakertomus. Haluan muiden ymmärtävän, että tällaiset päätökset eivät näytä täysin muotoutuneilta päässä, ne ovat yrityksen ja erehdyksen tulosta. Lisäksi, jos etsit datatieteilijää, muista, että näiden työkalujen tehokas käyttö vaatii kokemusta ja kokemus maksaa rahaa. Olen iloinen, että minulla oli varaa maksaa, mutta monilla muilla, jotka pystyvät tekemään saman työn paremmin kuin minä, ei ole rahan puutteen vuoksi koskaan mahdollisuutta edes yrittää.

Big data -työkalut ovat monipuolisia. Jos sinulla on aikaa, voit melkein varmasti kirjoittaa nopeamman ratkaisun älykkäiden tietojen puhdistus-, tallennus- ja poimintatekniikoiden avulla. Viime kädessä kyse on kustannus-hyötyanalyysistä.

Mitä opin:

  • ei ole halpaa tapaa jäsentää 25 Tt kerrallaan;
  • ole varovainen parkettitiedostojesi koon ja niiden järjestyksen suhteen;
  • Sparkin väliseinät on tasapainotettava;
  • Yleensä älä koskaan yritä tehdä 2,5 miljoonaa osiota;
  • Lajittelu on edelleen vaikeaa, kuten myös Sparkin asettaminen;
  • joskus erikoistiedot vaativat erikoisratkaisuja;
  • Spark-aggregointi on nopeaa, mutta osiointi on silti kallista;
  • älä nuku, kun sinulle opetetaan perusasiat, joku luultavasti ratkaisi ongelmasi jo 1980-luvulla;
  • gnu parallel - tämä on maaginen asia, kaikkien tulisi käyttää sitä;
  • Spark pitää pakkaamattomasta tiedosta eikä osioiden yhdistämisestä;
  • Sparkilla on liikaa yleiskustannuksia yksinkertaisten ongelmien ratkaisemisessa;
  • AWK:n assosiatiiviset taulukot ovat erittäin tehokkaita;
  • voit ottaa yhteyttä stdin и stdout R-komentosarjasta ja siksi käytä sitä liukuhihnassa;
  • Älykkään polun toteutuksen ansiosta S3 voi käsitellä monia tiedostoja;
  • Suurin syy ajanhukkaan on varastointitavan ennenaikainen optimointi;
  • älä yritä optimoida tehtäviä manuaalisesti, anna tietokoneen tehdä se;
  • Sovellusliittymän tulee olla yksinkertainen käytön helppouden ja joustavuuden vuoksi;
  • Jos tietosi on hyvin valmisteltu, välimuistin tallentaminen on helppoa!

Lähde: will.com

Lisää kommentti