ما پریکړه وکړه چې ټول لاړ شم او هر SNP تقسیم کړم. دا یقیني کړه چې ویشونه د مساوي اندازې وو. دا یوه بده مفکوره وه. ما ګلو کارولی او یو بې ګناه کرښه یې اضافه کړه partition_by = 'snp'. دنده یې پیل کړه او اجرا یې پیل کړه. یوه ورځ وروسته ما معاینه کړه او ویې لیدل چې S3 ته لاهم هیڅ نه دی لیکل شوی، نو ما دنده ووژله. داسې ښکاري چې ګلو په S3 کې پټ ځای ته منځمهاله فایلونه لیکي، ډیری فایلونه، شاید یو څو ملیون. د پایلې په توګه، زما غلطۍ د زرو ډالرو څخه ډیر لګښت درلود او زما لارښود خوښ نه کړ.
تقسیم کول + ترتیب کول
ما څه زده کړل: ترتیب کول لاهم ستونزمن دي، لکه څنګه چې د سپارک سره سمون لري.
د ویشلو په برخه کې زما وروستۍ هڅه د کروموزومونو ویشل او بیا د هرې برخې ترتیب کول شامل وو. په تیوري کې، دا به هره پوښتنه ګړندۍ کړي ځکه چې د مطلوب SNP ډیټا باید په ټاکل شوي حد کې د څو پارکیټونو په مینځ کې وي. له بده مرغه، حتی د ویشل شوي ډاټا ترتیب کول یو ستونزمن کار و. د پایلې په توګه، ما د ګمرک کلستر لپاره EMR ته لاړ او اته پیاوړي مثالونه (C5.4xl) او سپارکلیر کارولی ترڅو ډیر انعطاف وړ کاري فلو رامینځته کړي ...
# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
group_by(chr) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr')
)
په هرصورت، دا کار لاهم بشپړ شوی نه و. ما دا په بیلابیلو لارو تنظیم کړی: د هرې پوښتنې اجرا کونکي لپاره د حافظې تخصیص ډیر شوی ، د لوی مقدار حافظې سره نوډونه کارول شوي ، د نشر متغیر (د نشر متغیر) کارول شوي ، مګر هرځل چې دا نیمه اندازه وګرځیدل ، او ورو ورو اجرا کونکي پیل شول. تر هغه چې هرڅه ودریږي ناکام شي.
ما څه زده کړل: د سپارک راټولول ګړندي دي ، مګر تقسیم کول لاهم ګران دي.
ما غوښتل دا کوچنی (2,5 ملیون قطارونه) ډیټا چوکاټ په سپارک کې ولولم، دا د خام ډیټا سره یوځای کړم، او بیا یې د نوي اضافه شوي کالم لخوا ویشم bin.
# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
group_by(chr_bin) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr_bin')
)
ما کارول sdf_broadcast()، نو سپارک پوهیږي چې دا باید د ډیټا چوکاټ ټولو نوډونو ته واستوي. دا ګټور دی که چیرې معلومات د اندازې وړ وي او د ټولو دندو لپاره اړین وي. که نه نو، سپارک هڅه کوي هوښیار وي او د اړتیا سره سم ډیټا توزیع کوي، کوم چې د سستیدو لامل کیدی شي.
او بیا، زما مفکوره کار ونکړ: دندې د یو څه وخت لپاره کار وکړ، اتحادیه یې بشپړه کړه، او بیا، لکه څنګه چې د ویشلو لخوا پیل شوي اجرا کونکي، دوی ناکام شول.
ما څه زده کړل: gnu parallel - دا یو جادو شی دی، هرڅوک باید وکاروي.
جلا کول خورا ورو وو او کله چې ما پیل کړ htopد ځواکمن (او ګران) EC2 مثال کارولو چک کولو لپاره، دا معلومه شوه چې زه یوازې یو کور او شاوخوا 200 MB حافظه کاروم. د ستونزې د حل لپاره او ډیرې پیسې له لاسه نه ورکوو، موږ باید دا معلومه کړو چې څنګه کار موازي کړو. خوشبختانه، په بشپړ ډول په زړه پورې کتاب کې په کمانډ لاین کې د ډیټا ساینس ما د موازي کولو په اړه د جیرون جانسن لخوا یو فصل وموند. له هغې څخه ما زده کړل gnu parallelپه یونیکس کې د ملټي ریډینګ پلي کولو لپاره خورا انعطاف وړ میتود.
کله چې ما د نوي پروسې په کارولو سره تقسیم کول پیل کړل ، هرڅه سم وو ، مګر لاهم یو خنډ شتون درلود - ډیسک ته د S3 شیانو ډاونلوډ خورا ګړندی نه و او په بشپړ ډول موازي نه و. د دې د حل لپاره، ما دا وکړل:
ما وموندله چې دا ممکنه ده چې د S3 ډاونلوډ مرحله په مستقیم ډول په پایپ لاین کې پلي کړئ، په بشپړ ډول په ډیسک کې منځمهاله ذخیره له منځه یوسي. دا پدې مانا ده چې زه کولی شم ډیسک ته د خام ډیټا لیکلو څخه مخنیوی وکړم او حتی کوچني وکاروم ، او له همدې امله ارزانه ، په AWS کې ذخیره کول.
ما د شبکې سرعت لپاره مطلوب EC2 مثال ته بدل کړ، په نوم کې د n لیک سره. ما وموندله چې د پروسس کولو بریښنا ضایع کول کله چې د n - مثالونو کارول د بارولو سرعت کې د زیاتوالي له امله جبران کیږي. د ډیری کارونو لپاره ما c5n.4xl کارولی.
بدل شو gzip په pigz، دا د gzip وسیله ده چې کولی شي د فایلونو د کمپریس کولو ابتدايي غیر موازي دندې موازي کولو لپاره عالي شیان ترسره کړي (دې لږترلږه مرسته کړې).
# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50
for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do
aws s3 cp s3://$batch_loc$chunk_file - |
pigz -dc |
parallel --block 100M --pipe
"awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"
# Combine all the parallel process chunks to single files
ls chunked/ |
cut -d '_' -f 2 |
sort -u |
parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
# Clean up intermediate data
rm chunked/*
done
دا مرحلې د یو بل سره یوځای شوي ترڅو هرڅه په چټکۍ سره کار وکړي. د ډاونلوډ سرعت زیاتولو او د ډیسک لیکونو له مینځه وړلو سره، زه اوس کولی شم یوازې په څو ساعتونو کې د 5 ټیرابایټ کڅوړه پروسس کړم.
ما څه زده کړل: سپارک غیر کمپریس شوي ډیټا خوښوي او د برخو ترکیب کول نه خوښوي.
اوس ډاټا په S3 کې په غیر پیک شوي (لوستل: شریک شوي) او نیمه ترتیب شوي بڼه کې وه، او زه کولی شم بیرته سپارک ته راستون شم. یو حیرانتیا زما په تمه وه: زه بیا د هغه څه په ترلاسه کولو کې پاتې راغلم چې ما غوښتل! دا ډیره ستونزمنه وه چې سپارک ته ووایاست چې ډاټا څنګه ویشل شوې وه. او حتی کله چې ما دا وکړل، دا معلومه شوه چې ډیری برخې شتون لري (95 زره)، او کله چې ما کارولې coalesce د دوی شمیر مناسب حد ته راټیټ کړ، دې زما ویش ویجاړ کړ. زه ډاډه یم چې دا حل کیدی شي، مګر د څو ورځو لټون وروسته ما د حل لاره ونه موندله. ما په نهایت کې په سپارک کې ټولې دندې پای ته ورسولې، که څه هم دا یو څه وخت ونیو او زما د ویشلو پارکیټ فایلونه خورا کوچني نه وو (~ 200 KB). په هرصورت، ډاټا هغه ځای و چې ورته اړتیا وه.
ډیر کوچنی او نا مساوی، په زړه پوری!
د محلي سپارک پوښتنو ازموینه
ما څه زده کړل: سپارک د ساده ستونزو د حل کولو په وخت کې ډیر سر لري.
په هوښیار شکل کې د ډیټا ډاونلوډ کولو سره ، ما د دې وړتیا درلوده چې سرعت معاینه کړم. د محلي سپارک سرور چلولو لپاره د R سکریپټ تنظیم کړئ، او بیا د ټاکل شوي پارکیټ ګروپ ذخیره (بن) څخه د سپارک ډیټا چوکاټ پورته کړئ. ما هڅه وکړه چې ټول معلومات پورته کړم مګر د ویشلو پیژندلو لپاره سپارکلیر ترلاسه نکړم.
sc <- Spark_connect(master = "local")
desired_snp <- 'rs34771739'
# Start a timer
start_time <- Sys.time()
# Load the desired bin into Spark
intensity_data <- sc %>%
Spark_read_Parquet(
name = 'intensity_data',
path = get_snp_location(desired_snp),
memory = FALSE )
# Subset bin to snp and then collect to local
test_subset <- intensity_data %>%
filter(SNP_Name == desired_snp) %>%
collect()
print(Sys.time() - start_time)
اعدام 29,415 ثانیې وخت نیولی. ډیر ښه، مګر د هر څه د ډله ایزې ازموینې لپاره خورا ښه ندي. سربیره پردې ، زه نشم کولی د کیچ کولو سره شیان ګړندي کړم ځکه چې کله ما په حافظه کې د ډیټا چوکاټ کیش کولو هڅه وکړه ، سپارک تل خراب شو ، حتی کله چې ما له 50 GB څخه ډیر حافظه ډیټا سیټ ته ځانګړې کړې چې وزن یې له 15 څخه کم وي.
تغیرات batch_num и chunk_id د پایپ لاین لخوا چمتو شوي ډیټا سره سمون لري ، د ریس حالت څخه مخنیوی کوي ، او د هر اجرا کولو تار روان دی parallel، خپل ځانګړي فایل ته یې لیکلي.
له هغه وخته چې ما د AWK سره زما د پخوانۍ تجربې څخه پاتې شوي ټول خام ډیټا په کروموزومونو فولډرونو کې توزیع کړې ، اوس زه کولی شم په یو وخت کې د یو کروموزوم پروسس کولو لپاره بل باش سکریپټ ولیکم او S3 ته ژور تقسیم شوي ډیټا ولیږم.
DESIRED_CHR='13'
# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"
# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*
# Part of get_snp()
...
# Test if our current snp data has the desired snp.
already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin
if(!already_have_snp){
# Grab info on the bin of the desired snp
snp_results <- get_snp_bin(desired_snp)
# Download the snp's bin data
snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
} else {
# The previous snp data contained the right bin so just use it
snp_results <- prev_snp_results
}
...
کله چې بسته جوړه کړه، ما د مختلف میتودونو کارولو پر مهال د سرعت پرتله کولو لپاره ډیری بنچمارکونه واخیستل. زه وړاندیز کوم چې دا غفلت مه کوئ، ځکه چې ځینې وختونه پایلې غیر متوقع وي. د مثال په ډول، dplyr::filter د شاخص پراساس فلټر کولو په کارولو سره د قطارونو نیولو په پرتله خورا ګړندی و ، او د فلټر شوي ډیټا چوکاټ څخه د واحد کالم ترلاسه کول د شاخص کولو ترکیب کارولو په پرتله خورا ګړندي و.
# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin
for(current_snp in snps_in_bin){
my_snp_results <- get_snp(current_snp, my_snp_results)
# Do something with results
}
پایلې
اوس موږ کولی شو (او په جدي توګه یې پیل کړو) موډلونه او سناریوګانې چلوو چې مخکې موږ ته د لاسرسي وړ نه و. غوره شی دا دی چې زما د لابراتوار همکاران اړتیا نلري د کوم پیچلتیا په اړه فکر وکړي. دوی یوازې یو فعالیت لري چې کار کوي.
او که څه هم کڅوړه دوی ته توضیحات ورکوي ، ما هڅه وکړه چې د ډیټا فارمیټ دومره ساده کړم چې دوی کولی شي دا معلومه کړي که زه سبا ناڅاپه ورک شوم ...