Parsing 25TB with AWK and R

How to read this article: I apologize that the text turned out so long and chaotic. To save you time, I begin each chapter with a "What have I learned" introduction that sums up its essence in one or two sentences.

"Just show me the solution!" If you just want to see where I've come from, then skip to the Getting Creative chapter, but I think it's more interesting and useful to read about failures.

Recently I was asked to set up a pipeline for processing a large amount of raw DNA sequencing data (technically, a SNP chip). The goal was to be able to quickly retrieve the data for a given genetic location (called a SNP) for modeling and other tasks. Using R and AWK, I was able to clean up and organize the data in a natural way, greatly speeding up queries. It was not easy and took many iterations. This article should help you avoid some of my mistakes and show what I ended up with.

First, some introductory explanations.

Data

Our university's genetic data processing center provided us with 25 TB of data as TSV files. I received them split into 5 gzip-compressed batches, each containing about 240 four-gigabyte files. Each row held the data for one SNP for one person. In total there was data on ~2.5 million SNPs and ~60 thousand people. Besides the SNP information, the files contained numerous numeric columns for various characteristics such as read intensity, frequency of different alleles, and so on; all told, about 30 columns of unique values.
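
For a sense of scale, here is a quick back-of-the-envelope calculation from those numbers (one row per SNP per person):

# Rough scale of the dataset implied by the numbers above:
# one row per SNP per person.
n_snps   <- 2.5e6   # ~2.5 million SNPs
n_people <- 6e4     # ~60 thousand people

n_snps * n_people   # ~1.5e11, i.e. on the order of 150 billion rows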

Goal

As with any data management project, the most important thing was to determine how the data would be used. In our case, we will mostly be fitting models and running workflows SNP by SNP, so we need the data for only one SNP at a time. I had to learn to extract all the records for any one of the 2.5 million SNPs as simply, quickly, and cheaply as possible.

How not to

To quote a fitting cliché:

I didn't fail a thousand times, I just discovered a thousand ways not to parse a bunch of data in a query-friendly format.

First try

What have I learned: there is no cheap way to parse 25 TB at a time.

After taking the Advanced Big Data Processing course at Vanderbilt University, I was sure this was in the bag. It would probably take an hour or two to set up a Hive server to run through all the data and report the result. Since our data is stored in AWS S3, I used Athena, a service that lets you run Hive SQL queries against S3 data. There is no Hive cluster to set up, and you pay only for the data your query scans.

After I showed Athena my data and its format, I ran several tests with queries like this:

select * from intensityData limit 10;

And quickly got back well-structured results. Done.

Until we tried to use the data for real work…

I was asked to pull all the information for one SNP so we could test a model on it. I ran this query:


select * from intensityData 
where snp = 'rs123456';

… and began to wait. After eight minutes and more than 4 TB of scanned data, I got the result. Athena charges by the amount of data it scans, at $5 per terabyte, so this single query cost $20 and an eight-minute wait. Running the model over all the data would mean waiting 38 years and paying $50 million. Obviously, this would not do.
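
For what it's worth, the back-of-the-envelope arithmetic behind those numbers looks like this (a quick sketch using the approximate figures above):

# Back-of-the-envelope estimate of querying every SNP through Athena,
# using the rough numbers above: ~$20 and ~8 minutes per SNP query.
n_snps         <- 2.5e6  # ~2.5 million SNPs
cost_per_query <- 20     # dollars: ~4 TB scanned at $5/TB
mins_per_query <- 8

n_snps * cost_per_query                      # ~$50 million
n_snps * mins_per_query / (60 * 24 * 365)    # ~38 years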

Should have used Parquet...

What have I learned: Be careful with the size of your Parquet files and their organization.

I first tried to remedy the situation by converting all the TSVs to Parquet files. Parquet is convenient for large data sets because it stores information in columnar form: each column lives in its own memory/disk segment, unlike text files, where every row contains values from every column. If you need to look something up, you can read just the necessary column. In addition, each file stores the range of values for each column, so if the value you are looking for falls outside a file's range, Spark will not waste time scanning that file.

I ran a simple AWS Glue job to convert our TSVs to Parquet and pointed Athena at the new files. This took about 5 hours. But when I ran the query, it took about the same amount of time and slightly less money to complete. The problem was that Spark, trying to optimize the job, simply unpacked each TSV chunk and wrote it into its own Parquet chunk. And because each chunk was large enough to hold the full records of many people, every file contained all the SNPs, so Spark had to open all the files to extract the needed information.
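
The conversion itself ran as an AWS Glue job, but the equivalent operation in sparklyr looks roughly like this (a sketch only; TSV_LOC and PARQUET_LOC are placeholder paths, and the real column types would need to be specified):

library(sparklyr)

sc <- spark_connect(master = "local")  # or a real cluster

# Read the raw tab-separated chunks and rewrite them as Parquet.
raw_tsv <- spark_read_csv(
  sc,
  name      = "intensity_data",
  path      = TSV_LOC,        # placeholder: location of the raw TSV chunks
  delimiter = "\t",
  memory    = FALSE
)

spark_write_parquet(
  raw_tsv,
  path = PARQUET_LOC,         # placeholder: destination for the Parquet files
  mode = "overwrite"
)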

Curiously, Parquet's default (and recommended) compression type, snappy, is not splittable. As a result, each executor was stuck with the task of downloading and unpacking a full 3.5 GB dataset.


Understanding the problem

What have I learned: sorting is difficult, especially if the data is distributed.

It seemed to me that I now understood the essence of the problem: I only needed to sort the data by the SNP column rather than by person. Then several SNPs would live together in a single data chunk, and Parquet's "open a file only if the value is in its range" smarts could shine. Unfortunately, sorting billions of rows scattered across a cluster proved to be a difficult task.

And AWS is definitely not going to refund money because "I'm an absent-minded student." After I started the sort on Amazon Glue, it ran for 2 days and then failed.

What about partitioning?

What have I learned: Partitions in Spark must be balanced.

Then I came up with the idea of partitioning the data by chromosome. There are 23 of them (plus a few more, counting mitochondrial DNA and unmapped regions). That would split the data into smaller chunks. Adding a single line, partition_by = "chr", to the Spark write call in the Glue script should sort the data into those buckets.

The genome is made up of numerous fragments called chromosomes.

Unfortunately, it didn't work. Chromosomes have different sizes and therefore hold different amounts of data. That means the tasks Spark sent to the workers were unbalanced and ran slowly, because some nodes finished early and sat idle. The jobs did complete, though. But when a single SNP was requested, the imbalance caused problems again: the cost of querying SNPs on the larger chromosomes (that is, exactly where we want to get data from) dropped only about 10-fold. A lot, but not enough.

And if you divide into even smaller partitions?

What have I learned: never, ever try to make 2.5 million partitions.

I decided to go all in and partition by individual SNP. That guaranteed equal-sized partitions. IT WAS A BAD IDEA. I used Glue and added the innocent-looking line partition_by = 'snp'. The job started running. A day later I checked and saw that still nothing had been written to S3, so I killed it. It looks like Glue was writing intermediate files to a hidden S3 location, and a lot of them, maybe a couple of million. In the end, this mistake cost more than a thousand dollars and did not please my mentor.

Partitioning + sorting

What have I learned: sorting is still difficult, as is setting up Spark.

For my last attempt at partitioning, I partitioned by chromosome and then sorted each partition. In theory this would speed up every query, because the desired SNP data would have to lie within a few Parquet chunks covering the given range. Alas, sorting even the partitioned data turned out to be a difficult task. In the end I switched to EMR for a custom cluster, used eight powerful instances (C5.4xl), and turned to sparklyr to build a more flexible workflow…

# Sparklyr snippet to partition by chr and sort w/in partition
raw_data %>%
  group_by(chr) %>%
  arrange(Position) %>%
  spark_write_parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

…but the job still never finished. I tuned it in various ways: increased the memory allocated to each query executor, used nodes with large amounts of memory, used broadcast variables, but each time these turned out to be half measures, and gradually the executors started failing until everything ground to a halt.

Getting creative

What have I learned: sometimes special data require special solutions.

Each SNP has a position value: the number of bases along its chromosome at which it lies. This is a nice, natural way to organize our data. At first I wanted to partition by regions of each chromosome, for example positions 1–2000, 2001–4000, and so on. The problem is that SNPs are unevenly distributed along the chromosomes, so the group sizes would vary greatly.
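
For contrast, the fixed-width position binning I abandoned would have looked something like this (a sketch; bin_width is a placeholder, and unique_snps is the SNP list used below):

library(dplyr)

# Fixed-width position bins: simple, but the bins end up wildly uneven
# in size because SNP density varies along each chromosome.
# bin_width is a placeholder value.
snp_to_bin_fixed <- unique_snps %>%
  group_by(chr) %>%
  mutate(bin = floor(position / bin_width)) %>%
  ungroup()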


I ended up binning by the rank of the positions instead. Using the data already loaded, I ran a query for the list of unique SNPs, their positions, and their chromosomes. Then I sorted within each chromosome and assembled the SNPs into bins of a given size, say 1000 SNPs each. This gave me a SNP-to-bin mapping on each chromosome.

In the end I used bins of 75 SNPs; the reason is explained below.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n(),
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

First try with Spark

What have I learned: joining in Spark is fast, but partitioning is still expensive.

I wanted to read this small (2.5-million-row) data frame into Spark, join it with the raw data, and then partition by the newly added bin column.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by = 'snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>%
  spark_write_parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

I used sdf_broadcast() to tell Spark that it should ship the data frame to all nodes. This is useful when the data is small and needed by every task. Otherwise Spark tries to be clever and distributes the data only as needed, which can cause slowdowns.

And again my idea did not work: the tasks ran for a while, completed the join, and then, just as with the executors launched by the partitioning attempts, began to fail.

Adding AWK

What have I learned: Don't sleep through the lessons on the basics. Someone probably already solved your problem back in the 1980s.

Up to this point, all my Spark failures came from the jumble of data in the cluster. Perhaps the situation could be improved with preprocessing. I decided to try splitting the raw text data by chromosome column, hoping to hand Spark "pre-partitioned" data.

I searched StackOverflow for how to split a file by column values and found a great answer: with AWK you can split a text file by column values by doing the writing inside the script itself rather than sending results to stdout.

I wrote a Bash script to test it: download one of the gzip-compressed TSVs, decompress it, and pipe it to awk.

gzip -dc path/to/chunk/file.gz |
awk -F '\t' \
'{print $1",..."$30 > "chunked/chr"$15".csv"}'

It worked!

Filling the cores

What have I learned: gnu parallel is a magical thing, everyone should use it.

The splitting was rather slow, and when I ran htop to check on a powerful (and expensive) EC2 instance, it turned out I was using only one core and about 200 MB of memory. To solve the problem without losing a lot of money, I had to figure out how to parallelize the work. Fortunately, in Jeroen Janssens's absolutely amazing book Data Science at the Command Line, I found a chapter on parallelization. From it I learned about GNU parallel, a very flexible way to run things in parallel on Unix.

When I ran the splitting with the new parallelized process, everything was fine, but there was still a bottleneck: downloading the S3 objects to disk was not very fast and not fully parallelized. To fix this, I did the following:

  1. I realized the S3 download step could be done directly in the pipeline, completely eliminating intermediate storage on disk. This meant I could avoid writing the raw data to disk and use smaller, and therefore cheaper, storage on AWS.
  2. The command aws configure set default.s3.max_concurrent_requests 50 greatly increased the number of threads the AWS CLI uses (the default is 10).
  3. I switched to a network-optimized EC2 instance, the kind with the letter n in the name. I found that the loss of processing power from using n-instances is more than offset by the increase in download speed. For most tasks I used c5n.4xl.
  4. I swapped gzip for pigz, a gzip tool that does clever things to parallelize the inherently hard-to-parallelize task of decompressing files (this helped the least).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe \
        "awk -F '\t' '{print \$1\",...\"\$30 > \"chunked/{#}_chr\"\$15\".csv\"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Combined, these steps made everything work very quickly. Thanks to the increased download speed and the elimination of disk writes, I could now process a 5-terabyte batch in just a few hours.


Using newly parsed data

What have I learned: Spark likes uncompressed data and doesn't like to combine partitions.

Now the data was sitting in S3 in an uncompressed (read: splittable) and semi-ordered format, and I could go back to Spark. A surprise awaited me: I again failed to achieve what I wanted! It was very difficult to tell Spark exactly how the data was partitioned. And even when I managed it, there turned out to be too many partitions (95 thousand), and when I used coalesce to reduce their number to something reasonable, it destroyed my partitioning. I'm sure this can be fixed, but after a couple of days of searching I could not find a solution. I did eventually finish all the tasks in Spark, although it took a while and my resulting Parquet files were very small (~200 KB). Still, the data was where it needed to be.

Too small and uneven, wonderful!

Testing local Spark queries

What have I learned: There is too much overhead in Spark when solving simple problems.

With the data loaded in a well-thought-out format, I could test the speed. I set up an R script that started a local Spark server and then loaded a Spark data frame from the Parquet storage for a specified bin. I tried loading all the data, but could not get sparklyr to recognize the partitioning.

sc <- spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  spark_read_parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

The run took 29.415 seconds. Much better, but still not good enough for testing anything at scale. I also couldn't speed things up with caching, because whenever I tried to cache the data frame in memory, Spark crashed, even when I gave it more than 50 GB of memory for a dataset weighing less than 15 GB.
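
For reference, the caching attempt amounted to something like the following sketch (not the exact original code); in sparklyr, memory = TRUE (the default) is what asks Spark to cache the table in cluster memory:

# Sketch of the caching attempt: asking sparklyr to cache the bin in
# memory is what kept crashing Spark, even with far more memory
# allocated than the data needed.
intensity_data <- sc %>%
  spark_read_parquet(
    name   = 'intensity_data',
    path   = get_snp_location(desired_snp),
    memory = TRUE   # cache the table in Spark's memory
  )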

Return to AWK

What have I learned: Associative arrays in AWK are very efficient.

I realized I could do better. I remembered that in Bruce Barnett's wonderful AWK guide I had read about a cool feature called "associative arrays". These are essentially key-value pairs, which for some reason go by a different name in AWK, so I had never thought much about them. Roman Chepliaka pointed out that the term "associative array" is much older than "key-value pair": if you look up key-value in Google Ngram, you won't see the term, but you will find associative arrays! Besides, "key-value pair" is mostly associated with databases, so comparing it to a hashmap is more apt. I realized I could use these associative arrays to link my SNPs to the bin table and to the raw data without involving Spark.

To do this, I used the BEGIN block in the AWK script. This is a piece of code that runs before the first line of data is passed to the main body of the script.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

The while(getline...) command loads every row of the bin CSV, setting the first column (the SNP name) as the key of the associative array bin and the second column (the bin) as the value. Then, in the { } block, which runs for every line of the main file, each line is written to an output file whose name depends on the line's bin: ..._bin_"bin[$1]"_....

The variables batch_num and chunk_id matched the data supplied by the pipeline, which avoided a race condition: each thread of execution launched by parallel wrote to its own unique file.

Since my previous AWK experiment had left all the raw data scattered into per-chromosome folders, I could now write another Bash script to process one chromosome at a time and send the more deeply partitioned data to S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

The script has two parallel sections.

In the first section, data is read from every file containing information on the desired chromosome, and that data is distributed across threads that scatter the rows into the appropriate bin files. To avoid race conditions from multiple threads writing to the same file, AWK takes the filenames from the data itself and writes to distinct locations, for example chr_10_bin_52_batch_2_aa.csv. As a result, many small files land on disk (for this I used terabyte EBS volumes).

The pipeline in the second parallel section goes through the bins, combines each bin's individual files into a single CSV with cat, and then sends them off for export.

Piping into R?

What have I learned: you can access stdin and stdout from an R script, and therefore use R in a pipeline.

In the Bash script you may have noticed this line: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... It pipes all the concatenated bin files into the R script below. {} is a special parallel placeholder that inserts whatever data it sends to a given thread directly into the command itself. The {#} option gives a unique job number, and {%} gives the job slot number (reused over time, but never simultaneously). A full list of options can be found in the documentation.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

When the variable file("stdin") transferred to readr::read_csv, the data translated into the R script is loaded into a frame, which is then in the form .rds-file using aws.s3 recorded directly in S3.

RDS is like a junior version of Parquet, without the frills of columnar storage.

After the Bash script finished, I had a bundle of .rds files sitting in S3, which gave me efficient compression and built-in types.

Even though R has a reputation for being slow, everything worked very quickly. Not surprisingly, the parts of R responsible for reading and writing data are well optimized. After testing on one medium-sized chromosome, the job completed on a c5n.4xl instance in about two hours.

S3 restrictions

What have I learned: With smart path implementation, S3 can handle many files.

I was worried whether S3 could handle the many files being thrown at it. I could make the filenames meaningful, but how would S3 look them up?

Folders in S3 are just for show; the system doesn't actually care about the / character. From the S3 FAQ page.

It looks like S3 represents the path to a particular file as a simple key in some kind of hash table or document-based database. A bucket can be thought of as a table, and files can be thought of as records in that table.

Since speed and efficiency are essential to Amazon's bottom line, it's no surprise that this key-as-file-path system is heavily optimized. I tried to strike a balance: not making too many GET requests, while keeping each request fast. It turned out that about 20 thousand bin files worked best. I think further optimization could squeeze out more speed (for example, a dedicated bucket just for the data, shrinking the lookup table). But there was no time or money for more experiments.

What about cross compatibility?

What have I learned: The main reason for wasting time is prematurely optimizing your storage method.

At this point you might ask, "Why use a proprietary file format?" The reasons are load speed (gzip-compressed CSV files took about 7 times longer to load) and compatibility with our workflows. I may reconsider once R can easily load Parquet (or Arrow) files without the burden of Spark. Everyone in our lab uses R, and if I ever need the data in another format, I still have the original text data, so I can just run the pipeline again.
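
The 7x figure came from timings along these lines (an illustrative sketch, not the original benchmark; the file names are placeholders):

library(readr)

# Illustrative comparison of load times for one bin stored as a gzipped CSV
# versus the equivalent .rds file; file names are placeholders.
csv_elapsed <- system.time(read_csv("chr_13_bin_42.csv.gz"))["elapsed"]
rds_elapsed <- system.time(readRDS("chr_13_bin_42.rds"))["elapsed"]

csv_elapsed / rds_elapsed   # ratio of load times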

Division of work

What have I learned: Don't try to optimize tasks manually, let the computer do it.

Having debugged the workflow on one chromosome, I now needed to process all the other data. I wanted to spin up several EC2 instances for the conversion, but I was afraid of ending up with wildly unbalanced load across the processing jobs (just as Spark had suffered from unbalanced partitions). I was also reluctant to start one instance per chromosome, because AWS accounts have a default limit of 10 instances.

So I decided to write an R script to balance the processing jobs.

First, I asked S3 to calculate how much storage space each chromosome takes up.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Then I wrote a function that takes the total sizes, shuffles the order of the chromosomes, splits them into num_jobs groups, and reports how much the sizes of the resulting processing jobs differ.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 Γ— 3]>

Then I ran 1000 shuffles with purrr and picked the best one.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

That gave me a set of jobs very similar in size. All that was left was to wrap my earlier Bash script in a big for loop. This optimization took about 10 minutes to write, much less than I would have spent manually re-creating jobs had they been unbalanced. So I don't think I lost anything with this bit of up-front optimization.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
done

At the end of each job script, I added the shutdown command:

sudo shutdown -h now

… and everything worked out! Using the AWS CLI, I launched the instances and handed each one its job's Bash script via the user_data option. The scripts ran and the instances shut down automatically, so I didn't pay for extra processing power.

aws ec2 run-instances ... \
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" \
--user-data file://<<job_script_loc>>

Packaging!

What have I learned: Keep the API simple for the sake of ease and flexibility of use.

Finally the data was in the right place and the right shape. All that remained was to make using it as simple as possible for my colleagues. I wanted to provide a simple API for queries: if in the future I decide to switch from .rds to Parquet files, that should be my problem, not theirs. For this, I decided to make an internal R package.

I built and documented a very simple package containing just a few data-access functions centered on the get_snp function. I also made a pkgdown website for my colleagues so they can easily browse examples and documentation.
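
From a colleague's point of view, usage is meant to look something like this (a sketch; the package name is a placeholder, and the SNP IDs are the ones used earlier in this article):

# Hypothetical usage of the internal package; the package name is a
# placeholder, the SNP IDs are the ones quoted earlier in the article.
library(snpdata)

snp_results <- get_snp('rs123456')

# Passing previous results lets get_snp() reuse the cached bin
# (see the Smart caching section below)
more_results <- get_snp('rs34771739', snp_results)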


Smart caching

What have I learned: if your data is well prepared, caching will be easy!

Since one of the main workflows applies the same analysis model to a batch of SNPs, I decided to use the binning to my advantage. When data for a SNP is returned, all of the information from its bin is attached to the returned object. That means old queries can (in theory) speed up the processing of new ones.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

While building the package, I ran a lot of benchmarks comparing different approaches, and I recommend not skipping this, because the results are sometimes unexpected. For example, dplyr::filter was much faster than grabbing rows with index-based filtering, and pulling a single column out of the filtered data frame was much faster than using indexing syntax.
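
A sketch of the kind of comparison I mean, using the microbenchmark package (bin_data here stands in for a bin's data frame):

library(microbenchmark)
library(dplyr)

# Compare dplyr::filter() with base index-based filtering on a bin's
# data frame; bin_data is a stand-in for the real object.
microbenchmark(
  dplyr_filter = bin_data %>% filter(SNP_Name == desired_snp),
  base_index   = bin_data[bin_data$SNP_Name == desired_snp, ],
  times = 100
)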

Note that the prev_snp_results object contains the element snps_in_bin. This is an array of all the unique SNPs in the bin, which makes it quick to check whether we already have the data from a previous query. It also makes it easy to loop over all the SNPs in a bin with code like this:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

The results

Now we can (and have begun, in earnest) to run models and analyses that were previously out of reach. Best of all, my lab mates don't have to think about any of the complications; they just have a function that works.

And although the package spares them the details, I tried to keep the data format simple enough that they could figure it out if I suddenly disappeared tomorrow…

The speed has improved dramatically. We typically scan functionally significant fragments of the genome. Previously we couldn't do this (it was too expensive), but now, thanks to the bin structure and caching, a query for one SNP takes on average less than 0.1 seconds, and the data usage is so low that the S3 costs are peanuts.

Conclusion

This article is not a how-to guide. The solution turned out to be specific to our situation, and almost certainly not optimal. Rather, it is the story of a journey. I want others to understand that solutions like this don't arrive fully formed in one's head; they are the result of trial and error. Also, if you are hiring a data scientist, keep in mind that it takes experience to use these tools effectively, and experience costs money. I'm glad I had the funds to pay for it, but many others who could do the same job better than me will never get the opportunity, because they don't have the money to even try.

Big data tools are versatile. If you have the time, you can almost certainly write a faster solution using smart data cleansing, storage, and retrieval techniques. Ultimately, it all comes down to cost-benefit analysis.

What I have learned:

  • there is no cheap way to parse 25 TB at a time;
  • be careful with the size of your Parquet files and their organization;
  • partitions in Spark must be balanced;
  • never, ever try to make 2.5 million partitions;
  • sorting is still difficult, as is setting up Spark;
  • sometimes special data require special solutions;
  • joining in Spark is fast, but partitioning is still expensive;
  • don't sleep through the lessons on the basics; someone probably already solved your problem back in the 1980s;
  • GNU parallel is a magical thing; everyone should use it;
  • Spark loves uncompressed data and doesn't like to combine partitions;
  • there is too much overhead in Spark when solving simple problems;
  • associative arrays in AWK are very efficient;
  • you can access stdin and stdout from an R script, and therefore use R in a pipeline;
  • thanks to its smart path implementation, S3 can handle many files;
  • the main reason for wasting time is premature optimization of your storage method;
  • do not try to optimize tasks manually, let the computer do it;
  • keep the API simple for the sake of ease and flexibility of use;
  • if your data is well prepared, caching will be easy!

Source: habr.com
