Analitzant 25 TB amb AWK i R

Analitzant 25 TB amb AWK i R
Com llegir aquest article: Demano disculpes perquè el text sigui tan llarg i caòtic. Per estalviar-vos temps, començo cada capítol amb una introducció "El que he après", que resumeix l'essència del capítol en una o dues frases.

"Només mostra'm la solució!" Si només voleu veure d'on he vingut, aneu al capítol "Ser més inventiu", però crec que és més interessant i útil llegir sobre el fracàs.

Recentment, em van encarregar de configurar un procés per processar un gran volum de seqüències d'ADN en brut (tècnicament un xip SNP). La necessitat era obtenir ràpidament dades sobre una ubicació genètica determinada (anomenada SNP) per al modelatge posterior i altres tasques. Utilitzant R i AWK, vaig poder netejar i organitzar les dades d'una manera natural, accelerant molt el processament de consultes. Això no va ser fàcil per a mi i va requerir nombroses iteracions. Aquest article us ajudarà a evitar alguns dels meus errors i us mostrarà amb què vaig acabar.

En primer lloc, algunes explicacions introductòries.

Dades

El nostre centre de processament d'informació genètica de la universitat ens va proporcionar dades en forma de TSV de 25 TB. Els vaig rebre dividits en 5 paquets, comprimits per Gzip, cadascun dels quals contenia uns 240 fitxers de quatre gigabytes. Cada fila contenia dades per a un SNP d'un individu. En total, es van transmetre dades sobre ~ 2,5 milions de SNP i ~ 60 mil persones. A més de la informació SNP, els fitxers contenien nombroses columnes amb números que reflectien diverses característiques, com ara la intensitat de lectura, la freqüència de diferents al·lels, etc. En total hi havia unes 30 columnes amb valors únics.

Objectiu

Com amb qualsevol projecte de gestió de dades, el més important era determinar com s'utilitzarien les dades. En aquest cas seleccionarem principalment models i fluxos de treball per a SNP basats en SNP. És a dir, només necessitarem dades d'un SNP alhora. Vaig haver d'aprendre a recuperar tots els registres associats a un dels 2,5 milions de SNP de la manera més fàcil, ràpida i econòmica possible.

Com no fer això

Per citar un tòpic adequat:

No vaig fallar mil vegades, només vaig descobrir mil maneres d'evitar analitzar un munt de dades en un format fàcil de consultar.

Primer intent

Què he après: No hi ha cap manera barata d'analitzar 25 TB alhora.

Després d'haver fet el curs "Mètodes avançats per al processament de dades grans" a la Universitat de Vanderbilt, estava segur que el truc estava a la bossa. Probablement trigarà una o dues hores a configurar el servidor Hive per executar totes les dades i informar del resultat. Com que les nostres dades s'emmagatzemen a AWS S3, vaig utilitzar el servei Athena, que us permet aplicar consultes Hive SQL a dades S3. No cal que configureu/augmenteu un clúster Hive, i també pagueu només per les dades que cerqueu.

Després de mostrar a Athena les meves dades i el seu format, vaig fer algunes proves amb consultes com aquesta:

select * from intensityData limit 10;

I ràpidament va rebre resultats ben estructurats. A punt.

Fins que vam intentar utilitzar les dades en el nostre treball...

Em van demanar que extregués tota la informació SNP per provar el model. Vaig executar la consulta:


select * from intensityData 
where snp = 'rs123456';

...i va començar a esperar. Després de vuit minuts i més de 4 TB de dades sol·licitades, vaig rebre el resultat. Athena cobra pel volum de dades trobades, 5 dòlars per terabyte. Així que aquesta sol·licitud única va costar 20 dòlars i vuit minuts d'espera. Per executar el model amb totes les dades, vam haver d'esperar 38 anys i pagar 50 milions de dòlars, evidentment, això no ens va ser adequat.

Calia utilitzar parquet...

Què he après: Aneu amb compte amb la mida dels vostres fitxers Parquet i la seva organització.

Primer vaig intentar solucionar la situació convertint tots els TSV a Fitxers de parquet. Són convenients per treballar amb grans conjunts de dades perquè la informació que contenen s'emmagatzema en forma de columna: cada columna es troba en el seu propi segment de memòria/disc, a diferència dels fitxers de text, en què les files contenen elements de cada columna. I si necessiteu trobar alguna cosa, llegiu la columna necessària. A més, cada fitxer emmagatzema un rang de valors en una columna, de manera que si el valor que busqueu no es troba dins de l'interval de la columna, Spark no perdrà temps escanejant tot el fitxer.

Vaig executar una tasca senzilla AWS Cola per convertir els nostres TSV a Parquet i va deixar caure els nous fitxers a Athena. Va trigar unes 5 hores. Però quan vaig executar la sol·licitud, vaig trigar aproximadament la mateixa quantitat de temps i una mica menys de diners per completar-la. El fet és que Spark, intentant optimitzar la tasca, simplement va desempaquetar un tros de TSV i el va posar al seu propi tros de Parquet. I com que cada tros era prou gran per contenir els registres sencers de moltes persones, cada fitxer contenia tots els SNP, de manera que Spark va haver d'obrir tots els fitxers per extreure la informació que necessitava.

Curiosament, el tipus de compressió predeterminat (i recomanat) de Parquet, ràpid, no es pot dividir. Per tant, cada executor es va quedar atrapat en la tasca de desempaquetar i descarregar el conjunt de dades complet de 3,5 GB.

Analitzant 25 TB amb AWK i R

Entenem el problema

Què he après: L'ordenació és difícil, sobretot si les dades estan distribuïdes.

Em va semblar que ara entenia l'essència del problema. Només necessitava ordenar les dades per columna SNP, no per persones. A continuació, s'emmagatzemaran diversos SNP en un tros de dades separat i, a continuació, la funció "intel·ligent" de Parquet "s'obre només si el valor està dins del rang" es mostrarà en tota la seva glòria. Malauradament, ordenar milers de milions de files repartides per un clúster va resultar ser una tasca difícil.

Definitivament, AWS no vol emetre cap reemborsament a causa del motiu "Sóc un estudiant distret". Després d'executar la classificació a Amazon Glue, va funcionar durant 2 dies i es va estavellar.

Què passa amb la partició?

Què he après: les particions a Spark s'han d'equilibrar.

Aleshores se'm va ocórrer la idea de dividir les dades en els cromosomes. N'hi ha 23 (i diversos més si es té en compte l'ADN mitocondrial i les regions no cartografiades).
Això us permetrà dividir les dades en fragments més petits. Si només afegiu una línia a la funció d'exportació de Spark a l'script Glue partition_by = "chr", llavors les dades s'han de dividir en dipòsits.

Analitzant 25 TB amb AWK i R
El genoma està format per nombrosos fragments anomenats cromosomes.

Malauradament, no va funcionar. Els cromosomes tenen diferents mides, la qual cosa significa diferents quantitats d'informació. Això vol dir que les tasques que Spark enviava als treballadors no es van equilibrar i es van completar lentament perquè alguns nodes van acabar aviat i estaven inactius. No obstant això, les tasques es van completar. Però en demanar un SNP, el desequilibri va tornar a causar problemes. El cost de processar els SNP en cromosomes més grans (és a dir, on volem obtenir dades) només ha disminuït aproximadament un factor de 10. Molt, però no suficient.

I si el dividim en parts encara més petites?

Què he après: Mai intenteu fer 2,5 milions de particions en absolut.

Vaig decidir fer-ho tot i dividir cada SNP. Això assegurava que les particions fossin de la mateixa mida. VA SER UNA MALA IDEA. Vaig utilitzar Glue i vaig afegir una línia innocent partition_by = 'snp'. La tasca va començar i es va començar a executar. Un dia després vaig comprovar i vaig veure que encara no hi havia res escrit a S3, així que vaig acabar amb la tasca. Sembla que Glue estava escrivint fitxers intermedis en una ubicació oculta a S3, molts fitxers, potser un parell de milions. Com a resultat, el meu error va costar més de mil dòlars i no va agradar al meu mentor.

Partició + ordenació

Què he après: L'ordenació encara és difícil, igual que ajustar Spark.

El meu darrer intent de partició va implicar particionar els cromosomes i després ordenar cada partició. En teoria, això acceleraria cada consulta perquè les dades SNP desitjades havien d'estar dins d'uns quants trossos de Parquet dins d'un rang determinat. Malauradament, ordenar fins i tot les dades particionades va resultar ser una tasca difícil. Com a resultat, vaig canviar a EMR per a un clúster personalitzat i vaig utilitzar vuit instàncies potents (C5.4xl) i Sparklyr per crear un flux de treball més flexible...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

... tanmateix, la tasca encara no s'havia completat. Ho vaig configurar de diferents maneres: va augmentar l'assignació de memòria per a cada executor de consultes, vaig utilitzar nodes amb una gran quantitat de memòria, vaig utilitzar variables de broadcast (variables de difusió), però cada vegada aquestes van resultar ser mitges mesures i, a poc a poc, els executors van començar. fracassar fins que tot es va aturar.

Estic cada cop més creatiu

Què he après: De vegades, les dades especials requereixen solucions especials.

Cada SNP té un valor de posició. Aquest és un nombre corresponent al nombre de bases al llarg del seu cromosoma. Aquesta és una manera agradable i natural d'organitzar les nostres dades. Al principi volia dividir per regions de cada cromosoma. Per exemple, les posicions 1 - 2000, 2001 - 4000, etc. Però el problema és que els SNP no es distribueixen uniformement entre els cromosomes, de manera que les mides del grup variaran molt.

Analitzant 25 TB amb AWK i R

Com a resultat, vaig arribar a un desglossament de les posicions en categories (rang). Utilitzant les dades ja descarregades, vaig executar una sol·licitud per obtenir una llista de SNP únics, les seves posicions i cromosomes. A continuació, vaig ordenar les dades dins de cada cromosoma i vaig recollir els SNP en grups (bin) d'una mida determinada. Posem per cas 1000 SNP cadascun. Això em va donar la relació SNP-a-grup-per-cromosoma.

Al final, vaig fer grups (bin) de 75 SNP, el motiu s'explicarà a continuació.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Primer prova amb Spark

Què he après: L'agregació d'espurnes és ràpida, però la partició encara és cara.

Volia llegir aquest petit marc de dades (2,5 milions de files) a Spark, combinar-lo amb les dades en brut i, a continuació, dividir-lo per la columna recentment afegida. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

jo solia sdf_broadcast(), de manera que Spark sap que hauria d'enviar el marc de dades a tots els nodes. Això és útil si les dades són de mida petita i es requereixen per a totes les tasques. En cas contrari, Spark intenta ser intel·ligent i distribueix dades segons sigui necessari, cosa que pot provocar alentiments.

I de nou, la meva idea no va funcionar: les tasques van funcionar durant un temps, van completar el sindicat i després, com els marmessors llançats per partició, van començar a fracassar.

Afegint AWK

Què he après: No dormiu quan us ensenyen els conceptes bàsics. Segurament algú ja va resoldre el teu problema als anys vuitanta.

Fins a aquest punt, el motiu de tots els meus errors amb Spark va ser el revolt de dades del clúster. Potser la situació es pot millorar amb un tractament previ. Vaig decidir provar de dividir les dades de text en brut en columnes de cromosomes, així que esperava proporcionar a Spark dades "preparticionades".

Vaig cercar a StackOverflow com dividir per valors de columna i vaig trobar una resposta tan gran. Amb AWK podeu dividir un fitxer de text per valors de columna escrivint-lo en un script en lloc d'enviar els resultats a stdout.

Vaig escriure un script Bash per provar-lo. Heu descarregat un dels TSV empaquetats i, a continuació, l'heu desempaquetat gzip i enviat a awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Ha funcionat!

Omplint els nuclis

Què he après: gnu parallel - és una cosa màgica, tothom l'hauria de fer servir.

La separació va ser bastant lenta i quan vaig començar htopper comprovar l'ús d'una potent (i cara) instància EC2, va resultar que només feia servir un nucli i uns 200 MB de memòria. Per resoldre el problema i no perdre molts diners, vam haver d'esbrinar com paral·lelitzar el treball. Afortunadament, en un llibre absolutament sorprenent Ciència de dades a la línia de comandaments Vaig trobar un capítol de Jeron Janssens sobre paral·lelització. D'això n'he après gnu parallel, un mètode molt flexible per implementar multithreading a Unix.

Analitzant 25 TB amb AWK i R
Quan vaig començar el particionament amb el nou procés, tot estava bé, però encara hi havia un coll d'ampolla: la descàrrega d'objectes S3 al disc no era molt ràpida i no estava totalment paral·lelitzada. Per solucionar-ho, vaig fer això:

  1. Vaig descobrir que és possible implementar l'etapa de descàrrega S3 directament al pipeline, eliminant completament l'emmagatzematge intermedi al disc. Això vol dir que puc evitar escriure dades en brut al disc i utilitzar un emmagatzematge encara més petit i, per tant, més barat a AWS.
  2. equip aws configure set default.s3.max_concurrent_requests 50 va augmentar molt el nombre de fils que utilitza AWS CLI (per defecte n'hi ha 10).
  3. Vaig canviar a una instància EC2 optimitzada per a la velocitat de la xarxa, amb la lletra n al nom. He trobat que la pèrdua de potència de processament quan s'utilitzen n instàncies es compensa amb l'augment de la velocitat de càrrega. Per a la majoria de tasques he utilitzat c5n.4xl.
  4. Canviat gzip en pigz, aquesta és una eina gzip que pot fer coses interessants per paral·lelitzar la tasca inicialment no paral·lelitzada de descomprimir fitxers (això va ajudar el menys).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Aquests passos es combinen entre si per fer que tot funcioni molt ràpidament. Augmentant la velocitat de descàrrega i eliminant les escriptures de disc, ara podria processar un paquet de 5 terabytes en poques hores.

Aquest tuit hauria d'haver esmentat "TSV". Ai!

Utilitzant dades recentment analitzades

Què he après: A Spark li agraden les dades sense comprimir i no li agrada combinar particions.

Ara les dades estaven a S3 en un format desempaquetat (llegiu: compartit) i semiordenat, i podria tornar a Spark de nou. M'esperava una sorpresa: vaig tornar a no aconseguir el que volia! Va ser molt difícil dir a Spark exactament com es repartien les dades. I fins i tot quan vaig fer això, va resultar que hi havia massa particions (95 mil) i quan vaig utilitzar coalesce va reduir el seu nombre a límits raonables, això va destruir la meva partició. Estic segur que es pot solucionar, però després d'un parell de dies de recerca no he pogut trobar una solució. Finalment vaig acabar totes les tasques a Spark, tot i que va trigar una estona i els meus fitxers de parquet dividits no eren molt petits (~200 KB). Tanmateix, les dades eren on es necessitaven.

Analitzant 25 TB amb AWK i R
Massa petit i desigual, meravellós!

Prova de consultes locals de Spark

Què he après: Spark té massa sobrecàrrega a l'hora de resoldre problemes senzills.

En baixar les dades en un format intel·ligent, vaig poder provar la velocitat. Configureu un script R per executar un servidor Spark local i, a continuació, carregueu un marc de dades Spark des de l'emmagatzematge del grup de Parquet especificat (bin). Vaig intentar carregar totes les dades, però no vaig aconseguir que Sparklyr reconegués la partició.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

L'execució va trigar 29,415 segons. Molt millor, però no massa bo per fer proves massives de res. A més, no vaig poder accelerar les coses amb la memòria cau perquè quan intentava emmagatzemar un marc de dades a la memòria, Spark sempre fallava, fins i tot quan vaig assignar més de 50 GB de memòria a un conjunt de dades que pesava menys de 15.

Torna a AWK

Què he après: Les matrius associatives en AWK són molt eficients.

Em vaig adonar que podia aconseguir velocitats més altes. Ho vaig recordar d'una manera meravellosa Tutorial AWK de Bruce Barnett Vaig llegir sobre una característica fantàstica anomenada "matrius associatives" Essencialment, es tracta de parells clau-valor, que per algun motiu es van anomenar de manera diferent a AWK i, per tant, d'alguna manera no hi vaig pensar gaire. Roman Cheplyaka va recordar que el terme "matrius associatives" és molt més antic que el terme "parell clau-valor". Encara que tu cerqueu el valor-clau a Google Ngram, no hi veureu aquest terme, però hi trobareu matrius associatives! A més, el "parell clau-valor" s'associa amb més freqüència a bases de dades, de manera que té molt més sentit comparar-lo amb un mapa hash. Em vaig adonar que podia utilitzar aquestes matrius associatives per associar els meus SNP amb una taula de paperera i dades en brut sense utilitzar Spark.

Per fer-ho, a l'script AWK vaig utilitzar el bloc BEGIN. Aquest és un fragment de codi que s'executa abans que la primera línia de dades es passi al cos principal de l'script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Equip while(getline...) carregat totes les files del grup CSV (bin), establiu la primera columna (nom SNP) com a clau per a la matriu associativa bin i el segon valor (grup) com a valor. Després al bloc { }, que s'executa a totes les línies del fitxer principal, cada línia s'envia al fitxer de sortida, que rep un nom únic en funció del seu grup (bin): ..._bin_"bin[$1]"_....

Variables batch_num и chunk_id va fer coincidir les dades proporcionades pel pipeline, evitant una condició de carrera i cada fil d'execució en funcionament parallel, va escriure al seu propi fitxer únic.

Com que vaig dispersar totes les dades en brut a les carpetes dels cromosomes sobrants del meu experiment anterior amb AWK, ara podria escriure un altre script Bash per processar un cromosoma alhora i enviar dades particionades més profundes a S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

El guió té dues seccions parallel.

A la primera secció, les dades es llegeixen de tots els fitxers que contenen informació sobre el cromosoma desitjat, després aquestes dades es distribueixen entre fils, que distribueixen els fitxers als grups adequats (bin). Per evitar condicions de carrera quan diversos fils escriuen al mateix fitxer, AWK passa els noms dels fitxers per escriure dades a diferents llocs, p. chr_10_bin_52_batch_2_aa.csv. Com a resultat, es creen molts fitxers petits al disc (per això vaig utilitzar volums EBS de terabytes).

Transportador de la segona secció parallel passa pels grups (bin) i combina els seus fitxers individuals en un CSV comú c cati després els envia per exportar.

Transmissió en R?

Què he après: Pots contactar stdin и stdout des d'un script R i, per tant, utilitzar-lo en el pipeline.

És possible que hàgiu notat aquesta línia al vostre script Bash: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Tradueix tots els fitxers de grup concatenats (bin) a l'script R a continuació. {} és una tècnica especial parallel, que insereix qualsevol dada que enviï al flux especificat directament a la pròpia comanda. Opció {#} proporciona un ID de fil únic i {%} representa el número de l'espai de treball (repetit, però mai simultàniament). Es pot trobar una llista de totes les opcions a documentació.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Quan una variable file("stdin") transmès a readr::read_csv, les dades traduïdes a l'script R es carreguen en un marc, que després es troba en el formulari .rds-fitxer utilitzant aws.s3 escrit directament a S3.

RDS és una mica com una versió júnior de Parquet, sense els adorns de l'emmagatzematge d'altaveus.

Després d'acabar l'script de Bash, vaig rebre un paquet .rds-fitxers situats a S3, que em van permetre utilitzar una compressió eficient i tipus integrats.

Malgrat l'ús del fre R, tot va funcionar molt ràpidament. No en va, les parts de R que llegeixen i escriuen dades estan molt optimitzades. Després de provar en un cromosoma de mida mitjana, el treball es va completar en una instància C5n.4xl en unes dues hores.

S3 Limitacions

Què he après: Gràcies a la implementació de ruta intel·ligent, S3 pot gestionar molts fitxers.

Em preocupava si S3 seria capaç de gestionar els molts fitxers que s'hi van transferir. Podria fer que els noms dels fitxers tinguessin sentit, però com els buscaria S3?

Analitzant 25 TB amb AWK i R
Les carpetes a S3 són només per mostrar, de fet el sistema no està interessat en el símbol /. Des de la pàgina de preguntes freqüents de S3.

Sembla que S3 representa el camí d'accés a un fitxer determinat com una clau simple en una mena de taula hash o base de dades basada en documents. Un cub es pot considerar una taula i els fitxers es poden considerar registres d'aquesta taula.

Atès que la velocitat i l'eficiència són importants per obtenir beneficis a Amazon, no és d'estranyar que aquest sistema de claus com a ruta de fitxers estigui molt optimitzat. Vaig intentar trobar un equilibri: de manera que no hagués de fer moltes sol·licituds d'obtenció, però que les sol·licituds s'executessin ràpidament. Va resultar que el millor és fer uns 20 mil fitxers bin. Crec que si seguim optimitzant, podem aconseguir un augment de la velocitat (per exemple, fent un cub especial només per a dades, reduint així la mida de la taula de cerca). Però no hi havia temps ni diners per a més experiments.

Què passa amb la compatibilitat creuada?

El que he après: la primera causa de la pèrdua de temps és l'optimització prematura del vostre mètode d'emmagatzematge.

En aquest punt, és molt important preguntar-se: "Per què utilitzar un format de fitxer propietari?" El motiu rau en la velocitat de càrrega (els fitxers CSV amb gzip van trigar 7 vegades més a carregar-se) i la compatibilitat amb els nostres fluxos de treball. Puc reconsiderar si R pot carregar fàcilment fitxers Parquet (o Arrow) sense la càrrega de Spark. Tothom al nostre laboratori utilitza R i, si necessito convertir les dades a un altre format, encara tinc les dades de text originals, així que puc tornar a executar el pipeline.

Divisió del treball

Què he après: No intenteu optimitzar els treballs manualment, deixeu que ho faci l'ordinador.

He depurat el flux de treball en un cromosoma, ara necessito processar totes les altres dades.
Volia generar diverses instàncies EC2 per a la conversió, però al mateix temps tenia por d'obtenir una càrrega molt desequilibrada en diferents treballs de processament (igual que Spark patia particions desequilibrades). A més, no m'interessava augmentar una instància per cromosoma, perquè per als comptes d'AWS hi ha un límit predeterminat de 10 instàncies.

Aleshores vaig decidir escriure un script en R per optimitzar els treballs de processament.

Primer, vaig demanar a S3 que calculés quant espai d'emmagatzematge ocupava cada cromosoma.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Llavors vaig escriure una funció que pren la mida total, barreja l'ordre dels cromosomes, els divideix en grups. num_jobs i us indica com de diferents són les mides de tots els treballs de processament.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Llavors vaig recórrer mil barreges amb ronronament i vaig triar el millor.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Així que vaig acabar amb un conjunt de tasques de mida molt semblant. Aleshores tot el que quedava era embolicar el meu script de Bash anterior en un gran bucle for. Aquesta optimització va trigar uns 10 minuts a escriure. I això és molt menys del que gastaria en crear tasques manualment si estiguessin desequilibrades. Per tant, crec que tenia raó amb aquesta optimització preliminar.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Al final afegeixo l'ordre d'apagat:

sudo shutdown -h now

... i tot va sortir! Utilitzant l'AWS CLI, vaig plantejar instàncies amb l'opció user_data els va donar scripts Bash de les seves tasques per processar. S'executaven i s'apagaven automàticament, així que no estava pagant per poder de processament addicional.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Fem les maletes!

Què he après: L'API ha de ser senzilla per facilitar-ne l'ús i per a la flexibilitat d'ús.

Finalment, vaig obtenir les dades al lloc i la forma adequats. Només quedava simplificar el procés d'ús de les dades al màxim per facilitar-ho als meus companys. Volia fer una API senzilla per crear sol·licituds. Si en el futur decideixo canviar de .rds als fitxers Parquet, llavors això hauria de ser un problema per a mi, no per als meus companys. Per això vaig decidir fer un paquet R intern.

Creeu i documenteu un paquet molt senzill que conté només unes quantes funcions d'accés a dades organitzades al voltant d'una funció get_snp. També vaig fer una pàgina web per als meus companys pkgdown, perquè puguin veure fàcilment exemples i documentació.

Analitzant 25 TB amb AWK i R

Emmagatzematge en memòria cau intel·ligent

Què he après: Si les vostres dades estan ben preparades, la memòria cau serà fàcil!

Com que un dels principals fluxos de treball va aplicar el mateix model d'anàlisi al paquet SNP, vaig decidir utilitzar binning al meu avantatge. Quan es transmeten dades mitjançant SNP, tota la informació del grup (bin) s'adjunta a l'objecte retornat. És a dir, les consultes antigues poden (en teoria) accelerar el processament de consultes noves.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

En crear el paquet, vaig executar molts punts de referència per comparar la velocitat quan utilitzava diferents mètodes. Recomano no descuidar-ho, perquè de vegades els resultats són inesperats. Per exemple, dplyr::filter era molt més ràpid que capturar files mitjançant un filtratge basat en indexació, i recuperar una sola columna d'un marc de dades filtrat era molt més ràpid que utilitzar la sintaxi d'indexació.

Tingueu en compte que l'objecte prev_snp_results conté la clau snps_in_bin. Aquesta és una matriu de tots els SNP únics en un grup (bin), que us permet comprovar ràpidament si ja teniu dades d'una consulta anterior. També fa que sigui fàcil recórrer tots els SNP d'un grup (bin) amb aquest codi:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Troballes

Ara podem (i hem començat a executar seriosament) models i escenaris que abans ens eren inaccessibles. El millor és que els meus companys de laboratori no han de pensar en cap complicació. Només tenen una funció que funciona.

I tot i que el paquet els estalvia els detalls, vaig intentar que el format de dades fos prou senzill perquè ho poguessin esbrinar si desaparegués de sobte demà...

La velocitat ha augmentat notablement. Normalment escanegem fragments de genoma funcionalment significatius. Anteriorment, no ho podíem fer (va resultar massa car), però ara, gràcies a l'estructura del grup (bin) i a la memòria cau, una sol·licitud d'un SNP triga de mitjana menys de 0,1 segons i l'ús de dades és tan baix que els costos de S3 siguin cacauets.

Conclusió

Aquest article no és en absolut una guia. La solució va resultar ser individual i, gairebé segur, no òptima. Més aviat, és un diari de viatge. Vull que els altres entenguin que aquestes decisions no apareixen totalment formades al cap, són el resultat d'assaig i error. A més, si busqueu un científic de dades, tingueu en compte que utilitzar aquestes eines de manera eficaç requereix experiència i l'experiència costa diners. Estic content d'haver tingut els mitjans per pagar, però molts altres que poden fer la mateixa feina millor que jo mai tindran l'oportunitat per manca de diners ni tan sols de provar-ho.

Les eines de big data són versàtils. Si teniu temps, gairebé segur que podeu escriure una solució més ràpida mitjançant tècniques intel·ligents de neteja, emmagatzematge i extracció de dades. En definitiva, es tracta d'una anàlisi cost-benefici.

El que vaig aprendre:

  • no hi ha cap manera barata d'analitzar 25 TB alhora;
  • aneu amb compte amb la mida dels vostres fitxers Parquet i la seva organització;
  • Les particions a Spark han d'estar equilibrades;
  • En general, no intenteu mai fer 2,5 milions de particions;
  • L'ordenació encara és difícil, igual que configurar Spark;
  • de vegades les dades especials requereixen solucions especials;
  • L'agregació d'espurnes és ràpida, però la partició encara és cara;
  • no dormiu quan us ensenyin les bases, probablement algú ja va resoldre el vostre problema als anys vuitanta;
  • gnu parallel - això és una cosa màgica, tothom l'hauria de fer servir;
  • A Spark li agraden les dades sense comprimir i no li agrada combinar particions;
  • Spark té massa sobrecàrrega a l'hora de resoldre problemes senzills;
  • Les matrius associatives d'AWK són molt eficients;
  • pots contactar stdin и stdout des d'un script R i, per tant, utilitzar-lo en el pipeline;
  • Gràcies a la implementació de ruta intel·ligent, S3 pot processar molts fitxers;
  • El motiu principal per perdre el temps és optimitzar prematurament el vostre mètode d'emmagatzematge;
  • no intenteu optimitzar les tasques manualment, deixeu que ho faci l'ordinador;
  • L'API hauria de ser senzilla per la facilitat i la flexibilitat d'ús;
  • Si les vostres dades estan ben preparades, la memòria cau serà fàcil!

Font: www.habr.com

Afegeix comentari