Phân tích cú pháp 25TB bằng AWK và R

Phân tích cú pháp 25TB bằng AWK và R
Làm thế nào để đọc bài viết này: Tôi xin lỗi vì văn bản quá dài và lộn xộn. Để tiết kiệm thời gian cho bạn, tôi bắt đầu mỗi chương bằng phần giới thiệu “Những gì tôi đã học được”, tóm tắt nội dung chính của chương trong một hoặc hai câu.

“Chỉ cho tôi giải pháp thôi!” Nếu bạn chỉ muốn biết tôi đến từ đâu thì hãy bỏ qua chương “Trở nên sáng tạo hơn”, nhưng tôi nghĩ đọc về thất bại sẽ thú vị và hữu ích hơn.

Gần đây tôi được giao nhiệm vụ thiết lập một quy trình xử lý một lượng lớn chuỗi DNA thô (về mặt kỹ thuật là chip SNP). Nhu cầu là nhanh chóng thu được dữ liệu về một vị trí di truyền nhất định (được gọi là SNP) để lập mô hình tiếp theo và các nhiệm vụ khác. Bằng cách sử dụng R và AWK, tôi có thể dọn dẹp và sắp xếp dữ liệu một cách tự nhiên, tăng tốc đáng kể quá trình xử lý truy vấn. Điều này không hề dễ dàng đối với tôi và cần phải lặp đi lặp lại nhiều lần. Bài viết này sẽ giúp bạn tránh được một số sai lầm của tôi và cho bạn thấy những gì tôi đã mắc phải.

Đầu tiên, một số giải thích giới thiệu.

Dữ liệu

Trung tâm xử lý thông tin di truyền ở trường đại học của chúng tôi đã cung cấp cho chúng tôi dữ liệu dưới dạng TSV 25 TB. Tôi nhận được chúng được chia thành 5 gói, được nén bằng Gzip, mỗi gói chứa khoảng 240 tệp 2,5 gigabyte. Mỗi hàng chứa dữ liệu cho một SNP từ một cá nhân. Tổng cộng, dữ liệu về ~60 triệu SNP và ~30 nghìn người đã được truyền đi. Ngoài thông tin SNP, các tệp còn chứa nhiều cột với các số phản ánh các đặc điểm khác nhau, chẳng hạn như cường độ đọc, tần số của các alen khác nhau, v.v. Tổng cộng có khoảng XNUMX cột với các giá trị duy nhất.

mục tiêu

Giống như bất kỳ dự án quản lý dữ liệu nào, điều quan trọng nhất là xác định cách sử dụng dữ liệu. Trong trường hợp này chúng tôi chủ yếu sẽ chọn mô hình và quy trình làm việc cho SNP dựa trên SNP. Tức là chúng ta sẽ chỉ cần dữ liệu trên một SNP tại một thời điểm. Tôi phải học cách truy xuất tất cả các bản ghi liên quan đến một trong 2,5 triệu SNP một cách dễ dàng, nhanh chóng và ít tốn kém nhất có thể.

Làm thế nào để không làm điều này

Để trích dẫn một lời sáo rỗng phù hợp:

Tôi không thất bại hàng nghìn lần, tôi chỉ khám phá ra hàng nghìn cách để tránh phân tích cú pháp một loạt dữ liệu theo định dạng thân thiện với truy vấn.

Lần thử đầu tiên

Tôi đã học được gì: Không có cách nào rẻ để phân tích 25 TB mỗi lần.

Sau khi tham gia khóa học “Các phương pháp nâng cao để xử lý dữ liệu lớn” tại Đại học Vanderbilt, tôi chắc chắn rằng thủ thuật này đã sẵn sàng. Có thể sẽ mất một hoặc hai giờ để thiết lập máy chủ Hive để chạy qua tất cả dữ liệu và báo cáo kết quả. Vì dữ liệu của chúng tôi được lưu trữ trong AWS S3 nên tôi đã sử dụng dịch vụ Athena, cho phép bạn áp dụng truy vấn Hive SQL cho dữ liệu S3. Bạn không cần thiết lập/tăng cụm Hive và bạn cũng chỉ trả tiền cho dữ liệu bạn đang tìm kiếm.

Sau khi cho Athena xem dữ liệu của mình và định dạng của nó, tôi đã chạy một số thử nghiệm với các truy vấn như thế này:

select * from intensityData limit 10;

Và nhanh chóng nhận được kết quả có cấu trúc tốt. Sẵn sàng.

Cho đến khi chúng tôi cố gắng sử dụng dữ liệu trong công việc của mình...

Tôi được yêu cầu lấy tất cả thông tin SNP để kiểm tra mô hình. Tôi đã chạy truy vấn:


select * from intensityData 
where snp = 'rs123456';

...và bắt đầu chờ đợi. Sau tám phút và hơn 4 TB dữ liệu được yêu cầu, tôi đã nhận được kết quả. Athena tính phí theo khối lượng dữ liệu được tìm thấy, 5 USD mỗi terabyte. Vì vậy, yêu cầu duy nhất này tốn 20 USD và 38 phút chờ đợi. Để chạy mô hình trên tất cả dữ liệu, chúng tôi phải đợi 50 năm và trả XNUMX triệu USD. Rõ ràng, điều này không phù hợp với chúng tôi.

Cần phải sử dụng sàn gỗ...

Tôi đã học được gì: Hãy cẩn thận với kích thước tệp Parquet của bạn và cách sắp xếp chúng.

Lần đầu tiên tôi cố gắng khắc phục tình trạng này bằng cách chuyển đổi tất cả TSV thành Tập tin sàn gỗ. Chúng thuận tiện khi làm việc với các tập dữ liệu lớn vì thông tin trong chúng được lưu trữ ở dạng cột: mỗi cột nằm trong đoạn bộ nhớ/đĩa riêng, trái ngược với các tệp văn bản, trong đó các hàng chứa các phần tử của mỗi cột. Và nếu bạn cần tìm thứ gì đó thì chỉ cần đọc cột bắt buộc. Ngoài ra, mỗi tệp lưu trữ một phạm vi giá trị trong một cột, vì vậy nếu giá trị bạn đang tìm kiếm không nằm trong phạm vi của cột, Spark sẽ không lãng phí thời gian quét toàn bộ tệp.

Tôi đã thực hiện một nhiệm vụ đơn giản Keo AWS để chuyển đổi TSV của chúng tôi thành Parquet và thả các tệp mới vào Athena. Mất khoảng 5 giờ. Nhưng khi tôi chạy yêu cầu, tôi mất khoảng thời gian tương tự và tốn ít tiền hơn một chút để hoàn thành. Thực tế là Spark, khi cố gắng tối ưu hóa nhiệm vụ, chỉ cần giải nén một đoạn TSV và đặt nó vào đoạn Parquet của riêng nó. Và vì mỗi chunk đủ lớn để chứa toàn bộ hồ sơ của nhiều người, mỗi file chứa tất cả SNP nên Spark phải mở tất cả các file để trích xuất thông tin cần thiết.

Điều thú vị là kiểu nén mặc định (và được khuyến nghị) của Parquet, linh hoạt, không thể chia tách được. Do đó, mỗi người thực thi bị mắc kẹt trong nhiệm vụ giải nén và tải xuống toàn bộ tập dữ liệu 3,5 GB.

Phân tích cú pháp 25TB bằng AWK và R

Hãy hiểu vấn đề

Tôi đã học được gì: Việc sắp xếp rất khó khăn, đặc biệt nếu dữ liệu bị phân tán.

Đối với tôi, dường như bây giờ tôi đã hiểu được bản chất của vấn đề. Tôi chỉ cần sắp xếp dữ liệu theo cột SNP chứ không phải theo người. Sau đó, một số SNP sẽ được lưu trữ trong một đoạn dữ liệu riêng biệt và sau đó chức năng “thông minh” của Parquet “chỉ mở nếu giá trị nằm trong phạm vi” sẽ hiển thị hết mức vinh quang. Thật không may, việc sắp xếp hàng tỷ hàng nằm rải rác trong một cụm tỏ ra là một nhiệm vụ khó khăn.

AWS chắc chắn không muốn hoàn lại tiền vì lý do "Tôi là một sinh viên mất tập trung". Sau khi tôi chạy phân loại trên Amazon Glue, nó chạy được 2 ngày và bị lỗi.

Còn việc phân vùng thì sao?

Tôi đã học được gì: Các phân vùng trong Spark phải được cân bằng.

Sau đó tôi nảy ra ý tưởng phân vùng dữ liệu theo nhiễm sắc thể. Có 23 trong số đó (và nhiều hơn nữa nếu bạn tính đến DNA ty thể và các vùng chưa được lập bản đồ).
Điều này sẽ cho phép bạn chia dữ liệu thành các phần nhỏ hơn. Nếu bạn chỉ thêm một dòng vào chức năng xuất Spark trong tập lệnh Glue partition_by = "chr", thì dữ liệu sẽ được chia thành các nhóm.

Phân tích cú pháp 25TB bằng AWK và R
Bộ gen bao gồm nhiều đoạn gọi là nhiễm sắc thể.

Thật không may, nó không hoạt động. Nhiễm sắc thể có kích thước khác nhau, có nghĩa là lượng thông tin khác nhau. Điều này có nghĩa là các nhiệm vụ mà Spark gửi cho công nhân không được cân bằng và hoàn thành chậm do một số nút hoàn thành sớm và không hoạt động. Tuy nhiên, các nhiệm vụ đã được hoàn thành. Nhưng khi yêu cầu một SNP, sự mất cân bằng lại gây ra vấn đề. Chi phí xử lý SNP trên các nhiễm sắc thể lớn hơn (tức là nơi chúng ta muốn lấy dữ liệu) chỉ giảm khoảng 10 lần. Rất nhiều, nhưng không đủ.

Điều gì sẽ xảy ra nếu chúng ta chia nó thành những phần nhỏ hơn nữa?

Tôi đã học được gì: Đừng bao giờ thử thực hiện 2,5 triệu phân vùng.

Tôi quyết định dốc toàn lực và phân vùng từng SNP. Điều này đảm bảo rằng các phân vùng có kích thước bằng nhau. ĐÓ LÀ MỘT Ý TƯỞNG TỒI. Tôi đã sử dụng Keo và thêm một dòng vô tội partition_by = 'snp'. Nhiệm vụ bắt đầu và bắt đầu thực hiện. Một ngày sau tôi kiểm tra và thấy vẫn chưa có gì được ghi vào S3 nên tôi đã kill task. Có vẻ như Glue đang ghi các tệp trung gian vào một vị trí ẩn trong S3, rất nhiều tệp, có lẽ là vài triệu. Kết quả là sai lầm của tôi đã tiêu tốn hơn một nghìn đô la và không làm hài lòng người cố vấn của tôi.

Phân vùng + sắp xếp

Tôi đã học được gì: Việc sắp xếp vẫn còn khó khăn, điều chỉnh Spark cũng vậy.

Nỗ lực phân vùng cuối cùng của tôi liên quan đến việc tôi phân vùng các nhiễm sắc thể và sau đó sắp xếp từng phân vùng. Về lý thuyết, điều này sẽ tăng tốc mỗi truy vấn vì dữ liệu SNP mong muốn phải nằm trong một vài khối Parquet trong một phạm vi nhất định. Thật không may, việc sắp xếp dữ liệu thậm chí được phân vùng hóa ra lại là một nhiệm vụ khó khăn. Do đó, tôi đã chuyển sang EMR cho cụm tùy chỉnh và sử dụng 5.4 phiên bản mạnh mẽ (CXNUMXxl) và Sparklyr để tạo quy trình làm việc linh hoạt hơn...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...tuy nhiên, nhiệm vụ vẫn chưa hoàn thành. Tôi đã định cấu hình nó theo nhiều cách khác nhau: tăng phân bổ bộ nhớ cho từng trình thực thi truy vấn, sử dụng các nút có dung lượng bộ nhớ lớn, sử dụng các biến quảng bá (biến quảng bá), nhưng mỗi lần chúng đều trở thành biện pháp nửa vời và dần dần những người thực thi bắt đầu thất bại cho đến khi mọi thứ dừng lại.

Tôi đang trở nên sáng tạo hơn

Tôi đã học được gì: Đôi khi dữ liệu đặc biệt đòi hỏi các giải pháp đặc biệt.

Mỗi SNP có một giá trị vị trí. Đây là con số tương ứng với số lượng bazơ dọc theo nhiễm sắc thể của nó. Đây là một cách hay và tự nhiên để sắp xếp dữ liệu của chúng tôi. Lúc đầu tôi muốn phân chia theo vùng của từng nhiễm sắc thể. Ví dụ: vị trí 1 - 2000, 2001 - 4000, v.v. Nhưng vấn đề là SNP không được phân bố đồng đều trên các nhiễm sắc thể, do đó kích thước nhóm sẽ rất khác nhau.

Phân tích cú pháp 25TB bằng AWK và R

Kết quả là tôi đã phân chia các vị trí thành các danh mục (thứ hạng). Sử dụng dữ liệu đã tải xuống, tôi đã thực hiện yêu cầu lấy danh sách các SNP duy nhất, vị trí và nhiễm sắc thể của chúng. Sau đó, tôi sắp xếp dữ liệu trong mỗi nhiễm sắc thể và thu thập SNP thành các nhóm (thùng) có kích thước nhất định. Giả sử mỗi cái có 1000 SNP. Điều này đã mang lại cho tôi mối quan hệ giữa SNP với nhóm trên mỗi nhiễm sắc thể.

Cuối cùng, tôi đã tạo các nhóm (bin) gồm 75 SNP, lý do sẽ được giải thích bên dưới.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

Lần đầu tiên thử với Spark

Tôi đã học được gì: Tập hợp Spark nhanh nhưng việc phân vùng vẫn tốn kém.

Tôi muốn đọc khung dữ liệu nhỏ (2,5 triệu hàng) này vào Spark, kết hợp nó với dữ liệu thô và sau đó phân vùng theo cột mới được thêm vào bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

Tôi đã sử dụng sdf_broadcast(), vì vậy Spark biết rằng nó sẽ gửi khung dữ liệu đến tất cả các nút. Điều này rất hữu ích nếu dữ liệu có kích thước nhỏ và cần thiết cho mọi tác vụ. Nếu không, Spark cố gắng tỏ ra thông minh và phân phối dữ liệu khi cần, điều này có thể gây ra tình trạng chậm lại.

Và một lần nữa, ý tưởng của tôi đã không thành công: các nhiệm vụ đã hoạt động được một thời gian, hoàn thành việc kết hợp, và sau đó, giống như những người thực thi được khởi chạy bằng cách phân vùng, chúng bắt đầu thất bại.

Thêm AWK

Tôi đã học được gì: Đừng ngủ khi bạn đang được dạy những điều cơ bản. Chắc chắn ai đó đã giải quyết được vấn đề của bạn vào những năm 1980.

Cho đến thời điểm này, nguyên nhân dẫn đến mọi thất bại của tôi với Spark là do dữ liệu lộn xộn trong cụm. Có lẽ tình hình có thể được cải thiện bằng cách điều trị trước. Tôi quyết định thử chia dữ liệu văn bản thô thành các cột nhiễm sắc thể, vì vậy tôi hy vọng có thể cung cấp cho Spark dữ liệu “được phân vùng trước”.

Tôi đã tìm kiếm trên StackOverflow về cách chia theo giá trị cột và tìm thấy một câu trả lời tuyệt vời như vậy Với AWK, bạn có thể chia tệp văn bản theo giá trị cột bằng cách viết nó dưới dạng tập lệnh thay vì gửi kết quả tới stdout.

Tôi đã viết một tập lệnh Bash để dùng thử. Đã tải xuống một trong các TSV đã đóng gói, sau đó giải nén nó bằng cách sử dụng gzip và gửi đến awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

Nó đã làm việc!

Làm đầy lõi

Tôi đã học được gì: gnu parallel - đó là một điều kỳ diệu, mọi người nên sử dụng.

Sự tách biệt diễn ra khá chậm và khi tôi bắt đầu htopđể kiểm tra việc sử dụng phiên bản EC2 mạnh mẽ (và đắt tiền), hóa ra tôi chỉ sử dụng một lõi và khoảng 200 MB bộ nhớ. Để giải quyết vấn đề và không bị mất nhiều tiền, chúng tôi phải tìm cách song song hóa công việc. May mắn thay, trong một cuốn sách hoàn toàn tuyệt vời Khoa học dữ liệu tại Dòng lệnh Tôi tìm thấy một chương của Jeron Janssens về song song hóa. Từ đó tôi đã học được về gnu parallel, một phương pháp rất linh hoạt để triển khai đa luồng trong Unix.

Phân tích cú pháp 25TB bằng AWK và R
Khi tôi bắt đầu phân vùng bằng quy trình mới, mọi thứ đều ổn nhưng vẫn có một nút thắt cổ chai - việc tải các đối tượng S3 xuống đĩa không nhanh lắm và không được song song hóa hoàn toàn. Để khắc phục điều này, tôi đã làm điều này:

  1. Tôi phát hiện ra rằng có thể triển khai giai đoạn tải xuống S3 trực tiếp trong quy trình, loại bỏ hoàn toàn bộ nhớ trung gian trên đĩa. Điều này có nghĩa là tôi có thể tránh ghi dữ liệu thô vào đĩa và sử dụng bộ nhớ thậm chí còn nhỏ hơn và do đó rẻ hơn trên AWS.
  2. đội aws configure set default.s3.max_concurrent_requests 50 tăng đáng kể số lượng luồng mà AWS CLI sử dụng (theo mặc định là 10).
  3. Tôi đã chuyển sang phiên bản EC2 được tối ưu hóa cho tốc độ mạng, có chữ cái n trong tên. Tôi nhận thấy rằng việc mất khả năng xử lý khi sử dụng n phiên bản được bù đắp nhiều hơn bằng tốc độ tải tăng lên. Đối với hầu hết các tác vụ tôi đã sử dụng c5n.4xl.
  4. Đã thay đổi gzip trên pigz, đây là một công cụ gzip có thể thực hiện những điều thú vị để song song hóa nhiệm vụ giải nén tệp không song song ban đầu (điều này ít giúp ích nhất).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

Các bước này được kết hợp với nhau để mọi thứ diễn ra rất nhanh chóng. Bằng cách tăng tốc độ tải xuống và loại bỏ việc ghi đĩa, giờ đây tôi có thể xử lý gói 5 terabyte chỉ trong vài giờ.

Dòng tweet này lẽ ra phải đề cập đến 'TSV'. Than ôi.

Sử dụng dữ liệu mới được phân tích cú pháp

Tôi đã học được gì: Spark thích dữ liệu không nén và không thích kết hợp các phân vùng.

Bây giờ dữ liệu đã ở S3 ở định dạng đã giải nén (đọc: chia sẻ) và bán thứ tự, và tôi có thể quay lại Spark lần nữa. Một điều ngạc nhiên đang chờ đợi tôi: Tôi lại không đạt được điều mình mong muốn! Rất khó để nói cho Spark biết chính xác dữ liệu được phân vùng như thế nào. Và ngay cả khi tôi làm điều này, hóa ra có quá nhiều phân vùng (95 nghìn) và khi tôi sử dụng coalesce giảm số lượng của chúng xuống mức giới hạn hợp lý, điều này đã phá hủy khả năng phân vùng của tôi. Tôi chắc chắn điều này có thể khắc phục được nhưng sau vài ngày tìm kiếm, tôi không thể tìm ra giải pháp. Cuối cùng tôi đã hoàn thành tất cả các nhiệm vụ trong Spark, mặc dù phải mất một thời gian và các tệp Parquet được chia nhỏ của tôi không nhỏ lắm (~ 200 KB). Tuy nhiên, dữ liệu đã ở đúng nơi cần thiết.

Phân tích cú pháp 25TB bằng AWK và R
Quá nhỏ và không đồng đều, tuyệt vời!

Kiểm tra các truy vấn Spark cục bộ

Tôi đã học được gì: Spark tốn quá nhiều chi phí khi giải quyết các vấn đề đơn giản.

Bằng cách tải xuống dữ liệu ở định dạng thông minh, tôi có thể kiểm tra tốc độ. Thiết lập tập lệnh R để chạy máy chủ Spark cục bộ, sau đó tải khung dữ liệu Spark từ bộ lưu trữ (bin) nhóm Parquet được chỉ định. Tôi đã cố tải tất cả dữ liệu nhưng không thể khiến Sparklyr nhận ra phân vùng.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

Quá trình thực hiện mất 29,415 giây. Tốt hơn nhiều, nhưng không quá tốt để thử nghiệm hàng loạt bất cứ thứ gì. Ngoài ra, tôi không thể tăng tốc mọi thứ bằng bộ nhớ đệm vì khi tôi cố gắng lưu khung dữ liệu vào bộ nhớ, Spark luôn gặp sự cố, ngay cả khi tôi phân bổ hơn 50 GB bộ nhớ cho tập dữ liệu có trọng lượng nhỏ hơn 15.

Quay lại AWK

Tôi đã học được gì: Mảng kết hợp trong AWK rất hiệu quả.

Tôi nhận ra rằng tôi có thể đạt được tốc độ cao hơn. Tôi nhớ lại điều đó một cách tuyệt vời Hướng dẫn AWK của Bruce Barnett Tôi đọc về một tính năng thú vị tên là “mảng kết hợp" Về cơ bản, đây là các cặp khóa-giá trị, vì lý do nào đó được gọi khác nhau trong AWK và do đó, bằng cách nào đó, tôi không nghĩ nhiều về chúng. La Mã Cheplyaka nhớ lại rằng thuật ngữ “mảng kết hợp” cũ hơn nhiều so với thuật ngữ “cặp khóa-giá trị”. Kể cả nếu bạn tra cứu khóa-giá trị trong Google Ngram, bạn sẽ không thấy thuật ngữ này ở đó, nhưng bạn sẽ tìm thấy các mảng kết hợp! Ngoài ra, “cặp khóa-giá trị” thường được liên kết với cơ sở dữ liệu nhiều nhất, do đó, việc so sánh nó với một bản đồ băm sẽ hợp lý hơn nhiều. Tôi nhận ra rằng tôi có thể sử dụng các mảng kết hợp này để liên kết SNP của mình với bảng bin và dữ liệu thô mà không cần sử dụng Spark.

Để làm điều này, trong tập lệnh AWK tôi đã sử dụng khối BEGIN. Đây là đoạn mã được thực thi trước khi dòng dữ liệu đầu tiên được chuyển đến phần chính của tập lệnh.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

Đội while(getline...) đã tải tất cả các hàng từ nhóm CSV (bin), đặt cột đầu tiên (tên SNP) làm khóa cho mảng kết hợp bin và giá trị thứ hai (nhóm) làm giá trị. Sau đó trong khối { }, được thực thi trên tất cả các dòng của tệp chính, mỗi dòng được gửi đến tệp đầu ra, tệp này nhận một tên duy nhất tùy thuộc vào nhóm (bin) của nó: ..._bin_"bin[$1]"_....

Biến batch_num и chunk_id khớp với dữ liệu do đường ống cung cấp, tránh tình trạng tương tranh và mỗi luồng thực thi đang chạy parallel, được ghi vào tập tin duy nhất của riêng nó.

Vì tôi đã phân tán tất cả dữ liệu thô vào các thư mục trên nhiễm sắc thể còn sót lại từ thử nghiệm trước đây với AWK nên giờ đây tôi có thể viết một tập lệnh Bash khác để xử lý từng nhiễm sắc thể và gửi dữ liệu được phân vùng sâu hơn tới S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

Kịch bản có hai phần parallel.

Trong phần đầu tiên, dữ liệu được đọc từ tất cả các tệp chứa thông tin trên nhiễm sắc thể mong muốn, sau đó dữ liệu này được phân phối qua các luồng, phân phối các tệp vào các nhóm (bin) thích hợp. Để tránh tình trạng chạy đua khi nhiều luồng ghi vào cùng một tệp, AWK chuyển tên tệp để ghi dữ liệu vào các vị trí khác nhau, ví dụ: chr_10_bin_52_batch_2_aa.csv. Kết quả là, nhiều tệp nhỏ được tạo trên đĩa (để làm điều này, tôi đã sử dụng ổ EBS terabyte).

Băng tải từ phần thứ hai parallel đi qua các nhóm (bin) và kết hợp các tệp riêng lẻ của chúng vào c CSV chung catvà sau đó gửi chúng đi xuất khẩu.

Phát sóng ở R?

Tôi đã học được gì: Bạn có thể liên hệ stdin и stdout từ tập lệnh R và do đó sử dụng tập lệnh đó trong quy trình.

Bạn có thể đã nhận thấy dòng này trong tập lệnh Bash của mình: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... Nó dịch tất cả các tệp nhóm được nối (bin) sang tập lệnh R bên dưới. {} là một kỹ thuật đặc biệt parallel, chèn bất kỳ dữ liệu nào nó gửi đến luồng được chỉ định trực tiếp vào chính lệnh đó. Lựa chọn {#} cung cấp một ID luồng duy nhất và {%} đại diện cho số vị trí công việc (lặp lại, nhưng không bao giờ đồng thời). Một danh sách tất cả các tùy chọn có thể được tìm thấy trong tài liệu.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

Khi một biến file("stdin") Truyền đến readr::read_csv, dữ liệu được dịch sang tập lệnh R sẽ được tải vào một khung, sau đó có dạng .rds-file sử dụng aws.s3 được ghi trực tiếp vào S3.

RDS giống như một phiên bản cơ sở của Parquet, không có ngăn lưu trữ loa rườm rà.

Sau khi hoàn thành tập lệnh Bash, tôi nhận được một gói .rds-files nằm trong S3, cho phép tôi sử dụng các kiểu nén và tích hợp hiệu quả.

Mặc dù sử dụng phanh R nhưng mọi thứ vẫn hoạt động rất nhanh chóng. Không có gì đáng ngạc nhiên khi các phần đọc và ghi dữ liệu của R được tối ưu hóa cao độ. Sau khi thử nghiệm trên một nhiễm sắc thể cỡ trung bình, công việc đã hoàn thành trên phiên bản C5n.4xl trong khoảng hai giờ.

Hạn chế của S3

Tôi đã học được gì: Nhờ triển khai đường dẫn thông minh, S3 có thể xử lý nhiều tệp.

Tôi lo lắng liệu S3 có thể xử lý được nhiều tệp được chuyển sang nó hay không. Tôi có thể làm cho tên tệp có ý nghĩa, nhưng S3 sẽ tìm chúng như thế nào?

Phân tích cú pháp 25TB bằng AWK và R
Các thư mục trong S3 chỉ mang tính chất trưng bày, thực tế hệ thống không quan tâm đến ký hiệu /. Từ trang Câu hỏi thường gặp về S3.

Có vẻ như S3 biểu thị đường dẫn đến một tệp cụ thể dưới dạng một khóa đơn giản trong một loại bảng băm hoặc cơ sở dữ liệu dựa trên tài liệu. Một nhóm có thể được coi là một bảng và các tệp có thể được coi là các bản ghi trong bảng đó.

Vì tốc độ và hiệu quả rất quan trọng trong việc tạo ra lợi nhuận tại Amazon nên không có gì ngạc nhiên khi hệ thống đường dẫn khóa dưới dạng tệp này được tối ưu hóa một cách kỳ lạ. Tôi đã cố gắng tìm sự cân bằng: để tôi không phải đưa ra nhiều yêu cầu nhận nhưng yêu cầu đó được thực hiện nhanh chóng. Hóa ra tốt nhất nên tạo khoảng 20 nghìn tệp bin. Tôi nghĩ nếu tiếp tục tối ưu hóa, chúng tôi có thể đạt được tốc độ tăng lên (ví dụ: tạo một nhóm đặc biệt chỉ dành cho dữ liệu, do đó giảm kích thước của bảng tra cứu). Nhưng không có thời gian và tiền bạc cho những thí nghiệm tiếp theo.

Còn khả năng tương thích chéo thì sao?

Điều tôi học được: Nguyên nhân số một gây lãng phí thời gian là tối ưu hóa phương pháp lưu trữ của bạn quá sớm.

Tại thời điểm này, điều quan trọng là bạn phải tự hỏi: “Tại sao lại sử dụng định dạng tệp độc quyền?” Lý do nằm ở tốc độ tải (tệp CSV được nén lâu hơn 7 lần để tải) và khả năng tương thích với quy trình làm việc của chúng tôi. Tôi có thể xem xét lại liệu R có thể dễ dàng tải các tệp Parquet (hoặc Mũi tên) mà không cần tải Spark hay không. Mọi người trong phòng thí nghiệm của chúng tôi đều sử dụng R và nếu tôi cần chuyển đổi dữ liệu sang định dạng khác, tôi vẫn có dữ liệu văn bản gốc, vì vậy tôi có thể chạy lại quy trình.

Phân công công việc

Tôi đã học được gì: Đừng cố gắng tối ưu hóa công việc một cách thủ công, hãy để máy tính làm việc đó.

Tôi đã gỡ lỗi quy trình làm việc trên một nhiễm sắc thể, bây giờ tôi cần xử lý tất cả các dữ liệu khác.
Tôi muốn nâng cao một số phiên bản EC2 để chuyển đổi, nhưng đồng thời tôi lại sợ nhận được tải rất mất cân bằng trên các công việc xử lý khác nhau (giống như Spark gặp phải tình trạng phân vùng không cân bằng). Ngoài ra, tôi không quan tâm đến việc tăng một phiên bản trên mỗi nhiễm sắc thể, vì đối với tài khoản AWS, giới hạn mặc định là 10 phiên bản.

Sau đó, tôi quyết định viết một tập lệnh bằng R để tối ưu hóa các công việc xử lý.

Đầu tiên, tôi yêu cầu S3 tính toán mỗi nhiễm sắc thể chiếm bao nhiêu không gian lưu trữ.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

Sau đó, tôi viết một hàm lấy tổng kích thước, xáo trộn thứ tự của các nhiễm sắc thể, chia chúng thành các nhóm num_jobs và cho bạn biết quy mô của tất cả các công việc xử lý khác nhau như thế nào.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

Sau đó, tôi thực hiện hàng nghìn lần xáo trộn bằng cách sử dụng tiếng gừ gừ và chọn ra kết quả tốt nhất.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

Vì vậy, tôi đã hoàn thành một nhóm nhiệm vụ có quy mô rất giống nhau. Sau đó, tất cả những gì còn lại là gói tập lệnh Bash trước đó của tôi vào một vòng lặp lớn for. Việc tối ưu hóa này mất khoảng 10 phút để viết. Và số tiền này ít hơn nhiều so với số tiền tôi chi cho việc tạo các tác vụ theo cách thủ công nếu chúng không cân bằng. Vì vậy, tôi nghĩ rằng tôi đã đúng với sự tối ưu hóa sơ bộ này.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

Cuối cùng tôi thêm lệnh tắt máy:

sudo shutdown -h now

... và mọi thứ đã diễn ra tốt đẹp! Bằng cách sử dụng AWS CLI, tôi đã đưa ra các phiên bản bằng tùy chọn user_data đưa cho họ các tập lệnh Bash về nhiệm vụ của họ để xử lý. Chúng chạy và tự động tắt nên tôi không phải trả thêm tiền cho sức mạnh xử lý.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

Hãy đóng gói!

Tôi đã học được gì: API phải đơn giản để sử dụng dễ dàng và linh hoạt.

Cuối cùng tôi đã nhận được dữ liệu ở đúng nơi và đúng hình thức. Tất cả những gì còn lại là đơn giản hóa quá trình sử dụng dữ liệu nhiều nhất có thể để giúp đồng nghiệp của tôi dễ dàng hơn. Tôi muốn tạo một API đơn giản để tạo yêu cầu. Nếu trong tương lai tôi quyết định chuyển từ .rds đối với các tập tin Parquet, thì đây sẽ là một vấn đề đối với tôi chứ không phải đối với đồng nghiệp của tôi. Vì điều này, tôi quyết định tạo một gói R nội bộ.

Xây dựng và ghi lại một gói rất đơn giản chỉ chứa một vài hàm truy cập dữ liệu được sắp xếp xung quanh một hàm get_snp. Tôi cũng đã tạo một trang web cho đồng nghiệp của mình pkgdown, để họ có thể dễ dàng xem các ví dụ và tài liệu.

Phân tích cú pháp 25TB bằng AWK và R

Bộ nhớ đệm thông minh

Tôi đã học được gì: Nếu dữ liệu của bạn được chuẩn bị tốt, việc lưu vào bộ nhớ đệm sẽ dễ dàng!

Vì một trong những quy trình công việc chính đã áp dụng mô hình phân tích tương tự cho gói SNP nên tôi đã quyết định sử dụng tính năng tạo nhóm để có lợi cho mình. Khi truyền dữ liệu qua SNP, mọi thông tin từ nhóm (bin) đều được gắn vào đối tượng trả về. Nghĩa là, các truy vấn cũ (về lý thuyết) có thể tăng tốc độ xử lý các truy vấn mới.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

Khi xây dựng gói, tôi đã chạy nhiều điểm chuẩn để so sánh tốc độ khi sử dụng các phương pháp khác nhau. Tôi khuyên bạn không nên bỏ qua điều này, vì đôi khi kết quả không như mong đợi. Ví dụ, dplyr::filter nhanh hơn nhiều so với việc chụp các hàng bằng cách sử dụng tính năng lọc dựa trên chỉ mục và truy xuất một cột từ khung dữ liệu đã lọc nhanh hơn nhiều so với sử dụng cú pháp lập chỉ mục.

Xin lưu ý rằng đối tượng prev_snp_results chứa chìa khóa snps_in_bin. Đây là một mảng gồm tất cả các SNP duy nhất trong một nhóm (thùng), cho phép bạn kiểm tra nhanh xem bạn đã có dữ liệu từ truy vấn trước đó hay chưa. Nó cũng giúp bạn dễ dàng lặp qua tất cả SNP trong một nhóm (thùng) bằng mã này:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

Những phát hiện

Bây giờ chúng tôi có thể (và đã bắt đầu nghiêm túc) chạy các mô hình và kịch bản mà trước đây chúng tôi không thể tiếp cận được. Điều tuyệt vời nhất là các đồng nghiệp trong phòng thí nghiệm của tôi không phải lo lắng về bất kỳ biến chứng nào. Họ chỉ có một chức năng hoạt động.

Và mặc dù gói này cung cấp cho họ thông tin chi tiết, nhưng tôi đã cố gắng làm cho định dạng dữ liệu đủ đơn giản để họ có thể tìm ra nếu tôi đột nhiên biến mất vào ngày mai...

Tốc độ đã tăng lên rõ rệt. Chúng tôi thường quét các đoạn gen có chức năng quan trọng. Trước đây, chúng tôi không thể làm điều này (hóa ra là quá đắt), nhưng bây giờ, nhờ cấu trúc nhóm (bin) và bộ đệm, yêu cầu cho một SNP mất trung bình ít hơn 0,1 giây và mức sử dụng dữ liệu là như vậy thấp đến nỗi chi phí cho S3 chỉ là hạt đậu.

Kết luận

Bài viết này không phải là một hướng dẫn nào cả. Giải pháp hóa ra mang tính cá nhân và gần như chắc chắn không tối ưu. Đúng hơn, nó là một cuốn tạp chí du lịch. Tôi muốn người khác hiểu rằng những quyết định như vậy dường như không được hình thành đầy đủ trong đầu, chúng là kết quả của quá trình thử và sai. Ngoài ra, nếu bạn đang tìm kiếm một nhà khoa học dữ liệu, hãy nhớ rằng việc sử dụng những công cụ này một cách hiệu quả đòi hỏi phải có kinh nghiệm và kinh nghiệm sẽ tốn tiền. Tôi rất vui vì mình có đủ khả năng để trả tiền, nhưng nhiều người khác có thể làm công việc tương tự tốt hơn tôi sẽ không bao giờ có cơ hội vì thậm chí không có tiền để thử.

Các công cụ dữ liệu lớn rất linh hoạt. Nếu có thời gian, bạn gần như chắc chắn có thể viết một giải pháp nhanh hơn bằng cách sử dụng các kỹ thuật làm sạch, lưu trữ và trích xuất dữ liệu thông minh. Cuối cùng, nó đi đến phân tích chi phí-lợi ích.

Những gì tôi đã học được:

  • không có cách nào rẻ để phân tích 25 TB mỗi lần;
  • hãy cẩn thận với kích thước tệp Parquet của bạn và cách sắp xếp chúng;
  • Các phân vùng trong Spark phải được cân bằng;
  • Nói chung, đừng bao giờ cố gắng tạo 2,5 triệu phân vùng;
  • Việc sắp xếp vẫn còn khó khăn, cũng như việc thiết lập Spark;
  • đôi khi dữ liệu đặc biệt đòi hỏi những giải pháp đặc biệt;
  • Tập hợp Spark nhanh nhưng việc phân vùng vẫn tốn kém;
  • đừng ngủ khi họ dạy bạn những điều cơ bản, có lẽ ai đó đã giải quyết được vấn đề của bạn từ những năm 1980;
  • gnu parallel - đây là một điều kỳ diệu, mọi người nên sử dụng;
  • Spark thích dữ liệu không nén và không thích kết hợp các phân vùng;
  • Spark tốn quá nhiều chi phí khi giải quyết các vấn đề đơn giản;
  • Mảng kết hợp của AWK rất hiệu quả;
  • bạn có thể liên hệ stdin и stdout từ tập lệnh R và do đó sử dụng tập lệnh đó trong quy trình;
  • Nhờ triển khai đường dẫn thông minh, S3 có thể xử lý nhiều tệp;
  • Lý do chính gây lãng phí thời gian là tối ưu hóa sớm phương pháp lưu trữ của bạn;
  • đừng cố gắng tối ưu hóa các tác vụ một cách thủ công, hãy để máy tính làm việc đó;
  • API phải đơn giản để sử dụng dễ dàng và linh hoạt;
  • Nếu dữ liệu của bạn được chuẩn bị tốt, việc lưu vào bộ nhớ đệm sẽ dễ dàng!

Nguồn: www.habr.com

Thêm một lời nhận xét