Parsing 25TB mat AWK a R

Parsing 25TB mat AWK a R
Wéi liesen dësen Artikel: Ech entschëllege mech fir datt den Text sou laang a chaotesch ass. Fir Iech Zäit ze spueren, fänken ech all Kapitel mat enger "Wat ech geléiert" Aféierung un, déi d'Essenz vum Kapitel an engem oder zwee Sätz resüméiert.

"Weis mir just d'Léisung!" Wann Dir just wëllt gesinn wou ech hierkënnt, da sprangt op d'Kapitel "Méi Inventiv ginn", awer ech mengen et ass méi interessant an nëtzlech iwwer Versoen ze liesen.

Ech krut viru kuerzem d'Aufgab fir e Prozess opzestellen fir e grousse Volumen vu roude DNA Sequenzen ze veraarbecht (technesch en SNP Chip). De Besoin war séier Daten iwwer eng bestëmmte genetesch Plaz ze kréien (genannt SNP) fir spéider Modeller an aner Aufgaben. Mat R an AWK konnt ech d'Donnéeën op eng natierlech Manéier botzen an organiséieren, wat d'Ufroveraarbechtung staark beschleunegt. Dëst war net einfach fir mech an erfuerdert vill Iteratiounen. Dësen Artikel hëlleft Iech e puer vu menge Feeler ze vermeiden a weisen Iech wat ech opgehalen hunn.

Éischt, e puer Aféierungscoursen Erklärungen.

Donnéeën

Eis Universitéit genetesch Informatiounsveraarbechtungszentrum huet eis Daten a Form vun engem 25 TB TSV zur Verfügung gestallt. Ech krut se opgedeelt a 5 Packagen, kompriméiert vu Gzip, jidderee vun deenen ongeféier 240 véier-Gigabyte Dateien enthält. All Zeil enthält Daten fir een SNP vun engem Individuum. Am Ganzen goufen Daten iwwer ~ 2,5 Millioune SNPs a ~ 60 Tausend Leit iwwerdroen. Zousätzlech zu SNP Informatioun enthalen d'Dateien vill Kolonnen mat Zuelen déi verschidde Charakteristike reflektéieren, sou wéi d'Liesintensitéit, d'Frequenz vu verschiddene Allele, etc. Am Ganzen waren et ongeféier 30 Saile mat eenzegaartege Wäerter.

Goal geschoss huet

Wéi mat all Datemanagementprojet, war dat Wichtegst fir ze bestëmmen wéi d'Date benotzt ginn. An dësem Fall mir wäerte meeschtens Modeller a Workflows fir SNP auswielen baséiert op SNP. Dat ass, mir wäerten nëmmen Daten op engem SNP gläichzäiteg brauchen. Ech hu misse léieren wéi ech all d'Records, déi mat engem vun den 2,5 Millioune SNPs verbonne sinn, sou einfach, séier a bëlleg wéi méiglech recuperéieren.

Wéi dëst net ze maachen

Fir e passende Cliché ze zitéieren:

Ech hunn net dausend Mol gescheitert, ech hunn just dausend Weeër entdeckt fir eng Rëtsch Daten an engem Ufro-frëndlecht Format ze vermeiden.

Éischt Versuch

Wat hunn ech geléiert: Et gëtt kee bëllege Wee fir 25 TB gläichzäiteg ze analyséieren.

Nodeems ech de Cours "Advanced Methods for Big Data Processing" op der Vanderbilt University gemaach hunn, war ech sécher datt den Trick an der Täsch war. Et wäert wahrscheinlech eng Stonn oder zwou daueren fir den Hive-Server opzestellen fir all d'Donnéeën duerchzeféieren an d'Resultat ze berichten. Well eis Donnéeën an AWS S3 gespäichert sinn, hunn ech de Service benotzt Athena, wat Iech erlaabt Hive SQL Ufroen op S3 Daten z'applizéieren. Dir braucht net engem Hive Stärekoup Ariichten / erhéijen, an Dir bezuelt och nëmmen fir d'Donnéeën Dir sicht.

Nodeems ech Athena meng Donnéeën a säi Format gewisen hunn, hunn ech e puer Tester mat Ufroen wéi dëst gemaach:

select * from intensityData limit 10;

A séier gutt strukturéiert Resultater kritt. Fäerdeg.

Bis mir probéiert hunn d'Donnéeën an eiser Aarbecht ze benotzen ...

Ech gouf gefrot all d'SNP Informatioun erauszezéien fir de Modell ze testen. Ech hunn d'Ufro gemaach:


select * from intensityData 
where snp = 'rs123456';

...an ugefaang ze waarden. No aacht Minutten a méi wéi 4 TB vun ugefrote Donnéeën hunn ech d'Resultat kritt. Athena Käschten duerch de Volume vun Daten fonnt, $ 5 pro Terabyte. Also dës eenzeg Ufro kascht $ 20 an aacht Minutte Waarde. Fir de Modell op all Daten auszeféieren, hu mer misse 38 Joer waarden a 50 Millioune bezuelen, selbstverständlech war dat net fir eis passend.

Et war néideg Parquet ze benotzen ...

Wat hunn ech geléiert: Sidd virsiichteg mat der Gréisst vun Äre Parquet Dateien an hir Organisatioun.

Ech hunn als éischt probéiert d'Situatioun ze fixéieren andeems ech all TSVs konvertéieren Parquet Fichieren. Si si praktesch fir mat groussen Datesätz ze schaffen, well d'Informatioun an hinnen a spalteform gespäichert ass: all Kolonn läit an engem eegene Gedächtnis-/Disksegment, am Géigesaz zu Textdateien, an deenen Reihen Elementer vun all Kolonn enthalen. A wann Dir eppes muss fannen, da liest just déi erfuerderlech Kolonn. Zousätzlech späichert all Datei eng Rei vu Wäerter an enger Kolonn, also wann de Wäert deen Dir sicht net am Beräich vun der Kolonn ass, wäert Spark keng Zäit verschwenden fir déi ganz Datei ze scannen.

Ech hunn eng einfach Aufgab gemaach AWS Klebstoff fir eis TSVs op Parquet ze konvertéieren an déi nei Dateien an Athena ze falen. Et huet ongeféier 5 Stonnen gedauert. Awer wann ech d'Ufro gemaach hunn, huet et ongeféier déiselwecht Zäit an e bësse manner Suen gedauert fir ze kompletéieren. D'Tatsaach ass datt Spark, probéiert d'Aufgab ze optimiséieren, einfach een TSV Stéck ausgepackt huet an en an säin eegene Parquet Stéck gesat huet. A well all Stéck grouss genuch war fir déi ganz Opzeechnunge vu ville Leit ze halen, enthält all Datei all SNPs, sou datt de Spark all d'Dateien huet missen opmaachen fir déi néideg Informatioun ze extrahieren.

Interessanterweis ass Parquet's Standard (a recommandéiert) Kompressiounstyp, snappy, net opgedeelt. Dofir war all Exekutor fest an der Aufgab fir déi voll 3,5 GB Dataset auszepaken an erofzelueden.

Parsing 25TB mat AWK a R

Loosst eis de Problem verstoen

Wat hunn ech geléiert: Sortéieren ass schwéier, besonnesch wann d'Donnéeë verdeelt sinn.

Et huet mir geschéngt datt ech elo d'Essenz vum Problem verstanen hunn. Ech brauch nëmmen d'Donnéeën no SNP Kolonn ze sortéieren, net vu Leit. Da ginn e puer SNPs an engem getrennten Datenschnëtt gespäichert, an dann d'Parquet "Smart" Funktioun "oppen nëmmen wann de Wäert am Beräich ass" wäert sech an all senger Herrlechkeet weisen. Leider huet d'Sortéieren duerch Milliarden Reihen, déi iwwer e Stärekoup verstreet sinn, eng schwiereg Aufgab bewisen.

AWS wëll definitiv net e Remboursement ausginn aus dem Grond "I'm a distracted student". Nodeems ech d'Sortéierung op Amazon Glue gelaf hunn, ass et fir 2 Deeg gelaf an ass erofgefall.

Wat iwwer d'Partitionéierung?

Wat hunn ech geléiert: Partitionen am Spark mussen ausgeglach sinn.

Dunn sinn ech op d'Iddi komm fir Daten a Chromosomen ze partitionéieren. Et ginn 23 vun hinnen (a e puer méi wann Dir d'Mitochondrial DNA an onmapéiert Regiounen berücksichtegt).
Dëst erlaabt Iech d'Donnéeën a méi kleng Stécker opzedeelen. Wann Dir nëmmen eng Zeil un d'Spark Export Funktioun am Glue Skript bäidréit partition_by = "chr", da sollen d'Donnéeën an Eemer opgedeelt ginn.

Parsing 25TB mat AWK a R
De Genom besteet aus ville Fragmenter genannt Chromosomen.

Leider huet et net geklappt. Chromosomen hu verschidde Gréissten, dat heescht verschidde Quantitéiten un Informatioun. Dëst bedeit datt d'Aufgaben, déi Spark un d'Aarbechter geschéckt huet, net equilibréiert a lues ofgeschloss sinn, well e puer Noden fréi fäerdeg waren an idle waren. Allerdéngs sinn d'Aufgaben ofgeschloss. Awer wann een no engem SNP gefrot gouf, huet den Desequiliber nees Problemer gesuergt. D'Käschte vun der Veraarbechtung vun SNPs op gréissere Chromosomen (dat ass, wou mir Daten wëllen kréien) sinn nëmmen ëm ongeféier e Faktor vun 10 erofgaang. Vill, awer net genuch.

Wat wa mir et an nach méi kleng Deeler opdeelen?

Wat hunn ech geléiert: Probéiert ni 2,5 Millioune Partitionen iwwerhaapt ze maachen.

Ech hu beschloss alles eraus ze goen an all SNP ze partitionéieren. Dëst huet gesuergt datt d'Partitionen vun der selwechter Gréisst waren. ET WAR ENG BLECH IDEE. Ech hunn Glue benotzt an eng onschëlleg Linn bäigefüügt partition_by = 'snp'. D'Aufgab huet ugefaang an huet ugefaang auszeféieren. En Dag méi spéit hunn ech gepréift a gesinn datt et nach ëmmer näischt op S3 geschriwwen ass, also hunn ech d'Aufgab ëmbruecht. Et gesäit aus wéi Glue Zwëschendateien op eng verstoppte Plaz am S3 geschriwwen huet, vill Dateien, vläicht e puer Millioune. Als Resultat huet mäi Feeler méi wéi dausend Dollar kascht an huet mäi Mentor net gefall.

Partitionéieren + Zortéieren

Wat hunn ech geléiert: Sortéieren ass nach ëmmer schwéier, sou wéi Spark ofstëmmen.

Mäi leschte Versuch fir ze partitionéieren involvéiert mech d'Chromosomen ze partitionéieren an dann all Partition ze sortéieren. An Theorie géif dëst all Ufro beschleunegen, well déi gewënscht SNP-Daten an e puer Parquet-Stécker an engem bestëmmte Beräich musse sinn. Leider huet d'Sortéierung vu souguer opgedeelt Donnéeën eng schwiereg Aufgab erausgestallt. Als Resultat hunn ech op EMR fir e personaliséierte Cluster gewiesselt an aacht mächteg Instanzen (C5.4xl) a Sparkliner benotzt fir e méi flexibelen Workflow ze kreéieren ...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...d'Aufgab war awer nach net ofgeschloss. Ech hunn et op verschidde Manéieren konfiguréiert: erhéicht d'Erënnerungsallokatioun fir all Ufro Exekutor, benotzt Noden mat enger grousser Quantitéit un Erënnerung, benotzt Broadcast Variablen (Sendungsvariablen), awer all Kéier hunn dës sech als hallef Moossnamen erausgestallt, a lues a lues hunn d'Exekutoren ugefaang ze versoen bis alles opgehalen huet.

Ech ginn méi kreativ

Wat hunn ech geléiert: Heiansdo speziell Daten verlaangen speziell Léisungen.

All SNP huet eng Positioun Wäert. Dëst ass eng Zuel, déi der Unzuel vun de Basen laanscht säi Chromosom entsprécht. Dëst ass e flotten an natierleche Wee fir eis Donnéeën ze organiséieren. Am Ufank wollt ech no Regioune vun all Chromosom opdeelen. Zum Beispill, Positiounen 1 - 2000, 2001 - 4000, etc. Awer de Problem ass datt SNPs net gläichméisseg iwwer d'Chromosomen verdeelt sinn, sou datt d'Gruppgréissten dofir staark variéieren.

Parsing 25TB mat AWK a R

Als Resultat sinn ech op eng Ënnerdeelung vu Positiounen a Kategorien (Rang) komm. Mat der scho erofgelueden Donnéeën hunn ech eng Ufro gemaach fir eng Lëscht vun eenzegaartege SNPs ze kréien, hir Positiounen a Chromosomen. Duerno hunn ech d'Donnéeën an all Chromosom zortéiert a SNPs a Gruppen (Bin) vun enger bestëmmter Gréisst gesammelt. Loosst eis soen 1000 SNPs all. Dëst huet mir d'SNP-zu-Grupp-pro-Chromosom Relatioun.

Um Enn hunn ech Gruppen (Bin) vun 75 SNPs gemaach, de Grond gëtt hei ënnen erkläert.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Éischt probéieren mat Spark

Wat hunn ech geléiert: Spark Aggregatioun ass séier, awer d'Partitionéierung ass nach ëmmer deier.

Ech wollt dëse klenge (2,5 Millioune Reihen) Dateframe a Spark liesen, et mat de rauen Donnéeën kombinéieren, an dann duerch déi nei dobäigesate Kolonn opdeelen bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Ech benotzt sdf_broadcast(), sou datt Spark weess datt et den Dateframe op all Noden schéckt. Dëst ass nëtzlech wann d'Donnéeën kleng a Gréisst sinn an erfuerderlech fir all Aufgaben. Soss probéiert Spark intelligent ze sinn a verdeelt Daten wéi néideg, wat Verlängerungen verursaache kann.

An erëm, meng Iddi huet net geschafft: d'Aufgaben hunn eng Zäit geschafft, d'Gewerkschaft ofgeschloss, an dann, wéi d'Exekutoren, déi duerch d'Partitionéierung lancéiert goufen, hunn se ugefaang ze versoen.

AWK derbäi

Wat hunn ech geléiert: Schlof net wann Dir d'Grondlage geléiert gëtt. Sécher huet iergendeen Äre Problem schonn an den 1980er Joren geléist.

Bis zu dësem Zäitpunkt war de Grond fir all meng Feeler mam Spark de Jumble vun Daten am Cluster. Vläicht kann d'Situatioun mat der Pre-Behandlung verbessert ginn. Ech hu beschloss ze probéieren déi rau Textdaten a Kolonnen vu Chromosomen opzedeelen an der Hoffnung de Spark mat "pre-partitionéierten" Donnéeën ze liwweren.

Ech hunn op StackOverflow gesicht wéi een duerch Kolonnwäerter opgedeelt gëtt a fonnt sou eng super Äntwert. Mat AWK kënnt Dir eng Textdatei no Kolonnwäerter opdeelen andeems Dir se an engem Skript schreift anstatt d'Resultater ze schécken stdout.

Ech hunn e Bash Skript geschriwwen fir et auszeprobéieren. Huelt ee vun de verpackte TSVs erof, dann huet se ausgepackt benotzt gzip an geschéckt ze awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Et huet geklappt!

Fëllt d'Kären

Wat hunn ech geléiert: gnu parallel - et ass eng magesch Saach, jidderee soll et benotzen.

D'Trennung war zimlech lues a wéi ech ugefaang hunn htopfir d'Benotzung vun enger mächteger (an deier) EC2 Instanz z'iwwerpréiwen, huet sech erausgestallt datt ech nëmmen ee Kär an ongeféier 200 MB Erënnerung benotzt. Fir de Problem ze léisen an net vill Suen ze verléieren, hu mir missen erausfannen, wéi d'Aarbecht paralleliséiert gëtt. Glécklecherweis an engem absolut erstaunlech Buch Data Science op der Command Line Ech hunn e Kapitel vum Jeron Janssens iwwer Paralleliséierung fonnt. Vun et hunn ech geléiert iwwer gnu parallel, eng ganz flexibel Method fir Multithreading an Unix ëmzesetzen.

Parsing 25TB mat AWK a R
Wéi ech d'Partitionéierung mam neie Prozess ugefaang hunn, war alles gutt, awer et war nach ëmmer e Flaschenhals - S3 Objekter op Disk eroflueden war net ganz séier an net voll paralleliséiert. Fir dëst ze fixéieren, hunn ech dat gemaach:

  1. Ech hunn erausfonnt datt et méiglech ass d'S3 Download-Stage direkt an der Pipeline ëmzesetzen, déi Zwëschenlagerung op der Disk komplett eliminéiert. Dëst bedeit datt ech vermeide kann réi Daten op Disk ze schreiwen an nach méi kleng, an dofir méi bëlleg, Späicheren op AWS benotzen.
  2. Equipe aws configure set default.s3.max_concurrent_requests 50 d'Zuel vun de Threads déi AWS CLI benotzt staark erhéicht (par défaut sinn et 10).
  3. Ech sinn op eng EC2 Instanz optimiséiert fir Netzwierkgeschwindegkeet, mam Buschtaf n am Numm. Ech hu festgestallt datt de Verloscht vun der Veraarbechtungskraaft beim Gebrauch vun n-Instanzen méi wéi kompenséiert gëtt duerch d'Erhéijung vun der Luedegeschwindegkeet. Fir déi meescht Aufgaben hunn ech c5n.4xl benotzt.
  4. Geännert gzip op pigz, Dëst ass e gzip-Tool dat cool Saache maache kann fir déi ursprénglech net-parallelliséiert Aufgab fir Dateien ze dekompriméieren (dëst huet am mannsten gehollef).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Dës Schrëtt gi matenee kombinéiert fir datt alles ganz séier funktionnéiert. Duerch d'Erhéijung vun den Downloadgeschwindegkeeten an d'Eliminatioun vun Disk Schreiwen, konnt ech elo e 5 Terabyte Package an nëmmen e puer Stonnen veraarbechten.

Dësen Tweet sollt "TSV" ernimmen. Och.

Benotzt nei parséiert Donnéeën

Wat hunn ech geléiert: Spark huet onkompriméiert Donnéeën gär a kombinéiere net gär Partitionen.

Elo waren d'Donnéeën am S3 an engem ausgepackt (liesen: gedeelt) an semi-organiséiert Format, an ech konnt erëm op Spark zréckkommen. Eng Iwwerraschung huet op mech gewaart: Ech hunn erëm net fäerdeg bruecht wat ech wollt! Et war ganz schwéier Spark genau ze soen wéi d'Donnéeën opgedeelt goufen. An och wann ech dat gemaach hunn, huet sech erausgestallt datt et ze vill Partitionen waren (95 Tausend), a wann ech benotzt hunn coalesce reduzéiert hir Zuel op raisonnabel Grenzen, dëst huet meng Partitionéierung zerstéiert. Ech si sécher datt dëst fixéiert ka ginn, awer no e puer Deeg no Sichen konnt ech keng Léisung fannen. Ech hunn schlussendlech all d'Aufgaben am Spark fäerdeg gemaach, obwuel et eng Zäit gedauert huet a meng gespléckt Parquet Dateien net ganz kleng waren (~ 200 KB). Wéi och ëmmer, d'Donnéeën waren do wou se gebraucht goufen.

Parsing 25TB mat AWK a R
Ze kleng an ongläich, wonnerbar!

Testen lokal Spark Ufroen

Wat hunn ech geléiert: Spark huet ze vill Overhead wann Dir einfach Problemer léist.

Andeems ech d'Donnéeën an engem clevere Format eroflueden, konnt ech d'Geschwindegkeet testen. Setzt e R Skript fir e lokale Spark-Server ze lafen, an huet dann e Spark-Dateframe vun der spezifizéierter Parquet-Gruppspäicherung (bin) gelueden. Ech hu probéiert all d'Donnéeën ze lueden, awer konnt Sparklyr net kréien fir d'Partitionéierung ze erkennen.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

D'Ausféierung huet 29,415 Sekonnen gedauert. Vill besser, awer net ze gutt fir Massetest vun eppes. Zousätzlech konnt ech d'Saachen net mam Caching beschleunegen, well wann ech probéiert hunn en Dateframe an der Erënnerung ze cache, ass Spark ëmmer erofgefall, och wann ech méi wéi 50 GB Erënnerung un en Dataset zougedeelt hunn dee manner wéi 15 gewien huet.

Zréck op AWK

Wat hunn ech geléiert: Associativ Arrays an AWK si ganz effizient.

Ech hu gemierkt datt ech méi héich Geschwindegkeet erreechen konnt. Ech erënneren, datt an engem wonnerbar AWK Tutorial vum Bruce Barnett Ech liesen iwwer eng cool Feature genannt "assoziativ Arrays" Am Wesentlechen sinn dës Schlësselwäertpaaren, déi aus iergendengem Grond anescht an AWK genannt goufen, an dofir hunn ech iergendwéi net vill iwwer si geduecht. Roman Cheplyaka erënnert datt de Begrëff "assoziativ Arrays" vill méi al ass wéi de Begrëff "Schlësselwäertpaar". Och wann Dir kuckt de Schlësselwäert am Google Ngram op, Dir gesitt dëse Begrëff net do, awer Dir fannt assoziativ Arrays! Zousätzlech ass de "Schlëssel-Wäertpaar" meeschtens mat Datenbanken verbonnen, sou datt et vill méi Sënn mécht et mat engem Hashmap ze vergläichen. Ech hu gemierkt datt ech dës assoziativ Arrays benotze kéint fir meng SNPs mat enger Bin-Tabelle a Matière Daten ze associéieren ouni Spark ze benotzen.

Fir dëst ze maachen, am AWK Skript hunn ech de Block benotzt BEGIN. Dëst ass e Stéck Code dat ausgefouert gëtt ier déi éischt Zeil vun Daten un den Haaptkierper vum Skript weidergeleet gëtt.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Equipe while(getline...) all Zeilen aus der CSV-Grupp (Bin) gelueden, setze déi éischt Kolonn (SNP-Numm) als Schlëssel fir den assoziativen Array bin an den zweete Wäert (Grupp) als Wäert. Dann am Block { }, déi op all Zeilen vun der Haaptdatei ausgefouert gëtt, gëtt all Zeil an d'Ausgabdatei geschéckt, déi en eenzegaartegen Numm kritt ofhängeg vu senger Grupp (Bin): ..._bin_"bin[$1]"_....

Verännerlechen batch_num и chunk_id entsprécht d'Donnéeën, déi vun der Pipeline geliwwert ginn, e Rennzoustand ze vermeiden, an all Ausféierungsfuedem leeft parallel, op seng eege eenzegaarteg Datei geschriwwen.

Zënter datt ech all déi réi Donnéeën an Ordner op Chromosomen verspreet hunn, déi vu mengem viregten Experiment mat AWK iwwreg waren, konnt ech elo en anert Bash Skript schreiwen fir ee Chromosom gläichzäiteg ze veraarbechten an méi déif partitionéiert Daten op S3 ze schécken.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

De Skript huet zwou Sektiounen parallel.

Am éischten Abschnitt ginn d'Date gelies vun alle Dateien, déi Informatioun iwwer de gewënschten Chromosom enthalen, da ginn dës Donnéeën iwwer Threads verdeelt, déi d'Dateien an déi entspriechend Gruppen (Bin) verdeelen. Fir Course Konditiounen ze vermeiden wann MÉI thread an déi selwecht Fichier schreiwen, Passë AWK d'Datei Nimm fir eng schreiwen Daten op verschidde Plazen, z.B. chr_10_bin_52_batch_2_aa.csv. Als Resultat gi vill kleng Dateien op der Disk erstallt (fir dëst hunn ech Terabyte EBS Bänn benotzt).

Conveyor aus der zweeter Sektioun parallel geet duerch d'Gruppen (Bin) a verbënnt hir individuell Dateien a gemeinsame CSV c cata schéckt se dann fir Export.

Sendung an R?

Wat hunn ech geléiert: Dir kënnt Kontakt stdin и stdout aus engem R Skript, an dofir benotzen se an der Pipeline.

Dir hutt vläicht dës Linn an Ärem Bash Skript gemierkt: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Et iwwersetzt all verbonne Gruppdateien (Bin) an de R Skript hei drënner. {} ass eng speziell Technik parallel, déi all Daten setzt, déi se an de spezifizéierte Stroum schéckt, direkt an de Kommando selwer. Optioun {#} stellt eng eenzegaarteg thread ID, an {%} duerstellt der Aarbecht Slot Zuel (widderholl, awer ni gläichzäiteg). Eng Lëscht vun all Optiounen kann fonnt ginn an Dokumentatioun.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Wann eng Variabel file("stdin") iwwerdroen un readr::read_csv, ginn d'Donnéeën, déi an de R Skript iwwersat sinn, an e Frame gelueden, deen dann an der Form ass .rds-Datei benotzt aws.s3 geschriwwen direkt op S3.

RDS ass eppes wéi eng Junior Versioun vum Parquet, ouni d'Frills vum Lautsprecherlagerung.

Nodeems ech de Bash Skript ofgeschloss hunn, krut ech e Bündel .rds-Dateien, déi am S3 sinn, wat et erlaabt huet, effizient Kompressioun an agebaute Typen ze benotzen.

Trotz der Benotzung vun der Brems R, huet alles ganz séier geschafft. Net iwwerraschend sinn d'Deeler vum R déi Daten liesen a schreiwen héich optimiséiert. Nom Test op engem mëttelgrousse Chromosom huet d'Aarbecht op enger C5n.4xl Instanz an ongeféier zwou Stonnen ofgeschloss.

S3 Aschränkungen

Wat hunn ech geléiert: Dank Smart Wee Ëmsetzung, kann S3 vill Fichieren verschaffen.

Ech war besuergt ob S3 fäeg wier déi vill Dateien ze handhaben déi op se transferéiert goufen. Ech kéint de Fichier Nimm maachen Sënn maachen, mee wéi géif S3 kucken fir hinnen?

Parsing 25TB mat AWK a R
Classeure am S3 si just fir ze weisen, tatsächlech ass de System net un d'Symbol interesséiert /. Vun der S3 FAQ Säit.

Et schéngt, datt S3 de Wee zu enger bestëmmter Datei als einfache Schlëssel an enger Zort Hash-Table oder Dokument-baséiert Datebank duerstellt. En Eemer kann als Dësch geduecht ginn, an Dateie kënnen als records an där Tabell ugesi ginn.

Zënter Geschwindegkeet an Effizienz si wichteg fir e Gewënn bei Amazon ze maachen, ass et keng Iwwerraschung datt dëse Key-as-a-Datei-Wee System freaking optimiséiert ass. Ech hu probéiert e Gläichgewiicht ze fannen: sou datt ech net vill Get-Demande musse maachen, mä datt d'Demande séier ausgefouert goufen. Et huet sech erausgestallt datt et am beschten ass ongeféier 20 Tausend Bin Dateien ze maachen. Ech mengen, wa mir weider optimiséieren, kënne mir eng Erhéijung vun der Geschwindegkeet erreechen (zum Beispill, e speziellen Eemer just fir Daten ze maachen, sou datt d'Gréisst vun der Lookup-Tabelle reduzéiert gëtt). Awer et war keng Zäit oder Suen fir weider Experimenter.

Wat iwwer Kräizkompatibilitéit?

Wat ech geléiert hunn: Déi Nummer eent Ursaach vu verschwenden Zäit ass Är Späichermethod ze fréi ze optimiséieren.

Zu dësem Zäitpunkt ass et ganz wichteg Iech selwer ze froen: "Firwat e propriétaire Dateiformat benotzen?" De Grond läit an der Luedegeschwindegkeet (gzipped CSV Dateien huet 7 Mol méi laang gedauert fir ze lueden) a Kompatibilitéit mat eise Workflows. Ech kann iwwerdenken ob R einfach Parquet (oder Arrow) Dateien ouni d'Spark-Laascht luede kann. Jiddereen an eisem Labo benotzt R, a wann ech d'Donnéeën an en anert Format muss konvertéieren, hunn ech nach ëmmer déi ursprénglech Textdaten, also kann ech d'Pipeline einfach erëm lafen.

Divisioun vun der Aarbecht

Wat hunn ech geléiert: Probéiert net Aarbechtsplazen manuell ze optimiséieren, loosst de Computer et maachen.

Ech hunn de Workflow op engem Chromosom debugged, elo muss ech all déi aner Daten veraarbechten.
Ech wollt e puer EC2 Instanzen fir d'Konversioun erhéijen, awer gläichzäiteg hat ech Angscht eng ganz onbalancéiert Belaaschtung iwwer verschidde Veraarbechtungsjobs ze kréien (sou wéi de Spark ënner onbalancéierten Partitionen gelidden huet). Ausserdeem war ech net interesséiert fir eng Instanz pro Chromosom z'erhéijen, well fir AWS Konten gëtt et eng Standardlimit vun 10 Instanzen.

Dunn hunn ech decidéiert e Skript am R ze schreiwen fir d'Veraarbechtungsjobs ze optimiséieren.

Als éischt hunn ech S3 gefrot fir ze berechnen wéi vill Späicherplatz all Chromosom besat huet.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Dunn hunn ech eng Funktioun geschriwwen déi d'Gesamtgréisst hëlt, d'Uerdnung vun de Chromosomen rëselt, se a Gruppen opdeelt num_jobs a seet Iech wéi verschidden d'Gréisst vun all Veraarbechtung Aarbechtsplazen sinn.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Dunn sinn ech duerch dausend Shuffles mat Purrr gelaf an hunn déi Bescht gewielt.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Also hunn ech mat enger Rei vun Aufgaben opgehalen, déi ganz ähnlech a Gréisst waren. Da war alles wat iwwreg war fir mäi fréiere Bash Skript an eng grouss Loop ze wéckelen for. Dës Optimisatioun huet ongeféier 10 Minutten gedauert fir ze schreiwen. An dëst ass vill manner wéi ech géif verbréngen fir manuell Aufgaben ze kreéieren wa se onbalancéiert waren. Dofir mengen ech, datt ech mat dëser virleefeg Optimiséierung richteg war.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Um Enn fügen ech de Shutdown Kommando:

sudo shutdown -h now

... an alles huet geklappt! Mat der AWS CLI hunn ech Instanzen opgeworf mat der Optioun user_data huet hinnen Bash Scripte vun hiren Aufgaben fir Veraarbechtung. Si lafen an automatesch zou, sou datt ech net fir extra Veraarbechtungskraaft bezuelt hunn.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Loosst eis packen!

Wat hunn ech geléiert: D'API soll einfach sinn fir d'Liichtegkeet an d'Flexibilitéit vum Gebrauch.

Endlech hunn ech d'Donnéeën op déi richteg Plaz a Form. Et bleift just fir de Prozess vun der Benotzung vun Donnéeën sou vill wéi méiglech ze vereinfachen fir et meng Kollegen méi einfach ze maachen. Ech wollt eng einfach API maachen fir Ufroen ze kreéieren. Wann ech an Zukunft décidéieren ze wiesselen ugefaangen .rds zu Parquetdateien, da soll dat e Problem fir mech sinn, net fir meng Kollegen. Fir dëst hunn ech decidéiert en internen R Package ze maachen.

Bauen an dokumentéieren e ganz einfache Package deen nëmmen e puer Datenzougang Funktiounen enthält, organiséiert ronderëm eng Funktioun get_snp. Ech hunn och eng Websäit fir meng Kollegen gemaach pkg erof, sou datt se einfach Beispiller an Dokumentatioun gesinn.

Parsing 25TB mat AWK a R

Smart Caching

Wat hunn ech geléiert: Wann Är Donnéeë gutt virbereet sinn, wäert Cache einfach sinn!

Well ee vun den Haapt Workflows dee selwechte Analysemodell op de SNP Package applizéiert huet, hunn ech décidéiert Binning zu mengem Virdeel ze benotzen. Wann Dir Daten iwwer SNP vermëttelt, gëtt all Informatioun aus der Grupp (Bin) un de zréckginn Objet befestegt. Dat ass, al Ufroe kënnen (an Theorie) d'Veraarbechtung vun neien Ufroen beschleunegen.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Wann ech de Package bauen, hunn ech vill Benchmarks gemaach fir d'Geschwindegkeet ze vergläichen wann Dir verschidde Methoden benotzt. Ech recommandéieren dëst net ze vernoléissegen, well heiansdo sinn d'Resultater onerwaart. Zum Beispill, dplyr::filter war vill méi séier wéi Zeile mat Indexéierungsbaséierter Filterung z'erhalen, an eng eenzeg Kolonn aus engem gefilterten Dateframe zréckzekommen war vill méi séier wéi d'Indexsyntax ze benotzen.

Maacht weg datt den Objet prev_snp_results enthält de Schlëssel snps_in_bin. Dëst ass eng Array vun all eenzegaartegen SNPs an enger Grupp (Bin), wat Iech erlaabt Iech séier z'iwwerpréiwen ob Dir schonn Daten aus enger fréierer Ufro hutt. Et mécht et och einfach duerch all d'SNPs an enger Grupp (Bin) mat dësem Code ze loopen:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Resultater

Elo kënne mir (an hunn ugefaang eescht) Modeller an Szenarie lafen, déi virdru fir eis onzougänglech waren. Déi bescht Saach ass datt meng Labokollegen net iwwer Komplikatioune mussen nodenken. Si hu just eng Funktioun déi funktionnéiert.

An och wann de Package hinnen d'Detailer erspuert, hunn ech probéiert den Dateformat einfach genuch ze maachen datt se et erausfannen wann ech muer op eemol verschwannen ...

D'Vitesse ass däitlech eropgaang. Mir scannen normalerweis funktionell bedeitend Genomfragmenter. Virdru konnte mir dat net maachen (et huet sech ze deier erausgestallt), awer elo, dank der Grupp (Bin) Struktur a Caching, eng Ufro fir een SNP dauert am Duerchschnëtt manner wéi 0,1 Sekonnen, an d'Dateverbrauch ass sou niddereg, datt d'Käschte fir S3 peanuts sinn.

Konklusioun

Dësen Artikel ass guer kee Guide. D'Léisung huet sech als individuell erausgestallt, a bal sécher net optimal. Et ass éischter e Reesvertrag. Ech wëll anerer verstoen datt esou Entscheedungen net voll am Kapp geformt sinn, si sinn d'Resultat vum Versuch a Feeler. Och wann Dir no engem Datewëssenschaftler sicht, bedenkt datt d'Benotzung vun dësen Tools effektiv Erfahrung erfuerdert, an d'Erfahrung kascht Geld. Ech si frou, datt ech d'Moyene hate fir ze bezuelen, awer vill anerer, déi déi selwecht Aarbecht besser maache wéi ech, wäerten aus Mangel u Suen ni d'Méiglechkeet hunn iwwerhaapt ze probéieren.

Big Data Tools si villsäiteg. Wann Dir d'Zäit hutt, kënnt Dir bal sécher eng méi séier Léisung schreiwen mat intelligenten Datenreinigung, Lagerung an Extraktiounstechniken. Schlussendlech kënnt et op eng Käschte-Virdeeler Analyse erof.

Wat ech geléiert hunn:

  • et gëtt kee bëllege Wee fir 25 TB gläichzäiteg ze analyséieren;
  • Sief virsiichteg mat der Gréisst vun Äre Parquet Dateien an hir Organisatioun;
  • Partitionen am Spark mussen ausgeglach sinn;
  • Am Allgemengen, probéiert ni 2,5 Millioune Partitionen ze maachen;
  • Sortéieren ass nach ëmmer schwéier, sou wéi och Spark opzestellen;
  • heiansdo speziell daten verlaangt speziell Léisungen;
  • Spark Aggregatioun ass séier, awer d'Partitionéierung ass nach ëmmer deier;
  • schlof net wann se Iech d'Basis léieren, iergendeen huet Äre Problem wahrscheinlech schonn an den 1980er geléist;
  • gnu parallel - dat ass eng magesch Saach, jidderee soll et benotzen;
  • Spark huet onkompriméiert Donnéeën gär a kombinéiere net gär Partitionen;
  • Spark huet ze vill Overhead wann Dir einfache Probleemer léist;
  • AWK assoziativ Arrays si ganz effizient;
  • Dir kënnt Kontakt stdin и stdout vun engem R Skript, an dofir benotzen se an der Pipeline;
  • Dank Smart Wee Ëmsetzung, kann S3 vill Fichieren Prozess;
  • Den Haaptgrond fir Zäit ze verschwenden ass virzäiteg Är Späichermethod ze optimiséieren;
  • probéiert net d'Aufgaben manuell ze optimiséieren, loosst de Computer et maachen;
  • D'API sollt einfach sinn fir d'Liichtegkeet an d'Flexibilitéit vum Gebrauch;
  • Wann Är Donnéeën gutt virbereet sinn, wäert Cache einfach sinn!

Source: will.com

Setzt e Commentaire