Analizante 25TB uzante AWK kaj R

Analizante 25TB uzante AWK kaj R
Kiel legi ĉi tiun artikolon: Mi pardonpetas pro la teksto tiel longa kaj kaosa. Por ŝpari al vi tempon, mi komencas ĉiun ĉapitron per enkonduko "Kion Mi Lernis", kiu resumas la esencon de la ĉapitro per unu aŭ du frazoj.

"Nur montru al mi la solvon!" Se vi nur volas vidi de kie mi venis, do transsaltu al la ĉapitro "Iĝi Pli inventema", sed mi pensas, ke estas pli interese kaj utile legi pri fiasko.

Lastatempe mi estis taskigita starigi procezon por prilaborado de granda volumeno de krudaj DNA-sekvencoj (teknike SNP-peceto). La bezono estis rapide akiri datenojn pri antaŭfiksita genetika loko (nomita SNP) por posta modeligado kaj aliaj taskoj. Uzante R kaj AWK, mi povis purigi kaj organizi datumojn en natura maniero, multe plirapidigante demandpretigon. Ĉi tio ne estis facila por mi kaj postulis multajn ripetojn. Ĉi tiu artikolo helpos vin eviti iujn miajn erarojn kaj montri al vi, kion mi finis.

Unue, kelkaj enkondukaj klarigoj.

datumoj

Nia universitata genetika pretigocentro provizis al ni datumojn en la formo de 25 TB TSV. Mi ricevis ilin dividitaj en 5 Gzip-kunpremitajn pakaĵojn, ĉiu enhavante ĉirkaŭ 240 kvar-gigabajtajn dosierojn. Ĉiu vico enhavis datenojn por unu SNP de unu individuo. Entute, datumoj pri ~2,5 milionoj da SNP-oj kaj ~60 mil homoj estis transdonitaj. Aldone al SNP-informoj, la dosieroj enhavis multajn kolumnojn kun nombroj reflektantaj diversajn trajtojn, kiel legintenseco, ofteco de malsamaj aleloj, ktp. Entute estis ĉirkaŭ 30 kolumnoj kun unikaj valoroj.

Golo

Kiel kun ajna datuma administradprojekto, la plej grava afero estis determini kiel la datumoj estus uzataj. Tiuokaze ni plejparte elektos modelojn kaj laborfluojn por SNP bazitaj sur SNP. Tio estas, ni nur bezonos datumojn pri unu SNP samtempe. Mi devis lerni kiel retrovi ĉiujn rekordojn asociitajn kun unu el la 2,5 milionoj da SNP kiel eble plej facile, rapide kaj malmultekoste.

Kiel ne fari ĉi tion

Por citi taŭgan kliŝon:

Mi ne milfoje malsukcesis, mi ĵus malkovris mil manierojn eviti analizi amason da datumoj en demando-amika formato.

Unue provu

Kion mi lernis: Ne estas malmultekosta maniero analizi 25 TB samtempe.

Preninte la kurson "Altnivelaj Metodoj por Granda Datuma Pretigo" ĉe Universitato Vanderbilt, mi estis certa, ke la lertaĵo estas en la sako. Verŝajne daŭros unu aŭ du horojn por agordi la Hive-servilon por trakuri ĉiujn datumojn kaj raporti la rezulton. Ĉar niaj datumoj estas konservitaj en AWS S3, mi uzis la servon Ateneo, kiu permesas vin apliki Hive SQL-demandojn al S3-datumoj. Vi ne bezonas starigi/altigi Hive-grupon, kaj vi ankaŭ pagas nur por la datumoj, kiujn vi serĉas.

Post kiam mi montris al Athena miajn datumojn kaj ĝian formaton, mi faris kelkajn provojn kun demandoj kiel ĉi tio:

select * from intensityData limit 10;

Kaj rapide ricevis bone strukturitajn rezultojn. Preta.

Ĝis ni provis uzi la datumojn en nia laboro...

Oni petis min eltiri ĉiujn informojn pri SNP por testi la modelon. Mi faris la demandon:


select * from intensityData 
where snp = 'rs123456';

...kaj komencis atendi. Post ok minutoj kaj pli ol 4 TB da petitaj datumoj, mi ricevis la rezulton. Athena ŝargas laŭ la volumo de datumoj trovitaj, $ 5 por terabajto. Do ĉi tiu ununura peto kostis $20 kaj ok minutojn da atendo. Por funkciigi la modelon sur ĉiuj datumoj, ni devis atendi 38 jarojn kaj pagi $ 50 milionojn. Evidente, ĉi tio ne taŭgis por ni.

Necesis uzi Parquet...

Kion mi lernis: Atentu pri la grandeco de viaj Parquet-dosieroj kaj ilia organizo.

Mi unue provis ripari la situacion konvertante ĉiujn TSV-ojn al Parquet dosieroj. Ili estas oportunaj por labori kun grandaj datumaro ĉar la informoj en ili estas stokitaj en kolona formo: ĉiu kolumno kuŝas en sia propra memoro/diska segmento, kontraste al tekstaj dosieroj, en kiuj vicoj enhavas elementojn de ĉiu kolumno. Kaj se vi bezonas trovi ion, tiam simple legu la bezonatan kolumnon. Aldone, ĉiu dosiero stokas gamon da valoroj en kolumno, do se la valoro, kiun vi serĉas, ne estas en la intervalo de la kolumno, Spark ne perdos tempon skanante la tutan dosieron.

Mi kuris simplan taskon AWS-Gluo por konverti niajn TSV-ojn al Parquet kaj faligis la novajn dosierojn en Athena. Ĝi daŭris ĉirkaŭ 5 horojn. Sed kiam mi plenumis la peton, necesis proksimume la sama kvanto da tempo kaj iom malpli da mono por plenumi. La fakto estas, ke Spark, provante optimumigi la taskon, simple malpakis unu TSV-pecon kaj metis ĝin en sian propran Parquet-pecon. Kaj ĉar ĉiu peco estis sufiĉe granda por enhavi la tutajn rekordojn de multaj homoj, ĉiu dosiero enhavis ĉiujn SNP-ojn, do Spark devis malfermi ĉiujn dosierojn por ĉerpi la informojn kiujn ĝi bezonis.

Interese, ke la defaŭlta (kaj rekomendita) kunprema tipo de Parquet, akra, ne estas disigebla. Sekve, ĉiu ekzekutisto estis blokita en la tasko malpakigi kaj elŝuti la plenan 3,5 GB-datumaron.

Analizante 25TB uzante AWK kaj R

Ni komprenu la problemon

Kion mi lernis: Ordigo estas malfacila, precipe se la datumoj estas distribuitaj.

Ŝajnis al mi, ke nun mi komprenis la esencon de la problemo. Mi nur bezonis ordigi la datumojn laŭ SNP-kolumno, ne laŭ homoj. Tiam pluraj SNP-oj estos stokitaj en aparta datumpeco, kaj tiam la "inteligenta" funkcio de Parquet "malfermita nur se la valoro estas en la intervalo" montros sin en sia tuta gloro. Bedaŭrinde, ordigi tra miliardoj da vicoj disigitaj tra areto pruvis esti malfacila tasko.

AWS certe ne volas elsendi repagon pro la kialo "Mi estas distrita studento". Post kiam mi ordigis sur Amazon Glue, ĝi funkciis dum 2 tagoj kaj kraŝis.

Kio pri dispartigo?

Kion mi lernis: sekcioj en Spark devas esti ekvilibraj.

Tiam mi elpensis la ideon dividi datumojn en kromosomoj. Estas 23 el ili (kaj pluraj pli se vi konsideras mitokondrian DNA kaj nemapitaj regionoj).
Ĉi tio permesos al vi dividi la datumojn en pli malgrandajn partojn. Se vi aldonas nur unu linion al la Spark-eksportfunkcio en la Glue-skripto partition_by = "chr", tiam la datumoj devas esti dividitaj en sitelojn.

Analizante 25TB uzante AWK kaj R
La genaro konsistas el multaj fragmentoj nomataj kromosomoj.

Bedaŭrinde, ĝi ne funkciis. Kromosomoj havas malsamajn grandecojn, kio signifas malsamajn kvantojn da informoj. Ĉi tio signifas, ke la taskoj, kiujn Spark sendis al laboristoj, ne estis ekvilibraj kaj kompletigitaj malrapide, ĉar kelkaj nodoj finiĝis frue kaj estis neaktivaj. Tamen, la taskoj estis plenumitaj. Sed petinte unu SNP, la malekvilibro denove kaŭzis problemojn. La kosto de prilaborado de SNP-oj sur pli grandaj kromosomoj (tio estas, kie ni volas ricevi datumojn) nur malpliiĝis je proksimume faktoro de 10. Multe, sed ne sufiĉe.

Kio se ni dividus ĝin en eĉ pli malgrandajn partojn?

Kion mi lernis: Neniam provu fari 2,5 milionojn da sekcioj.

Mi decidis eliri ĉion kaj dividi ĉiun SNP. Tio certigis ke la sekcioj estis de egala grandeco. ESTIS MALBLA IDEO. Mi uzis Glue kaj aldonis senkulpan linion partition_by = 'snp'. La tasko komenciĝis kaj komencis efektivigi. Tagon poste mi kontrolis kaj vidis, ke ankoraŭ nenio estas skribita al S3, do mi mortigis la taskon. Ŝajnas, ke Glue skribis interajn dosierojn al kaŝita loko en S3, multajn dosierojn, eble kelkajn milionojn. Kiel rezulto, mia eraro kostis pli ol mil dolarojn kaj ne plaĉis al mia mentoro.

Dispartigo + ordigo

Kion mi lernis: Ordigo estas ankoraŭ malfacila, same kiel agordado de Spark.

Mia lasta provo pri dispartigo implikis min dividi la kromosomojn kaj poste ordigi ĉiun sekcion. En teorio, tio plirapidigus ĉiun demandon ĉar la dezirataj SNP-datenoj devis esti ene de kelkaj Parquet-pecoj ene de antaŭfiksita intervalo. Bedaŭrinde, ordigi eĉ dividitajn datumojn montriĝis malfacila tasko. Kiel rezulto, mi ŝanĝis al EMR por kutima areto kaj uzis ok potencajn okazojn (C5.4xl) kaj Sparklyr por krei pli flekseblan laborfluon...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...tamen la tasko ankoraŭ ne estis plenumita. Mi agordis ĝin en malsamaj manieroj: pliigis la memor-atribuon por ĉiu demanda ekzekutisto, uzis nodojn kun granda kvanto da memoro, uzis elsendajn variablojn (elsendajn variablojn), sed ĉiufoje ĉi tiuj montriĝis duonmezuroj, kaj iom post iom la ekzekutistoj komencis malsukcesi ĝis ĉio ĉesos.

Mi fariĝas pli kreema

Kion mi lernis: Kelkfoje specialaj datumoj postulas specialajn solvojn.

Ĉiu SNP havas poziciovaloron. Ĉi tio estas nombro responda al la nombro da bazoj laŭ sia kromosomo. Ĉi tio estas bela kaj natura maniero organizi niajn datumojn. Komence mi volis dividi laŭ regionoj de ĉiu kromosomo. Ekzemple, pozicioj 1 - 2000, 2001 - 4000, ktp. Sed la problemo estas, ke SNP-oj ne estas egale distribuitaj tra la kromosomoj, do la grupgrandecoj do multe varias.

Analizante 25TB uzante AWK kaj R

Kiel rezulto, mi venis al disigo de pozicioj en kategoriojn (rango). Uzante la jam elŝutitajn datumojn, mi petis akiri liston de unikaj SNP-oj, iliaj pozicioj kaj kromosomoj. Poste mi ordigis la datumojn ene de ĉiu kromosomo kaj kolektis SNP-ojn en grupojn (bin) de difinita grandeco. Ni diru po 1000 SNP-oj. Ĉi tio donis al mi la rilaton SNP-al-grupo-po-kromosoma.

En la fino, mi faris grupojn (bin) de 75 SNP-oj, la kialo estos klarigita sube.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Unue provu kun Spark

Kion mi lernis: Spark-agregado estas rapida, sed dispartigo estas ankoraŭ multekosta.

Mi volis legi ĉi tiun malgrandan (2,5 milionoj da vicoj) datumkadron en Spark, kombini ĝin kun la krudaj datumoj, kaj poste dividi ĝin per la lastatempe aldonita kolumno. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

mi uzis sdf_broadcast(), do Spark scias ke ĝi devus sendi la datumkadron al ĉiuj nodoj. Ĉi tio estas utila se la datumoj estas malgrandaj en grandeco kaj necesas por ĉiuj taskoj. Alie, Spark provas esti inteligenta kaj distribuas datumojn laŭbezone, kio povas kaŭzi malrapidiĝon.

Kaj denove, mia ideo ne funkciis: la taskoj funkciis dum kelka tempo, kompletigis la kuniĝon, kaj tiam, kiel la ekzekutistoj lanĉitaj per dispartigo, ili komencis malsukcesi.

Aldonante AWK

Kion mi lernis: Ne dormu kiam oni instruas al vi la bazaĵojn. Verŝajne iu jam solvis vian problemon jam en la 1980-aj jaroj.

Ĝis ĉi tiu punkto, la kialo de ĉiuj miaj malsukcesoj kun Spark estis la miksaĵo de datumoj en la areto. Eble la situacio povas esti plibonigita per antaŭtraktado. Mi decidis provi dividi la krudajn tekstajn datumojn en kolumnojn de kromosomoj, do mi esperis provizi al Spark per "antaŭdividitaj" datumoj.

Mi serĉis ĉe StackOverflow kiel dividi per kolumnaj valoroj kaj trovis tia bonega respondo. Kun AWK vi povas dividi tekstdosieron per kolumnaj valoroj skribante ĝin en skripto anstataŭ sendi la rezultojn al stdout.

Mi skribis Bash-skripton por provi ĝin. Elŝutis unu el la pakitaj TSV-oj, poste malpakis ĝin uzante gzip kaj sendita al awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Ĝi funkciis!

Plenigi la kernojn

Kion mi lernis: gnu parallel - ĝi estas magia afero, ĉiuj uzu ĝin.

La disiĝo estis sufiĉe malrapida kaj kiam mi komencis htoppor kontroli la uzon de potenca (kaj multekosta) EC2-instanco, montriĝis, ke mi uzas nur unu kernon kaj ĉirkaŭ 200 MB da memoro. Por solvi la problemon kaj ne perdi multe da mono, ni devis eltrovi kiel paraleligi la laboron. Feliĉe, en absolute mirinda libro Datenscienco ĉe la Komandlinio Mi trovis ĉapitron de Jeron Janssens pri paraleligo. De ĝi mi eksciis gnu parallel, tre fleksebla metodo por efektivigi multifadenadon en Unikso.

Analizante 25TB uzante AWK kaj R
Kiam mi komencis la dispartigo uzante la novan procezon, ĉio estis en ordo, sed ankoraŭ estis botelkolo - elŝuti S3-objektojn al disko ne estis tre rapida kaj ne plene paraleligita. Por ripari ĉi tion, mi faris ĉi tion:

  1. Mi eksciis, ke eblas efektivigi la elŝutan etapon de S3 rekte en la dukto, tute forigante mezan stokadon sur disko. Ĉi tio signifas, ke mi povas eviti skribi krudajn datumojn al disko kaj uzi eĉ pli malgrandan, kaj do pli malmultekostan, stokadon sur AWS.
  2. teamo aws configure set default.s3.max_concurrent_requests 50 multe pliigis la nombron da fadenoj kiujn AWS CLI uzas (defaŭlte estas 10).
  3. Mi ŝanĝis al EC2-instanco optimumigita por retrapideco, kun la litero n en la nomo. Mi trovis, ke la perdo de pretigpovo dum uzado de n-okazoj estas pli ol kompensita de la pliiĝo de ŝarĝrapideco. Por plej multaj taskoj mi uzis c5n.4xl.
  4. Ŝanĝita gzip sur pigz, ĉi tio estas gzip-ilo, kiu povas fari bonegajn aferojn por paraleligi la komence ne-paralelitan taskon de malkunpremado de dosieroj (ĉi tio plej malmulte helpis).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Ĉi tiuj paŝoj estas kombinitaj unu kun la alia por ke ĉio funkcias tre rapide. Pliigante elŝutajn rapidojn kaj forigante diskajn skribojn, mi nun povus prilabori 5-terabajtan pakaĵon en nur kelkaj horoj.

Ĉi tiu ĉi tweet devus esti mencii 'TSV'. Ve!

Uzante lastatempe analizitajn datumojn

Kion mi lernis: Spark ŝatas nekunpremitajn datumojn kaj ne ŝatas kombini sekciojn.

Nun la datumoj estis en S3 en malpakita (legu: komuna) kaj duon-ordita formato, kaj mi povis reveni al Spark denove. Surprizo atendis min: mi denove ne sukcesis atingi tion, kion mi volis! Estis tre malfacile diri al Spark precize kiel la datumoj estis dividitaj. Kaj eĉ kiam mi faris tion, montriĝis, ke estis tro da sekcioj (95 mil), kaj kiam mi uzis coalesce reduktis ilian nombron al raciaj limoj, tio detruis mian dispartigon. Mi certas, ke tio povas esti riparita, sed post kelkaj tagoj da serĉado mi ne povis trovi solvon. Mi finfine finis ĉiujn taskojn en Spark, kvankam daŭris iom da tempo kaj miaj dividitaj Parquet-dosieroj ne estis tre malgrandaj (~200 KB). Tamen, la datumoj estis kie ĝi estis bezonata.

Analizante 25TB uzante AWK kaj R
Tro malgranda kaj malebena, mirinda!

Testante lokajn Spark-demandojn

Kion mi lernis: Fajrero havas tro da superkompeto dum solvado de simplaj problemoj.

Elŝutante la datumojn en lerta formato, mi povis testi la rapidecon. Agordu R-skripton por ruli lokan Spark-servilon, kaj poste ŝargis Spark-datumkadron el la specifita Parquet grupa stokado (ujo). Mi provis ŝargi ĉiujn datumojn sed ne povis igi Sparklyr rekoni la dispartigo.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

La ekzekuto daŭris 29,415 sekundojn. Multe pli bone, sed ne tro bona por amasa testado de io ajn. Aldone, mi ne povis akceli aferojn kun kaŝmemoro ĉar kiam mi provis konservi datumkadron en memoro, Spark ĉiam kraŝis, eĉ kiam mi asignis pli ol 50 GB da memoro al datumaro kiu pezis malpli ol 15.

Reiru al AWK

Kion mi lernis: Asociaj tabeloj en AWK estas tre efikaj.

Mi rimarkis, ke mi povas atingi pli altajn rapidojn. Mi rememoris tion en mirinda AWK-lernilo de Bruce Barnett Mi legis pri bonega funkcio nomata "asociaj tabeloj" Esence, ĉi tiuj estas ŝlosil-valoraj paroj, kiuj ial estis nomitaj malsame en AWK, kaj tial mi iel ne multe pensis pri ili. Roman Cheplyaka memoris, ke la esprimo "asociecaj tabeloj" estas multe pli malnova ol la esprimo "ŝlosil-valora paro". Eĉ se vi serĉu la ŝlosilvaloron en Google Ngram, vi ne vidos ĉi tiun terminon tie, sed vi trovos asociajn tabelojn! Krome, la "ŝlosil-valora paro" plej ofte rilatas al datumbazoj, do multe pli sencas kompari ĝin kun hashmapo. Mi rimarkis, ke mi povus uzi ĉi tiujn asociajn tabelojn por asocii miajn SNP-ojn kun bin-tabelo kaj krudaj datumoj sen uzi Spark.

Por fari tion, en la AWK-skripto mi uzis la blokon BEGIN. Ĉi tio estas peco de kodo, kiu estas efektivigita antaŭ ol la unua linio de datumoj estas transdonita al la ĉefa korpo de la skripto.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

teamo while(getline...) ŝarĝis ĉiujn vicojn de la CSV-grupo (bin), starigis la unuan kolumnon (SNP-nomo) kiel la ŝlosilon por la asocia tabelo bin kaj la dua valoro (grupo) kiel la valoro. Tiam en la bloko { }, kiu estas ekzekutita sur ĉiuj linioj de la ĉefa dosiero, ĉiu linio estas sendita al la eligodosiero, kiu ricevas unikan nomon depende de sia grupo (bin): ..._bin_"bin[$1]"_....

Variabloj batch_num и chunk_id egalis la datenojn disponigitajn per la dukto, evitante raskondiĉon, kaj ĉiu ekzekutfadeno kuranta parallel, skribis al sia propra unika dosiero.

Ĉar mi disĵetis ĉiujn krudajn datumojn en dosierujojn sur kromosomoj postlasitaj de mia antaŭa eksperimento kun AWK, nun mi povus skribi alian Bash-skripton por prilabori unu kromosomon samtempe kaj sendi pli profundajn dividitajn datumojn al S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

La skripto havas du sekciojn parallel.

En la unua sekcio, datumoj estas legitaj de ĉiuj dosieroj enhavantaj informojn pri la dezirata kromosomo, tiam ĉi tiuj datumoj estas distribuitaj tra fadenoj, kiuj distribuas la dosierojn en la taŭgajn grupojn (bin). Por eviti raskondiĉojn kiam pluraj fadenoj skribas al la sama dosiero, AWK pasas la dosiernomojn por skribi datumojn al malsamaj lokoj, ekz. chr_10_bin_52_batch_2_aa.csv. Kiel rezulto, multaj malgrandaj dosieroj estas kreitaj sur la disko (por tio mi uzis terabajtajn EBS-volumojn).

Transportilo de la dua sekcio parallel trairas la grupojn (bin) kaj kombinas iliajn individuajn dosierojn en komunan CSV c catkaj poste sendas ilin por eksporti.

Dissendado en R?

Kion mi lernis: Vi povas kontakti stdin и stdout de R-skripto, kaj tial uzu ĝin en la dukto.

Vi eble rimarkis ĉi tiun linion en via Bash-skripto: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Ĝi tradukas ĉiujn kunligitajn grupdosieroj (bin) en la R-skripton sube. {} estas speciala tekniko parallel, kiu enmetas ajnajn datumojn kiujn ĝi sendas al la specifita rivereto rekte en la komandon mem. Opcio {#} provizas unikan fadenan ID, kaj {%} reprezentas la laborfendan nombron (ripeta, sed neniam samtempe). Listo de ĉiuj opcioj troveblas en dokumentado.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Kiam variablo file("stdin") transdonita al readr::read_csv, la datumoj tradukitaj en la R-skripton estas ŝarĝitaj en kadron, kiu tiam estas en la formo .rds-dosiero uzante aws.s3 skribita rekte al S3.

RDS estas io kiel juniora versio de Parquet, sen la ekstraĵoj de laŭtparolilo-stokado.

Fininte la Bash-skripton mi ricevis pakaĵon .rds-dosieroj situantaj en S3, kio permesis al mi uzi efikan kunpremadon kaj enkonstruitajn tipojn.

Malgraŭ la uzo de bremso R, ĉio funkciis tre rapide. Ne surprize, la partoj de R, kiuj legas kaj skribas datumojn, estas tre optimumigitaj. Post testado sur unu mezgranda kromosomo, la laboro finiĝis sur C5n.4xl ekzemplo en ĉirkaŭ du horoj.

S3 Limigoj

Kion mi lernis: Danke al inteligenta vojo efektivigo, S3 povas pritrakti multajn dosierojn.

Mi maltrankviliĝis, ĉu S3 povos trakti la multajn dosierojn, kiuj estis translokigitaj al ĝi. Mi povus sentigi la dosiernomojn, sed kiel S3 serĉus ilin?

Analizante 25TB uzante AWK kaj R
Dosierujoj en S3 estas nur por montri, fakte la sistemo ne interesiĝas pri la simbolo /. De la S3 FAQ paĝo.

Ŝajnas, ke S3 reprezentas la vojon al aparta dosiero kiel simpla ŝlosilo en speco de hashtabelo aŭ dokument-bazita datumbazo. Sitelo povas esti opiniita kiel tabelo, kaj dosieroj povas esti konsiderataj rekordoj en tiu tabelo.

Ĉar rapideco kaj efikeco estas gravaj por gajni profiton ĉe Amazon, ne estas surprize, ke ĉi tiu ŝlosilo-kiel-dosiera vojo sistemo estas ege optimumigita. Mi provis trovi ekvilibron: por ke mi ne bezonu fari multajn ricevi petojn, sed ke la petoj estis plenumitaj rapide. Montriĝis, ke plej bone estas fari ĉirkaŭ 20 mil ujn dosierojn. Mi pensas, se ni daŭre optimumigas, ni povas atingi pliigon de rapideco (ekzemple, farante specialan sitelon nur por datumoj, tiel reduktante la grandecon de la serĉtabelo). Sed ne estis tempo aŭ mono por pliaj eksperimentoj.

Kio pri kruckongruo?

Kion mi Lernis: La unua kaŭzo de malŝparo de tempo estas optimumigi vian konservan metodon antaŭtempe.

Je ĉi tiu punkto, estas tre grave demandi vin: "Kial uzi proprietan dosierformaton?" La kialo kuŝas en ŝarĝrapideco (gzipitaj CSV-dosieroj daŭris 7 fojojn pli longe por ŝarĝi) kaj kongruo kun niaj laborfluoj. Mi povas rekonsideri ĉu R povas facile ŝargi Parquet (aŭ Sago) dosierojn sen la Spark-ŝarĝo. Ĉiuj en nia laboratorio uzas R, kaj se mi bezonas konverti la datumojn al alia formato, mi ankoraŭ havas la originalajn tekstajn datumojn, do mi povas simple ruli la dukton denove.

Divido de laboro

Kion mi lernis: Ne provu optimumigi laborpostenojn permane, lasu la komputilon fari ĝin.

Mi elpurigis la laborfluon sur unu kromosomo, nun mi devas prilabori ĉiujn aliajn datumojn.
Mi volis levi plurajn EC2-instancojn por konvertiĝo, sed samtempe mi timis ricevi tre malekvilibran ŝarĝon tra malsamaj pretigaj laboroj (same kiel Spark suferis de malekvilibraj sekcioj). Krome, mi ne interesis levi unu kazon per kromosomo, ĉar por AWS-kontoj ekzistas defaŭlta limo de 10 okazoj.

Tiam mi decidis skribi skripton en R por optimumigi pretigajn laborojn.

Unue, mi petis al S3 kalkuli kiom da stoka spaco okupis ĉiu kromosomo.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Poste mi skribis funkcion, kiu prenas la totalan grandecon, miksas la ordon de la kromosomoj, dividas ilin en grupojn. num_jobs kaj diras al vi kiom malsamaj estas la grandecoj de ĉiuj pretigaj laboroj.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Poste mi trakuris mil miksaĵojn uzante ronronon kaj elektis la plej bonan.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Do mi finis kun aro da taskoj kiuj estis tre similaj en grandeco. Poste restis nur envolvi mian antaŭan Bash-skripton en granda buklo for. Ĉi tiu optimumigo daŭris ĉirkaŭ 10 minutojn por verki. Kaj ĉi tio estas multe malpli ol mi elspezus por permane kreado de taskoj se ili estus malekvilibraj. Tial mi pensas, ke mi pravis kun ĉi tiu prepara optimumigo.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Fine mi aldonas la malŝaltan komandon:

sudo shutdown -h now

... kaj ĉio funkciis! Uzante la AWS CLI, mi levis okazojn uzante la opcion user_data donis al ili Bash-skriptojn de iliaj taskoj por prilaborado. Ili kuris kaj fermiĝis aŭtomate, do mi ne pagis por ekstra pretigpovo.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Ni paku!

Kion mi lernis: La API devus esti simpla pro facileco kaj fleksebleco de uzo.

Fine mi ricevis la datumojn en la ĝusta loko kaj formo. Restis nur simpligi la procezon de uzado de datumoj kiel eble plej multe por plifaciligi ĝin al miaj kolegoj. Mi volis fari simplan API por krei petojn. Se estonte mi decidas ŝanĝi de .rds al Parquet dosieroj, tiam tio estu problemo por mi, ne por miaj kolegoj. Por tio mi decidis fari internan R-pakaĵon.

Konstruu kaj dokumenti tre simplan pakaĵon enhavantan nur kelkajn datumajn alirfunkciojn organizitajn ĉirkaŭ funkcio get_snp. Mi ankaŭ faris retejon por miaj kolegoj pkgdown, do ili povas facile vidi ekzemplojn kaj dokumentadon.

Analizante 25TB uzante AWK kaj R

Inteligenta kaŝmemoro

Kion mi lernis: Se viaj datumoj estas bone preparitaj, kaŝmemoro estos facila!

Ĉar unu el la ĉefaj laborfluoj aplikis la saman analizan modelon al la SNP-pakaĵo, mi decidis uzi binning al mia avantaĝo. Dum transdonado de datumoj per SNP, ĉiuj informoj de la grupo (ujo) estas alkroĉitaj al la resendita objekto. Tio estas, malnovaj demandoj povas (teorie) akceli la prilaboradon de novaj demandoj.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Konstruante la pakaĵon, mi kuris multajn komparnormojn por kompari rapidecon uzante malsamajn metodojn. Mi rekomendas ne neglekti ĉi tion, ĉar foje la rezultoj estas neatenditaj. Ekzemple, dplyr::filter estis multe pli rapida ol kaptado de vicoj uzanta indeks-bazitan filtradon, kaj preni ununuran kolumnon de filtrita datumkadro estis multe pli rapida ol uzado de indeksa sintakso.

Bonvolu noti ke la objekto prev_snp_results enhavas la ŝlosilon snps_in_bin. Ĉi tio estas aro de ĉiuj unikaj SNP-oj en grupo (ujo), ebligante vin rapide kontroli ĉu vi jam havas datumojn de antaŭa demando. Ĝi ankaŭ faciligas cirkuli tra ĉiuj SNP-oj en grupo (ujo) kun ĉi tiu kodo:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Результаты

Nun ni povas (kaj komencis serioze) prizorgi modelojn kaj scenarojn, kiuj antaŭe estis nealireblaj por ni. La plej bona afero estas, ke miaj laboratoriaj kolegoj ne devas pensi pri iuj komplikaĵoj. Ili nur havas funkcion, kiu funkcias.

Kaj kvankam la pakaĵo ŝparas al ili la detalojn, mi provis simpligi la datumformaton, ke ili povus eltrovi ĝin, se mi subite malaperus morgaŭ...

La rapideco pliiĝis rimarkinde. Ni kutime skanas funkcie signifajn genarfragmentojn. Antaŭe, ni ne povis fari tion (ĝi montriĝis tro multekosta), sed nun, danke al la grupa (bin) strukturo kaj kaŝmemoro, peto por unu SNP daŭras averaĝe malpli ol 0,1 sekundoj, kaj la uzado de datumoj estas tiel. malalta ke la kostoj por S3 estas arakidoj.

konkludo

Ĉi tiu artikolo tute ne estas gvidilo. La solvo montriĝis individua, kaj preskaŭ certe ne optimuma. Prefere, ĝi estas vojaĝraporto. Mi volas, ke aliaj komprenu, ke tiaj decidoj ne aperas plene formitaj en la kapo, ili estas rezulto de provo kaj eraro. Ankaŭ, se vi serĉas datuman scienciston, memoru, ke uzi ĉi tiujn ilojn efike postulas sperton, kaj sperto kostas monon. Mi ĝojas, ke mi havis la rimedojn por pagi, sed multaj aliaj, kiuj povas fari la saman laboron pli bone ol mi, neniam havos eblecon pro manko de mono eĉ provi.

Iloj pri grandaj datumoj estas multflankaj. Se vi havas tempon, vi povas preskaŭ certe skribi pli rapidan solvon uzante inteligentajn datumojn pri purigado, stokado kaj eltiro-teknikoj. Finfine temas pri analizo de kosto-profito.

Kion mi lernis:

  • ne estas malmultekosta maniero analizi 25 TB samtempe;
  • zorgu pri la grandeco de viaj Parquet-dosieroj kaj ilia organizo;
  • Sekcioj en Spark devas esti ekvilibraj;
  • Ĝenerale, neniam provu fari 2,5 milionojn da sekcioj;
  • Ordigo estas ankoraŭ malfacila, same kiel agordi Spark;
  • foje specialaj datumoj postulas specialajn solvojn;
  • Spark-agregado estas rapida, sed dispartigo estas ankoraŭ multekosta;
  • ne dormu, kiam oni instruas al vi la bazaĵojn, iu verŝajne jam solvis vian problemon jam en la 1980-aj jaroj;
  • gnu parallel - ĉi tio estas magia afero, ĉiuj uzu ĝin;
  • Spark ŝatas nekunpremitajn datumojn kaj ne ŝatas kombini sekciojn;
  • Spark havas tro multe da superkompetoj dum solvado de simplaj problemoj;
  • La asociaj tabeloj de AWK estas tre efikaj;
  • vi povas kontakti stdin и stdout de R-skripto, kaj tial uzu ĝin en la dukto;
  • Danke al inteligenta vojo efektivigo, S3 povas prilabori multajn dosierojn;
  • La ĉefa kialo por perdi tempon estas antaŭtempe optimumigi vian konservan metodon;
  • ne provu optimumigi taskojn permane, lasu la komputilon fari ĝin;
  • La API devus esti simpla pro facileco kaj fleksebleco de uzo;
  • Se viaj datumoj estas bone preparitaj, kaŝmemoro estos facila!

fonto: www.habr.com

Aldoni komenton