25 TB elemzése AWK és R használatával

25 TB elemzése AWK és R használatával
Hogyan kell olvasni ezt a cikket: Elnézést kérek a szöveg ilyen hosszú és kaotikus voltáért. Az időmegtakarítás érdekében minden fejezetet egy „Amit tanultam” bevezetővel kezdek, amely egy-két mondatban összefoglalja a fejezet lényegét.

– Csak mutasd meg a megoldást! Ha csak azt szeretné látni, honnan jöttem, akkor ugorjon a „leleményesebbé válás” című fejezethez, de szerintem érdekesebb és hasznosabb a kudarcról olvasni.

Nemrég azt a feladatot kaptam, hogy állítsak fel egy eljárást nagy mennyiségű nyers DNS-szekvencia (technikailag SNP chip) feldolgozására. Az volt az igény, hogy gyorsan adatokat szerezzünk egy adott genetikai helyről (úgynevezett SNP) a későbbi modellezéshez és egyéb feladatokhoz. Az R és az AWK használatával természetes módon tudtam megtisztítani és rendszerezni az adatokat, ami jelentősen felgyorsította a lekérdezések feldolgozását. Ez nem volt könnyű számomra, és számos iterációt igényelt. Ez a cikk segít elkerülni néhány hibámat, és megmutatja, mire jutottam.

Először is néhány bevezető magyarázat.

Adat

Egyetemi genetikai információfeldolgozó központunk 25 TB-os TSV formájában bocsátotta rendelkezésünkre az adatokat. 5 csomagra bontva kaptam meg őket Gzip-pel tömörítve, amelyek mindegyike körülbelül 240 darab négy gigabájtos fájlt tartalmazott. Minden sor egy személy egy SNP-jének adatait tartalmazta. Összességében ~2,5 millió SNP-ről és ~60 ezer emberről továbbítottak adatokat. Az SNP-információk mellett a fájlok számos oszlopot tartalmaztak, amelyekben számok különböző jellemzőket tükröztek, mint például az olvasási intenzitás, a különböző allélok gyakorisága stb. Összesen körülbelül 30 oszlop volt egyedi értékekkel.

Gól

Mint minden adatkezelési projektnél, itt is az volt a legfontosabb, hogy meghatározzuk, hogyan használjuk fel az adatokat. Ebben az esetben többnyire az SNP-n alapuló modelleket és munkafolyamatokat választjuk ki az SNP-hez. Vagyis egyszerre csak egy SNP-re lesz szükségünk. Meg kellett tanulnom, hogyan lehet a 2,5 millió SNP egyikéhez kapcsolódó összes rekordot a lehető legegyszerűbben, leggyorsabban és olcsóbban visszakeresni.

Hogyan ne tegye ezt

Hogy egy megfelelő klisét idézzek:

Nem vallottam kudarcot ezerszer, csak ezer módszert fedeztem fel annak elkerülésére, hogy egy csomó adatot lekérdezésbarát formátumban elemezzek.

Első próba

Mit tanultam: Egyszerre 25 TB elemzésére nincs olcsó mód.

Miután elvégeztem a „Haladó módszerek a nagy adatfeldolgozáshoz” tanfolyamot a Vanderbilt Egyetemen, biztos voltam benne, hogy a trükk benne van. Valószínűleg egy-két órát vesz igénybe, amíg beállítja a Hive-kiszolgálót, hogy végigfusson az összes adaton és jelentse az eredményt. Mivel adataink az AWS S3-ban vannak tárolva, igénybe vettem a szolgáltatást Athéné, amely lehetővé teszi Hive SQL lekérdezések alkalmazását az S3 adatokra. Nem kell Hive-fürtöt felállítania/emelnie, és csak a keresett adatokért kell fizetnie.

Miután megmutattam az Athénának az adataimat és azok formátumát, lefuttattam néhány tesztet a következő lekérdezésekkel:

select * from intensityData limit 10;

És gyorsan kapott jól strukturált eredményeket. Kész.

Amíg nem próbáltuk az adatokat a munkánk során felhasználni...

Megkértek, hogy húzzam ki az összes SNP-információt a modell teszteléséhez. Lefuttattam a lekérdezést:


select * from intensityData 
where snp = 'rs123456';

...és elkezdett várni. Nyolc perc és több mint 4 TB kért adat után megkaptam az eredményt. Az Athena a talált adatmennyiség alapján számít fel, terabájtonként 5 dollárt. Tehát ez az egyetlen kérés 20 dollárba és nyolc perc várakozásba került. Ahhoz, hogy a modellt minden adaton futtathassuk, 38 évet kellett várnunk és 50 millió dollárt kellett fizetnünk, ami nyilvánvalóan nem volt megfelelő számunkra.

Parkettát kellett használni...

Mit tanultam: Legyen óvatos a parketta reszelői méretével és azok felépítésével.

Először úgy próbáltam megjavítani a helyzetet, hogy az összes TSV-t átalakítottam ilyenre Parketta reszelők. Kényelmesek nagy adathalmazokkal való munkavégzéshez, mert a bennük lévő információkat oszlopos formában tárolják: minden oszlop a saját memória/lemez szegmensében található, ellentétben a szöveges fájlokkal, amelyekben a sorok minden oszlop elemeit tartalmazzák. És ha valamit meg kell találnia, csak olvassa el a szükséges oszlopot. Ezenkívül minden fájl értéket tárol egy oszlopban, így ha a keresett érték nincs az oszlop tartományában, a Spark nem vesztegeti az időt a teljes fájl átvizsgálásával.

Elvégeztem egy egyszerű feladatot AWS ragasztó hogy a TSV-ket parkettává alakítsa, és az új fájlokat bedobta az Athénába. Körülbelül 5 óráig tartott. De amikor lefutottam a kérést, nagyjából ugyanannyi időt és egy kicsit kevesebb pénzt igényelt a teljesítése. A helyzet az, hogy a Spark a feladat optimalizálása érdekében egyszerűen kicsomagolt egy TSV-darabot, és behelyezte a saját parkettadarabjába. És mivel mindegyik darab elég nagy volt ahhoz, hogy sok ember teljes rekordját tárolja, minden fájl tartalmazta az összes SNP-t, így a Sparknak meg kellett nyitnia az összes fájlt a szükséges információk kinyeréséhez.

Érdekes módon a Parquet alapértelmezett (és ajánlott) tömörítési típusa, a snappy, nem osztható. Ezért minden végrehajtó leragadt a teljes 3,5 GB-os adatkészlet kicsomagolásában és letöltésében.

25 TB elemzése AWK és R használatával

Értsük meg a problémát

Mit tanultam: A rendezés nehézkes, különösen, ha az adatok el vannak osztva.

Úgy tűnt, most már megértettem a probléma lényegét. Csak SNP oszlop szerint kellett rendeznem az adatokat, nem emberek szerint. Ezután több SNP külön adatcsomóban kerül tárolásra, majd a Parquet „okos” funkciója, „csak akkor nyílik meg, ha az érték a tartományban van” teljes pompájában megmutatkozik. Sajnos a klaszterben szétszórt sorok milliárdjainak válogatása nehéz feladatnak bizonyult.

Az AWS határozottan nem akar visszatérítést kiadni az „elzavart tanuló vagyok” ok miatt. Miután elvégeztem a válogatást az Amazon Glue-n, 2 napig futott, és összeomlott.

Mi a helyzet a particionálással?

Mit tanultam: A Spark partícióit ki kell egyensúlyozni.

Aztán eszembe jutott az adatok felosztása a kromoszómákban. 23 van belőlük (és még több, ha figyelembe vesszük a mitokondriális DNS-t és a fel nem térképezett régiókat).
Ez lehetővé teszi az adatok felosztását kisebb darabokra. Ha csak egy sort ad hozzá a Spark export funkcióhoz a ragasztószkriptben partition_by = "chr", akkor az adatokat kockákra kell osztani.

25 TB elemzése AWK és R használatával
A genom számos kromoszómának nevezett fragmensből áll.

Sajnos nem sikerült. A kromoszómák különböző méretűek, ami eltérő mennyiségű információt jelent. Ez azt jelenti, hogy a Spark által a dolgozóknak küldött feladatok nem voltak kiegyensúlyozva, és lassan fejeződtek be, mert egyes csomópontok korán befejeződtek és tétlenek voltak. A feladatok azonban elkészültek. De amikor egy SNP-t kértek, az egyensúlyhiány ismét problémákat okozott. Az SNP-k nagyobb kromoszómákon történő feldolgozásának költsége (vagyis ahol adatokat akarunk szerezni) csak körülbelül 10-szeresére csökkent. Sok, de nem elég.

Mi van, ha még kisebb részekre osztjuk?

Mit tanultam: Soha ne próbáljon meg 2,5 millió partíciót csinálni.

Úgy döntöttem, hogy mindent megteszek, és felosztom az egyes SNP-ket. Ez biztosította, hogy a válaszfalak azonos méretűek legyenek. ROSSZ ÖTLET VOLT. Ragasztót használtam, és tettem hozzá egy ártatlan vonalat partition_by = 'snp'. A feladat elindult és elkezdődött a végrehajtás. Egy nappal később megnéztem, és láttam, hogy az S3-ba még mindig nincs írva semmi, így megöltem a feladatot. Úgy tűnik, a Glue közbenső fájlokat írt az S3 rejtett helyére, sok fájlt, talán néhány milliót. Ennek eredményeként a tévedésem több mint ezer dollárba került, és nem tetszett a mentoromnak.

Particionálás + válogatás

Mit tanultam: A rendezés továbbra is nehézkes, ahogy a Spark hangolása is.

Az utolsó particionálási kísérletem a kromoszómák particionálásából, majd az egyes partíciók rendezéséből állt. Elméletileg ez felgyorsítaná az egyes lekérdezéseket, mivel a kívánt SNP-adatoknak néhány Parquet-darabon belül kell lenniük egy adott tartományon belül. Sajnos a particionált adatok rendezése is nehéz feladatnak bizonyult. Ennek eredményeként váltottam az EMR-re egy egyéni fürthöz, és nyolc nagy teljesítményű példányt (C5.4xl) és Sparklyrt használtam a rugalmasabb munkafolyamat létrehozásához...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...a feladat azonban továbbra sem készült el. Különféleképpen konfiguráltam: növeltem a memóriafoglalást az egyes lekérdezések végrehajtóihoz, használtam a nagy memóriával rendelkező csomópontokat, használtam a broadcast változókat (broadcasting változókat), de ezek minden alkalommal félmértéknek bizonyultak, és fokozatosan elkezdték a végrehajtók. kudarcot vallani, amíg minden meg nem áll.

Egyre kreatívabb vagyok

Mit tanultam: Néha speciális adatok speciális megoldásokat igényelnek.

Minden SNP-nek van egy pozícióértéke. Ez a szám megfelel a bázisok számának a kromoszómájában. Ez egy szép és természetes módja adataink rendszerezésének. Először az egyes kromoszómák régiói szerint akartam felosztani. Például 1 - 2000, 2001 - 4000, stb. A probléma azonban az, hogy az SNP-k nem egyenletesen oszlanak el a kromoszómák között, ezért a csoportok mérete nagyon eltérő lesz.

25 TB elemzése AWK és R használatával

Ennek eredményeként eljutottam a pozíciók kategóriákra (rangsorra) való bontásához. A már letöltött adatok felhasználásával lefuttattam egy kérést, hogy megszerezzem az egyedi SNP-k listáját, azok helyzetét és kromoszómáit. Ezután az adatokat az egyes kromoszómákon belül rendeztem, és az SNP-ket adott méretű csoportokba (bin) gyűjtöttem. Tegyük fel, hogy egyenként 1000 SNP. Ez adta nekem az SNP-csoport-kromoszómánkénti kapcsolatot.

Végül 75 SNP-ből csoportokat (bin) készítettem, ennek okát alább ismertetjük.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Először próbálkozz a Sparkkal

Mit tanultam: A Spark aggregálás gyors, de a particionálás még mindig drága.

Ezt a kis (2,5 millió soros) adatkeretet be akartam olvasni a Sparkba, kombinálni a nyers adatokkal, majd particionálni az újonnan hozzáadott oszloppal bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

használtam sdf_broadcast(), így a Spark tudja, hogy el kell küldenie az adatkeretet az összes csomópontnak. Ez akkor hasznos, ha az adatok kis méretűek, és minden feladathoz szükségesek. Ellenkező esetben a Spark megpróbál okos lenni, és szükség szerint osztja el az adatokat, ami lassulást okozhat.

És megint nem vált be az ötletem: a feladatok egy ideig működtek, teljes lett az unió, majd a particionálással elindított végrehajtókhoz hasonlóan elkezdtek kudarcot vallani.

AWK hozzáadása

Mit tanultam: Ne aludj, amikor az alapokat tanítják. Biztosan valaki már az 1980-as években megoldotta a problémádat.

Eddig a pontig a Sparkkal kapcsolatos összes kudarc oka a fürtben lévő adatok zűrzavara volt. Talán előkezeléssel javítható a helyzet. Úgy döntöttem, hogy megpróbálom felosztani a nyers szöveges adatokat kromoszómák oszlopaira, abban a reményben, hogy „előre particionált” adatokkal láthassam el a Sparkot.

Megkerestem a StackOverflow-n, hogyan lehet oszlopértékek szerint felosztani, és megtaláltam olyan nagyszerű válasz. Az AWK segítségével feloszthat egy szöveges fájlt oszlopértékek szerint úgy, hogy szkriptbe írja ahelyett, hogy az eredményeket elküldi stdout.

Írtam egy Bash-szkriptet, hogy kipróbáljam. Letöltötte az egyik csomagolt TSV-t, majd a segítségével kicsomagolta gzip és elküldték awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Működött!

A magok kitöltése

Mit tanultam: gnu parallel - varázslatos dolog, mindenki használja.

Az elválás elég lassú volt és amikor elkezdtem htopegy erős (és drága) EC2 példány használatának ellenőrzéséhez kiderült, hogy csak egy magot és körülbelül 200 MB memóriát használok. Ahhoz, hogy megoldjuk a problémát, és ne veszítsünk sok pénzt, ki kellett találnunk, hogyan lehet párhuzamosítani a munkát. Szerencsére egy teljesen elképesztő könyvben Adattudomány a parancssorban Találtam egy fejezetet Jeron Janssenstől a párhuzamosításról. Abból tanultam kb gnu parallel, egy nagyon rugalmas módszer a többszálú Unix rendszerben való megvalósítására.

25 TB elemzése AWK és R használatával
Amikor elkezdtem a particionálást az új eljárással, minden rendben volt, de még mindig volt egy szűk keresztmetszet - az S3 objektumok letöltése a lemezre nem volt túl gyors és nem teljesen párhuzamos. Ennek kijavításához a következőt tettem:

  1. Megállapítottam, hogy az S3 letöltési szakaszt közvetlenül a folyamatban lehet megvalósítani, teljesen kiiktatva a lemezen való köztes tárolást. Ez azt jelenti, hogy elkerülhetem a nyers adatok lemezre írását, és még kisebb, és ezért olcsóbb tárhelyet használhatok az AWS-en.
  2. Csapat aws configure set default.s3.max_concurrent_requests 50 jelentősen megnövelte az AWS CLI által használt szálak számát (alapértelmezés szerint 10).
  3. Váltottam egy hálózati sebességre optimalizált EC2 példányra, melynek nevében n betű szerepel. Azt tapasztaltam, hogy a feldolgozási teljesítmény veszteségét n-példányok használatakor bőven kompenzálja a betöltési sebesség növekedése. A legtöbb feladathoz c5n.4xl-t használtam.
  4. Megváltozott gzip on pigz, ez egy gzip eszköz, amely remek dolgokat tud tenni a fájlok kitömörítésének kezdetben nem párhuzamos feladatának párhuzamosítására (ez segített a legkevésbé).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Ezeket a lépéseket egymással kombinálják, hogy minden nagyon gyorsan működjön. A letöltési sebesség növelésével és a lemezírások kiküszöbölésével most néhány óra alatt fel tudtam dolgozni egy 5 terabájtos csomagot.

Ebben a tweetben meg kellett volna említeni a „TSV”-t. Jaj.

Újonnan elemzett adatok használata

Mit tanultam: A Spark szereti a tömörítetlen adatokat, és nem szereti a partíciók kombinálását.

Most az S3-ban voltak az adatok kicsomagolt (értsd: megosztott) és félig szervezett formátumban, és újra visszatérhettem a Sparkhoz. Meglepetés várt rám: megint nem sikerült elérni, amit szerettem volna! Nagyon nehéz volt megmondani a Sparknak, hogy pontosan hogyan particionálták az adatokat. És még amikor ezt csináltam, kiderült, hogy túl sok partíció van (95 ezer), és amikor használtam coalesce ésszerű határokra csökkentette a számukat, ez tönkretette a particionálásomat. Biztos vagyok benne, hogy ez javítható, de néhány napos keresés után nem találtam megoldást. Végül az összes feladatot elvégeztem a Sparkban, bár eltartott egy ideig, és a felosztott Parquet fájljaim nem voltak túl kicsik (~200 KB). Az adatok azonban ott voltak, ahol kellett.

25 TB elemzése AWK és R használatával
Túl kicsi és egyenetlen, csodálatos!

Helyi Spark-lekérdezések tesztelése

Mit tanultam: A Sparknak túl sok rezsije van az egyszerű problémák megoldása során.

Az adatok okos formátumban történő letöltésével tesztelhettem a sebességet. Állítson be egy R-szkriptet egy helyi Spark-kiszolgáló futtatásához, majd töltsön be egy Spark-adatkeretet a megadott Parquet-csoporttárolóból (tárhelyről). Megpróbáltam betölteni az összes adatot, de nem tudtam elérni, hogy a Sparklyr felismerje a particionálást.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

A végrehajtás 29,415 másodpercet vett igénybe. Sokkal jobb, de nem túl jó bármi tömeges teszteléséhez. Ezenkívül a gyorsítótárazással sem tudtam felgyorsítani a dolgokat, mert amikor megpróbáltam egy adatkeretet gyorsítótárba helyezni a memóriában, a Spark mindig összeomlott, még akkor is, ha több mint 50 GB memóriát foglaltam le egy 15-nél kisebb súlyú adatkészlethez.

Vissza az AWK-hoz

Mit tanultam: Az asszociatív tömbök AWK-ban nagyon hatékonyak.

Rájöttem, hogy nagyobb sebességet is elérhetek. Erre egy csodálatosan emlékeztem AWK oktatóanyag Bruce Barnetttől Olvastam egy nagyszerű szolgáltatásról, a „asszociatív tömbök" Lényegében ezek kulcs-érték párok, amelyeket valamiért másként hívtak az AWK-ban, és ezért valahogy nem sokat gondolkodtam rajtuk. Roman Cheplyaka emlékeztetett arra, hogy az „asszociatív tömbök” kifejezés sokkal régebbi, mint a „kulcs-érték pár”. Még ha te keresse meg a kulcsértéket a Google Ngramban, ezt a kifejezést ott nem fogja látni, de asszociatív tömböket talál! Ráadásul a „kulcs-érték pár” leggyakrabban adatbázisokhoz kapcsolódik, így sokkal értelmesebb összehasonlítani egy hashmappal. Rájöttem, hogy ezeket az asszociatív tömböket használhatom SNP-jeim bin táblához és nyers adatokhoz való társítására a Spark használata nélkül.

Ehhez az AWK szkriptben a blokkot használtam BEGIN. Ez egy olyan kódrészlet, amely azelőtt kerül végrehajtásra, hogy az első adatsor átkerülne a szkript főtörzsébe.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Csapat while(getline...) betöltötte az összes sort a CSV-csoportból (bin), állítsa be az első oszlopot (SNP név) az asszociatív tömb kulcsaként bin és a második érték (csoport) értékként. Aztán a blokkban { }, amely a főfájl minden sorában lefut, minden sor a kimeneti fájlba kerül, amely a csoportjától (bin) függően egyedi nevet kap: ..._bin_"bin[$1]"_....

Változók batch_num и chunk_id megfelelt a folyamat által szolgáltatott adatoknak, elkerülve a versenyfeltételt, és minden egyes végrehajtási szál fut parallel, a saját egyedi fájljába írt.

Mivel az összes nyers adatot az előző AWK-kísérletből megmaradt kromoszómákon lévő mappákba szórtam szét, most egy másik Bash-szkriptet írhattam, hogy egy-egy kromoszómát dolgozzanak fel, és mélyebben felosztott adatokat küldjenek az S3-nak.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

A forgatókönyv két részből áll parallel.

Az első részben a kívánt kromoszómára vonatkozó információkat tartalmazó összes fájlból kiolvassák az adatokat, majd ezeket az adatokat elosztják a szálak között, amelyek a fájlokat a megfelelő csoportokba (bin) osztják szét. A versenyhelyzetek elkerülése érdekében, amikor több szál ír ugyanabba a fájlba, az AWK átadja a fájlneveket, hogy különböző helyekre írja az adatokat, pl. chr_10_bin_52_batch_2_aa.csv. Ennek eredményeként sok kis fájl jön létre a lemezen (ehhez terabájtos EBS köteteket használtam).

Szállítószalag a második szakaszból parallel átmegy a csoportokon (bin), és egyesíti az egyes fájljaikat közös CSV-vé c catmajd exportra küldi őket.

Közvetítés R?

Mit tanultam: Felveheti a kapcsolatot stdin и stdout egy R szkriptből, és ezért használja azt a folyamatban.

Talán észrevette ezt a sort a Bash-szkriptjében: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Az összes összefűzött csoportfájlt (bin) az alábbi R-szkriptre fordítja. {} egy speciális technika parallel, amely a megadott adatfolyamba küldött adatokat közvetlenül magába a parancsba szúrja be. választási lehetőség {#} egyedi szálazonosítót biztosít, és {%} a munkaterület számát jelenti (ismétlődő, de soha nem egyidejűleg). Az összes opció listája itt található dokumentáció.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Amikor egy változó file("stdin") továbbítják readr::read_csv, az R szkriptbe lefordított adatok egy keretbe töltődnek be, amely ezután a formában .rds- fájl használatával aws.s3 közvetlenül az S3-ba írva.

Az RDS olyan, mint a Parquet junior verziója, a hangszórótárolás sallangja nélkül.

A Bash szkript befejezése után kaptam egy csomagot .rds-S3-ban található fájlok, amelyek lehetővé tették a hatékony tömörítés és a beépített típusok használatát.

Az R fék használata ellenére minden nagyon gyorsan működött. Nem meglepő, hogy az R azon részei, amelyek adatokat olvasnak és írnak, nagyon optimalizáltak. Egy közepes méretű kromoszómán végzett tesztelés után a munka egy C5n.4xl példányon körülbelül két óra alatt befejeződött.

S3 korlátozások

Mit tanultam: Az intelligens elérési út megvalósításának köszönhetően az S3 sok fájlt tud kezelni.

Aggódtam, hogy az S3 képes lesz-e kezelni a sok rá átvitt fájlt. Értelmesíthetném a fájlneveket, de hogyan keresné őket az S3?

25 TB elemzése AWK és R használatával
Az S3-ban lévő mappák csak bemutatásra szolgálnak, valójában a rendszert nem érdekli a szimbólum /. Az S3 GYIK oldaláról.

Úgy tűnik, hogy az S3 egy adott fájl elérési útját jelöli egyszerű kulcsként egyfajta hash táblában vagy dokumentum alapú adatbázisban. A vödör felfogható táblázatnak, a fájlok pedig rekordnak tekinthetők ebben a táblázatban.

Mivel a gyorsaság és a hatékonyság fontos az Amazon profitjához, nem meglepő, hogy ez a kulcs-fájl-útvonal-rendszer rohadt optimalizált. Igyekeztem megtalálni az egyensúlyt: úgy, hogy ne kelljen sok get-kérést intéznem, hanem a kéréseket gyorsan teljesíteni kell. Kiderült, hogy a legjobb körülbelül 20 ezer bin fájlt készíteni. Szerintem, ha folytatjuk az optimalizálást, akkor sebességnövekedést érhetünk el (például egy speciális vödröt készítünk csak az adatok számára, ezzel csökkentve a keresőtábla méretét). De nem volt idő és pénz további kísérletekre.

Mi a helyzet a keresztkompatibilitással?

Amit megtanultam: Az időpazarlás első számú oka a tárolási módszer idő előtti optimalizálása.

Ezen a ponton nagyon fontos feltenni magának a kérdést: „Miért használjon védett fájlformátumot?” Az ok a betöltési sebességben (a gzip-be csomagolt CSV-fájlok hétszer hosszabb ideig tartott betölteni) és a munkafolyamatainkkal való kompatibilitásban rejlik. Lehetséges, hogy átgondolom, hogy R könnyen betöltheti-e a Parquet (vagy Arrow) fájlokat a Spark terhelés nélkül. Laboratóriumunkban mindenki R-t használ, és ha az adatokat másik formátumba kell konvertálnom, továbbra is megvannak az eredeti szöveges adatok, így újra futtathatom a folyamatot.

Munkamegosztás

Mit tanultam: Ne próbálja meg kézzel optimalizálni a feladatokat, hagyja, hogy a számítógép végezze el.

Hibakeresést végeztem az egyik kromoszómán a munkafolyamatban, most az összes többi adatot fel kell dolgoznom.
Több EC2-példányt is fel akartam emelni az átalakításhoz, de ugyanakkor féltem, hogy nagyon kiegyensúlyozatlan terhelést kapok a különböző feldolgozási feladatok között (ahogy a Spark is szenvedett a kiegyensúlyozatlan partícióktól). Ezenkívül nem érdekelt, hogy kromoszómánként egy példányt emeljek ki, mivel az AWS-fiókok esetében az alapértelmezett 10 példány korlátja van.

Aztán úgy döntöttem, hogy írok egy szkriptet R-ben, hogy optimalizáljam a feldolgozási feladatokat.

Először megkértem az S3-at, hogy számítsa ki, mennyi tárhelyet foglalnak el az egyes kromoszómák.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Aztán írtam egy függvényt, ami felveszi a teljes méretet, megkeveri a kromoszómák sorrendjét, csoportokra osztja őket num_jobs és megmutatja, hogy az összes feldolgozási feladat mérete mennyire eltérő.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Aztán végigfutottam ezer keverést a purrr segítségével, és a legjobbat választottam.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Így végül egy nagyon hasonló méretű feladatsorhoz jutottam. Aztán már csak az volt hátra, hogy az előző Bash-szkriptemet egy nagy hurokba csomagoljam for. Ennek az optimalizálásnak a megírása körülbelül 10 percet vett igénybe. És ez sokkal kevesebb, mint amennyit a feladatok manuális létrehozására költenék, ha kiegyensúlyozatlanok lennének. Ezért úgy gondolom, hogy igazam volt ezzel az előzetes optimalizálással.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

A végére hozzáadom a shutdown parancsot:

sudo shutdown -h now

... és minden sikerült! Az AWS CLI használatával példányokat emeltem ki az opció használatával user_data Bash-szkripteket adott nekik feldolgozásra a feladataikról. Automatikusan lefutottak és leálltak, így nem fizettem extra feldolgozási teljesítményért.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Csomagoljunk!

Mit tanultam: Az API-nak egyszerűnek kell lennie a könnyű használat és a rugalmasság érdekében.

Végül a megfelelő helyre és formában megkaptam az adatokat. Nem maradt más hátra, mint a lehető legegyszerűsíteni az adatok felhasználásának folyamatát, hogy megkönnyítsem a kollégáim dolgát. Egy egyszerű API-t akartam készíteni kérések létrehozásához. Ha a jövőben úgy döntök, hogy váltok .rds parkettareszelőhöz, akkor ez nekem legyen probléma, nem a kollégáimnak. Ehhez úgy döntöttem, hogy készítek egy belső R csomagot.

Készítsen és dokumentáljon egy nagyon egyszerű csomagot, amely mindössze néhány adatelérési funkciót tartalmaz egy függvény köré szervezve get_snp. Weboldalt is készítettem a kollégáimnak pkgdown, így könnyen láthatnak példákat és dokumentációt.

25 TB elemzése AWK és R használatával

Intelligens gyorsítótár

Mit tanultam: Ha az adatai jól előkészítettek, a gyorsítótárazás egyszerű lesz!

Mivel az egyik fő munkafolyamat ugyanazt az elemzési modellt alkalmazta az SNP-csomagra, úgy döntöttem, hogy előnyömre használom a binninget. Az SNP-n keresztüli adatátvitel során a csoportból (bin) származó összes információ a visszaadott objektumhoz kapcsolódik. Vagyis a régi lekérdezések (elméletileg) felgyorsíthatják az új lekérdezések feldolgozását.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

A csomag összeállítása során számos benchmarkot lefuttattam, hogy összehasonlítsam a sebességet különböző módszerek használatakor. Azt javaslom, hogy ezt ne hanyagolja el, mert néha az eredmények váratlanok. Például, dplyr::filter sokkal gyorsabb volt, mint a sorok rögzítése indexelés alapú szűréssel, és egyetlen oszlop lekérése egy szűrt adatkeretből sokkal gyorsabb volt, mint az indexelés szintaxisa.

Felhívjuk figyelmét, hogy az objektum prev_snp_results tartalmazza a kulcsot snps_in_bin. Ez egy csoportban (bin) lévő összes egyedi SNP tömbje, amely lehetővé teszi, hogy gyorsan ellenőrizze, vannak-e már adatok egy korábbi lekérdezésből. Ezenkívül megkönnyíti a csoport (bin) összes SNP-jét ezzel a kóddal:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Álláspontja

Most már olyan modelleket és forgatókönyveket futtathatunk (és kezdtünk is komolyan), amelyek korábban nem voltak elérhetőek számunkra. A legjobb az, hogy a laboros kollégáimnak nem kell semmilyen komplikációra gondolniuk. Csak van egy funkciójuk, ami működik.

És bár a csomag megkíméli őket a részletektől, igyekeztem elég egyszerűvé tenni az adatformátumot, hogy rájöjjenek, ha holnap hirtelen eltűnnék...

A sebesség érezhetően megnőtt. Általában funkcionálisan jelentős genomfragmenseket szkennelünk. Korábban ezt nem tudtuk megtenni (túl drágának bizonyult), most viszont a csoportos (bin) struktúrának és a gyorsítótárazásnak köszönhetően egy SNP kérése átlagosan kevesebb, mint 0,1 másodpercet vesz igénybe, az adathasználat pedig ennyi alacsony, mint az S3 költségei földimogyoró.

Következtetés

Ez a cikk egyáltalán nem útmutató. A megoldás egyéninek bizonyult, és szinte biztosan nem optimális. Inkább egy útleírás. Szeretném, ha mások is megértenék, hogy az ilyen döntések nem a fejben jelennek meg teljesen, hanem próba és hiba eredménye. Továbbá, ha adattudót keres, ne feledje, hogy ezeknek az eszközöknek a hatékony használata tapasztalatot igényel, és a tapasztalat pénzbe kerül. Örülök, hogy volt módom fizetni, de sokan másoknak, akik nálam jobban el tudják végezni ugyanazt a munkát, pénzhiány miatt soha nem lesz lehetőségük még próbálkozni sem.

A Big Data eszközök sokoldalúak. Ha időd engedi, intelligens adattisztítási, tárolási és kinyerési technikákkal szinte biztosan gyorsabb megoldást írhatsz. Ez végül egy költség-haszon elemzésen múlik.

Amit tanultam:

  • nincs olcsó mód 25 TB egyidejű elemzésére;
  • Legyen óvatos a parketta reszelői méretével és azok felépítésével;
  • A Spark válaszfalainak kiegyensúlyozottnak kell lenniük;
  • Általában soha ne próbáljon 2,5 millió partíciót készíteni;
  • A rendezés továbbra is nehéz, ahogy a Spark beállítása is;
  • néha speciális adatok speciális megoldásokat igényelnek;
  • A szikragyűjtés gyors, de a particionálás még mindig drága;
  • ne aludj, amikor megtanítják az alapokat, valószínűleg valaki már az 1980-as években megoldotta a problémádat;
  • gnu parallel - ez egy varázslatos dolog, mindenki használja;
  • A Spark szereti a tömörítetlen adatokat, és nem szereti a partíciók kombinálását;
  • A Sparknak túl sok rezsije van az egyszerű problémák megoldása során;
  • Az AWK asszociatív tömbjei nagyon hatékonyak;
  • felveheti a kapcsolatot stdin и stdout egy R szkriptből, és ezért használja azt a folyamatban;
  • Az intelligens elérési út megvalósításának köszönhetően az S3 számos fájlt képes feldolgozni;
  • Az időpazarlás fő oka a tárolási mód idő előtti optimalizálása;
  • ne próbálja meg kézzel optimalizálni a feladatokat, hagyja, hogy a számítógép végezze el;
  • Az API-nak egyszerűnek kell lennie a könnyű használat és a rugalmasság érdekében;
  • Ha adatai jól vannak előkészítve, a gyorsítótárazás egyszerű lesz!

Forrás: will.com

Hozzászólás