چگونه این مقاله را بخوانیم: از طولانی بودن و آشفتگی متن عذرخواهی می کنم. برای صرفه جویی در وقت شما، هر فصل را با مقدمه ای با عنوان «آنچه یاد گرفتم» شروع می کنم که ماهیت فصل را در یک یا دو جمله خلاصه می کند.
"فقط راه حل را به من نشان بده!" اگر فقط میخواهید ببینید از کجا آمدهام، به بخش «مخترعتر شدن» بروید، اما فکر میکنم خواندن درباره شکست جالبتر و مفیدتر است.
من اخیراً وظیفه راهاندازی فرآیندی برای پردازش حجم زیادی از توالیهای DNA خام (از نظر فنی یک تراشه SNP) را بر عهده داشتم. نیاز به دستیابی سریع دادهها در مورد یک مکان ژنتیکی معین (به نام SNP) برای مدلسازی بعدی و کارهای دیگر بود. با استفاده از R و AWK، من توانستم داده ها را به روشی طبیعی تمیز و سازماندهی کنم و سرعت پردازش پرس و جو را بسیار افزایش داد. این برای من آسان نبود و نیاز به تکرارهای متعدد داشت. این مقاله به شما کمک می کند تا از برخی اشتباهات من اجتناب کنید و به شما نشان دهد که در نهایت با چه چیزی به پایان رسیدم.
ابتدا چند توضیح مقدماتی.
اطلاعات
مرکز پردازش اطلاعات ژنتیکی دانشگاه ما داده هایی را در قالب یک TSV 25 ترابایتی در اختیار ما قرار داد. من آنها را در 5 بسته فشرده شده توسط Gzip دریافت کردم که هر کدام شامل 240 فایل چهار گیگابایتی بود. هر ردیف حاوی داده هایی برای یک SNP از یک فرد بود. در مجموع، دادههای 2,5 میلیون SNP و 60 هزار نفر منتقل شد. علاوه بر اطلاعات SNP، فایل ها حاوی ستون های متعددی با اعدادی بودند که ویژگی های مختلفی مانند شدت خواندن، فراوانی آلل های مختلف و غیره را منعکس می کردند. در مجموع حدود 30 ستون با مقادیر منحصر به فرد وجود داشت.
هدف
مانند هر پروژه مدیریت داده، مهمترین چیز تعیین نحوه استفاده از داده ها بود. در این مورد ما بیشتر مدلها و گردشهای کاری را برای SNP بر اساس SNP انتخاب میکنیم. یعنی در هر زمان فقط به داده های یک SNP نیاز خواهیم داشت. من باید یاد می گرفتم که چگونه تمام رکوردهای مرتبط با یکی از 2,5 میلیون SNP را به آسانی، سریع و ارزان ترین حد ممکن بازیابی کنم.
چگونه این کار را نکنیم
برای نقل یک کلیشه مناسب:
من هزاران بار شکست نخوردم، فقط هزار راه برای جلوگیری از تجزیه دسته ای از داده ها در قالبی مناسب برای پرس و جو کشف کردم.
اولین تلاش
چه آموخته ام: هیچ راه ارزانی برای تجزیه 25 ترابایت در یک زمان وجود ندارد.
پس از گذراندن دوره "روش های پیشرفته برای پردازش داده های بزرگ" در دانشگاه واندربیلت، مطمئن شدم که این ترفند در کیف است. راه اندازی سرور Hive احتمالاً یک یا دو ساعت طول می کشد تا تمام داده ها را اجرا کند و نتیجه را گزارش کند. از آنجایی که داده های ما در AWS S3 ذخیره شده است، از این سرویس استفاده کردم الههء عقل و زیبایی، که به شما امکان می دهد کوئری های Hive SQL را روی داده های S3 اعمال کنید. شما نیازی به راهاندازی/بالا بردن یک خوشه Hive ندارید، و همچنین فقط برای دادههایی که به دنبال آن هستید پول میپردازید.
بعد از اینکه دادهها و قالب آن را به آتنا نشان دادم، آزمایشهایی را با پرس و جوهایی مانند این انجام دادم:
select * from intensityData limit 10;
و به سرعت نتایج با ساختار خوبی دریافت کرد. آماده.
تا اینکه سعی کردیم از داده ها در کارمان استفاده کنیم...
از من خواسته شد که تمام اطلاعات SNP را برای آزمایش مدل بردارم. من پرس و جو را اجرا کردم:
select * from intensityData
where snp = 'rs123456';
... و شروع به انتظار كردن كرد. پس از هشت دقیقه و بیش از 4 ترابایت اطلاعات درخواستی، نتیجه را دریافت کردم. آتنا بر اساس حجم داده های یافت شده، 5 دلار به ازای هر ترابایت هزینه می گیرد. بنابراین این درخواست تنها 20 دلار و هشت دقیقه انتظار هزینه دارد. برای اجرای مدل بر روی تمام داده ها، باید 38 سال صبر می کردیم و 50 میلیون دلار پرداخت می کردیم، بدیهی است که این برای ما مناسب نبود.
استفاده از پارکت ضروری بود...
چه آموخته ام: مراقب حجم فایل های پارکت و سازماندهی آنها باشید.
من ابتدا سعی کردم با تبدیل همه TSV ها به این وضعیت درست کنم فایل های پارکت. آنها برای کار با مجموعه داده های بزرگ راحت هستند زیرا اطلاعات موجود در آنها به شکل ستونی ذخیره می شود: هر ستون در بخش حافظه / دیسک خود قرار دارد، برخلاف فایل های متنی، که در آن ردیف ها حاوی عناصر هر ستون هستند. و اگر نیاز به پیدا کردن چیزی دارید، فقط ستون مورد نیاز را بخوانید. علاوه بر این، هر فایل محدوده ای از مقادیر را در یک ستون ذخیره می کند، بنابراین اگر مقدار مورد نظر شما در محدوده ستون نباشد، Spark زمان را برای اسکن کل فایل تلف نمی کند.
من یک کار ساده را اجرا کردم چسب AWS تا TSV های خود را به پارکت تبدیل کنیم و فایل های جدید را به آتنا رها کنیم. حدود 5 ساعت طول کشید. اما وقتی درخواست را اجرا کردم، تقریباً به همان مقدار زمان و کمی پول کمتر برای تکمیل نیاز داشت. واقعیت این است که Spark، در تلاش برای بهینه سازی کار، به سادگی یک تکه TSV را باز کرد و آن را در قطعه پارکت خودش قرار داد. و از آنجایی که هر تکه به اندازه کافی بزرگ بود که کل سوابق افراد زیادی را در خود جای دهد، هر فایل حاوی تمام SNP ها بود، بنابراین Spark مجبور بود تمام فایل ها را باز کند تا اطلاعات مورد نیاز خود را استخراج کند.
جالب اینجاست که نوع فشرده سازی پیش فرض (و توصیه شده) پارکت، Snappy، قابل تقسیم نیست. بنابراین، هر مجری در وظیفه باز کردن و دانلود مجموعه داده کامل 3,5 گیگابایتی مانده بود.
بیایید مشکل را درک کنیم
چه آموخته ام: مرتب سازی دشوار است، به خصوص اگر داده ها توزیع شده باشند.
به نظرم رسید که اکنون اصل مشکل را درک کرده ام. من فقط باید داده ها را بر اساس ستون SNP مرتب کنم، نه بر اساس افراد. سپس چندین SNP در یک تکه داده جداگانه ذخیره میشوند و سپس تابع «هوشمند» پارکت «فقط در صورتی باز شود که مقدار در محدوده باشد» با شکوه خود را نشان میدهد. متأسفانه، مرتب کردن میلیاردها ردیف پراکنده در یک خوشه کار دشواری بود.
من در کالج در کلاس الگوریتم شرکت می کنم: "اوه، هیچ کس به پیچیدگی محاسباتی همه این الگوریتم های مرتب سازی اهمیت نمی دهد"
من سعی می کنم یک ستون در 20 ترابایت مرتب کنم #جرقه جدول: "چرا این همه طول می کشد؟" # علم داده مبارزات.
AWS قطعاً به دلیل "من دانش آموز پریشان هستم" نمی خواهد بازپرداختی را صادر کند. بعد از مرتب سازی روی چسب آمازون، 2 روز کار کرد و خراب شد.
در مورد پارتیشن بندی چطور؟
چه آموخته ام: پارتیشن ها در اسپارک باید متعادل باشند.
سپس به ایده پارتیشن بندی داده ها در کروموزوم ها رسیدم. 23 مورد از آنها وجود دارد (و اگر DNA میتوکندری و مناطق نقشه برداری نشده را در نظر بگیرید، چندین مورد دیگر وجود دارد).
این به شما امکان می دهد داده ها را به قطعات کوچکتر تقسیم کنید. اگر فقط یک خط به تابع صادرات Spark در اسکریپت Glue اضافه کنید partition_by = "chr"، سپس داده ها باید به سطل ها تقسیم شوند.
ژنوم از قطعات متعددی به نام کروموزوم تشکیل شده است.
متاسفانه کار نکرد. کروموزوم ها اندازه های متفاوتی دارند که به معنای مقادیر متفاوتی از اطلاعات است. این بدان معنی است که وظایفی که اسپارک برای کارگران ارسال می کرد متعادل نبودند و به آرامی تکمیل می شدند زیرا برخی از گره ها زود به پایان می رسیدند و بیکار بودند. با این حال، وظایف تکمیل شد. اما هنگام درخواست یک SNP، عدم تعادل دوباره مشکلاتی را ایجاد کرد. هزینه پردازش SNP در کروموزوم های بزرگتر (یعنی جایی که می خواهیم داده ها را دریافت کنیم) تنها حدود 10 برابر کاهش یافته است. زیاد، اما کافی نیست.
اگر آن را به قسمت های کوچکتر تقسیم کنیم چه؟
چه آموخته ام: هرگز سعی نکنید 2,5 میلیون پارتیشن انجام دهید.
تصمیم گرفتم همه چیز را انجام دهم و هر SNP را پارتیشن بندی کنم. این اطمینان حاصل می کند که پارتیشن ها اندازه یکسانی دارند. این یک ایده بد بود. من از چسب استفاده کردم و یک خط بی گناه اضافه کردم partition_by = 'snp'. کار شروع شد و شروع به اجرا شد. یک روز بعد چک کردم دیدم هنوز چیزی روی S3 نوشته نشده است، بنابراین کار را کشتم. به نظر می رسد که Glue در حال نوشتن فایل های میانی در یک مکان مخفی در S3 بوده است، فایل های زیادی، شاید چند میلیون. در نتیجه اشتباه من بیش از هزار دلار هزینه داشت و مربی من را خوشحال نکرد.
پارتیشن بندی + مرتب سازی
چه آموخته ام: مرتب سازی همچنان دشوار است، مانند تنظیم Spark.
آخرین تلاش من برای پارتیشن بندی شامل تقسیم کروموزوم ها و مرتب سازی هر پارتیشن بود. در تئوری، این امر هر پرس و جو را سرعت می بخشد زیرا داده های SNP مورد نظر باید در چند قطعه پارکت در یک محدوده معین قرار می گرفتند. متأسفانه، مرتب سازی حتی داده های پارتیشن بندی شده کار دشواری بود. در نتیجه، من برای یک کلاستر سفارشی به EMR تغییر مکان دادم و از هشت نمونه قدرتمند (C5.4xl) و Sparklyr برای ایجاد یک گردش کاری انعطافپذیرتر استفاده کردم.
# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
group_by(chr) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr')
)
... با این حال، کار هنوز تکمیل نشده است. من آن را به روشهای مختلفی پیکربندی کردم: تخصیص حافظه را برای هر اجراکننده پرسوجو افزایش دادم، از گرههایی با مقدار زیادی حافظه استفاده کردم، از متغیرهای پخش (متغیرهای پخش) استفاده کردم، اما هر بار اینها نیمه اندازهگیری شدند و به تدریج مجریها شروع به کار کردند. شکست خوردن تا زمانی که همه چیز متوقف شود.
چه آموخته ام: گاهی اوقات داده های خاص راه حل های خاصی را می طلبد.
هر SNP یک مقدار موقعیت دارد. این عدد مربوط به تعداد بازها در امتداد کروموزوم آن است. این یک روش خوب و طبیعی برای سازماندهی داده های ما است. در ابتدا می خواستم بر اساس مناطق هر کروموزوم تقسیم بندی کنم. به عنوان مثال، موقعیت های 1 - 2000، 2001 - 4000 و غیره. اما مشکل این است که SNP ها به طور مساوی در بین کروموزوم ها توزیع نمی شوند، بنابراین اندازه گروه ها بسیار متفاوت خواهد بود.
در نتیجه، من به تقسیم موقعیت ها به دسته ها (رتبه) رسیدم. با استفاده از دادههای دانلود شده قبلی، درخواستی برای به دست آوردن فهرستی از SNPهای منحصربهفرد، موقعیتها و کروموزومهای آنها ارائه کردم. سپس دادههای درون هر کروموزوم را مرتب کردم و SNPها را در گروههایی با اندازه معین جمعآوری کردم. فرض کنید 1000 SNP هر کدام. این به من رابطه SNP به گروه در کروموزوم داد.
در پایان گروه های (bin) 75 SNP را ساختم که دلیل آن در ادامه توضیح داده می شود.
چه آموخته ام: تجمع جرقه سریع است، اما پارتیشن بندی هنوز گران است.
من میخواستم این قاب داده کوچک (2,5 میلیون ردیف) را در Spark بخوانم، آن را با دادههای خام ترکیب کنم و سپس آن را با ستون جدید اضافه شده تقسیم کنم. bin.
# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
group_by(chr_bin) %>%
arrange(Position) %>%
Spark_write_Parquet(
path = DUMP_LOC,
mode = 'overwrite',
partition_by = c('chr_bin')
)
من استفاده کردم sdf_broadcast()، بنابراین Spark می داند که باید چارچوب داده را به همه گره ها ارسال کند. این در صورتی مفید است که داده ها از نظر اندازه کوچک باشند و برای همه وظایف مورد نیاز باشند. در غیر این صورت، Spark سعی می کند هوشمند باشد و داده ها را در صورت نیاز توزیع می کند که می تواند باعث کاهش سرعت شود.
و دوباره، ایده من کار نکرد: وظایف برای مدتی کار کرد، اتحادیه را تکمیل کرد، و سپس، مانند مجریانی که با پارتیشن بندی راه اندازی شدند، شروع به شکست کردند.
اضافه کردن AWK
چه آموخته ام: وقتی اصول اولیه را به شما می آموزند نخوابید. مطمئناً کسی قبلاً مشکل شما را در دهه 1980 حل کرده است.
تا این مرحله، دلیل تمام شکست های من با Spark به هم ریختگی داده ها در خوشه بود. شاید بتوان با پیش درمان وضعیت را بهبود بخشید. تصمیم گرفتم دادههای متن خام را به ستونهای کروموزوم تقسیم کنم، بنابراین امیدوار بودم که دادههای «پیشپارتیشنشده» را به Spark ارائه دهم.
من در StackOverflow برای نحوه تقسیم بر اساس مقادیر ستون جستجو کردم و پیدا کردم چنین پاسخ عالی با AWK می توانید یک فایل متنی را با نوشتن آن در یک اسکریپت به جای ارسال نتایج به مقادیر ستون تقسیم کنید. stdout.
من یک اسکریپت Bash نوشتم تا آن را امتحان کنم. یکی از TSV های بسته بندی شده را دانلود کرد، سپس آن را با استفاده از بسته بندی باز کرد gzip و ارسال شد به awk.
چه آموخته ام: gnu parallel - این یک چیز جادویی است، همه باید از آن استفاده کنند.
جدایی خیلی کند بود و وقتی شروع کردم htopبرای بررسی استفاده از یک نمونه قدرتمند (و گران قیمت) EC2، معلوم شد که من فقط از یک هسته و حدود 200 مگابایت حافظه استفاده می کردم. برای حل مشکل و از دست ندادن پول زیاد، باید دریابیم که چگونه کار را موازی کنیم. خوشبختانه، در یک کتاب کاملا شگفت انگیز علم داده در خط فرمان من فصلی از جرون یانسنز در مورد موازی سازی پیدا کردم. از آن یاد گرفتم gnu parallel، یک روش بسیار منعطف برای پیاده سازی multithreading در یونیکس.
وقتی پارتیشن بندی را با استفاده از فرآیند جدید شروع کردم، همه چیز خوب بود، اما هنوز یک تنگنا وجود داشت - دانلود اشیاء S3 روی دیسک خیلی سریع نبود و کاملاً موازی نبود. برای رفع این مشکل، این کار را انجام دادم:
متوجه شدم که میتوان مرحله دانلود S3 را مستقیماً در خط لوله پیادهسازی کرد و فضای ذخیرهسازی میانی روی دیسک را کاملاً حذف کرد. این بدان معنی است که می توانم از نوشتن داده های خام روی دیسک اجتناب کنم و از فضای ذخیره سازی کوچکتر و در نتیجه ارزانتر در AWS استفاده کنم.
تیم aws configure set default.s3.max_concurrent_requests 50 تعداد رشته هایی را که AWS CLI استفاده می کند بسیار افزایش داد (به طور پیش فرض 10 عدد وجود دارد).
من به یک نمونه EC2 که برای سرعت شبکه بهینه شده بود، با حرف n در نام تغییر کردم. من متوجه شده ام که از دست دادن قدرت پردازش در هنگام استفاده از n نمونه بیشتر از افزایش سرعت بارگذاری جبران می شود. برای اکثر کارها از c5n.4xl استفاده کردم.
تغییر کرد gzip بر pigz، این یک ابزار gzip است که می تواند کارهای جالبی را برای موازی کردن کار اولیه غیر موازی فشرده سازی فایل ها انجام دهد (این کمترین کمک را به شما کمک کرد).
# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50
for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do
aws s3 cp s3://$batch_loc$chunk_file - |
pigz -dc |
parallel --block 100M --pipe
"awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"
# Combine all the parallel process chunks to single files
ls chunked/ |
cut -d '_' -f 2 |
sort -u |
parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
# Clean up intermediate data
rm chunked/*
done
این مراحل با یکدیگر ترکیب می شوند تا همه چیز خیلی سریع کار کند. با افزایش سرعت دانلود و حذف نوشتن دیسک، اکنون می توانم یک بسته 5 ترابایتی را تنها در چند ساعت پردازش کنم.
هیچ چیز شیرینتر از این نیست که تمام هستههایی را که در AWS هزینه میکنید استفاده کنید. به لطف gnu-parallel می توانم یک csv 19 گیگ را از حالت فشرده خارج کرده و به همان سرعتی که می توانم آن را دانلود کنم تقسیم کنم. من حتی نتوانستم جرقه ای برای اجرای این کار داشته باشم. # علم داده# لینوکسpic.twitter.com/Nqyba2zqEk
چه آموخته ام: Spark داده های فشرده نشده را دوست دارد و از ترکیب پارتیشن ها خوشش نمی آید.
اکنون داده ها در S3 در قالب بدون بسته (بخوانید: اشتراک گذاری شده) و نیمه مرتب شده بودند و من می توانستم دوباره به Spark برگردم. غافلگیری در انتظارم بود: باز هم نتوانستم به آنچه می خواستم برسم! بسیار دشوار بود که به Spark دقیقاً نحوه پارتیشن بندی داده ها را بگویید. و حتی وقتی این کار را انجام دادم، معلوم شد که پارتیشن های زیادی وجود دارد (95 هزار)، و زمانی که استفاده کردم coalesce تعداد آنها را به حد معقول کاهش داد، این پارتیشن بندی من را از بین برد. من مطمئن هستم که می توان این مشکل را برطرف کرد، اما بعد از چند روز جستجو نتوانستم راه حلی پیدا کنم. من در نهایت تمام وظایف را در Spark به پایان رساندم، اگرچه مدتی طول کشید و فایل های اسپلیت پارکت من خیلی کم نبود (~200 کیلوبایت). با این حال، داده ها در جایی بود که لازم بود.
خیلی کوچک و ناهموار، فوق العاده است!
تست پرس و جوهای محلی Spark
چه آموخته ام: اسپارک هنگام حل مسائل ساده سربار زیادی دارد.
با دانلود داده ها در قالبی هوشمندانه، توانستم سرعت را آزمایش کنم. یک اسکریپت R را برای اجرای یک سرور محلی Spark تنظیم کنید و سپس یک قاب داده Spark را از ذخیره سازی گروه پارکت مشخص شده (bin) بارگیری کنید. من سعی کردم همه داده ها را بارگیری کنم اما نتوانستم Sparklyr پارتیشن بندی را تشخیص دهد.
sc <- Spark_connect(master = "local")
desired_snp <- 'rs34771739'
# Start a timer
start_time <- Sys.time()
# Load the desired bin into Spark
intensity_data <- sc %>%
Spark_read_Parquet(
name = 'intensity_data',
path = get_snp_location(desired_snp),
memory = FALSE )
# Subset bin to snp and then collect to local
test_subset <- intensity_data %>%
filter(SNP_Name == desired_snp) %>%
collect()
print(Sys.time() - start_time)
اعدام 29,415 ثانیه طول کشید. خیلی بهتر است، اما برای آزمایش انبوه هر چیزی خیلی خوب نیست. علاوه بر این، من نمیتوانستم کارها را با کش سرعت دهم، زیرا وقتی سعی میکردم یک قاب داده را در حافظه پنهان کنم، Spark همیشه خراب میشد، حتی زمانی که بیش از 50 گیگابایت حافظه را به مجموعه دادهای با وزن کمتر از 15 اختصاص دادم.
بازگشت به AWK
چه آموخته ام: آرایه های انجمنی در AWK بسیار کارآمد هستند.
متوجه شدم که می توانم به سرعت های بالاتری برسم. من که در یک فوق العاده به یاد آورد آموزش AWK توسط بروس بارنت من در مورد یک ویژگی جالب به نام "آرایه های انجمنی" در اصل ، اینها جفت های کلید-مقدار هستند که به دلایلی در AWK به طور متفاوتی نامیده می شدند و بنابراین من به نوعی به آنها زیاد فکر نکردم. رومن چپلیاکا به یاد آورد که اصطلاح "آرایه های انجمنی" بسیار قدیمی تر از اصطلاح "جفت کلید-مقدار" است. حتی اگر شما کلید-مقدار را در Google Ngram جستجو کنید، این عبارت را در آنجا نخواهید دید، اما آرایه های انجمنی را خواهید یافت! علاوه بر این، "جفت کلید-مقدار" اغلب با پایگاه های داده مرتبط است، بنابراین مقایسه آن با نقشه هشم بسیار منطقی تر است. متوجه شدم که میتوانم از این آرایههای انجمنی برای مرتبط کردن SNPهای خود با جدول bin و دادههای خام بدون استفاده از Spark استفاده کنم.
برای انجام این کار، در اسکریپت AWK از بلوک استفاده کردم BEGIN. این قطعه کدی است که قبل از ارسال اولین خط داده به بدنه اصلی اسکریپت اجرا می شود.
تیم while(getline...) همه سطرها را از گروه CSV بارگیری کرد (bin)، ستون اول (نام SNP) را به عنوان کلید آرایه انجمنی تنظیم کرد. bin و مقدار دوم (گروه) به عنوان مقدار. سپس در بلوک {}، که در تمام خطوط فایل اصلی اجرا می شود، هر خط به فایل خروجی ارسال می شود که بسته به گروه خود (bin) یک نام منحصر به فرد دریافت می کند: ..._bin_"bin[$1]"_....
متغیرها batch_num и chunk_id با داده های ارائه شده توسط خط لوله مطابقت داشت، از شرایط مسابقه اجتناب کرد، و هر رشته اجرایی در حال اجرا بود parallel، در فایل منحصر به فرد خود نوشت.
از آنجایی که من تمام دادههای خام را در پوشههای کروموزومهای باقیمانده از آزمایش قبلی خود با AWK پراکنده کردم، اکنون میتوانم اسکریپت Bash دیگری برای پردازش یک کروموزوم در یک زمان بنویسم و دادههای پارتیشن بندی شده عمیقتری را به S3 ارسال کنم.
DESIRED_CHR='13'
# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"
# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*
فیلمنامه دو بخش دارد parallel.
در بخش اول، دادهها از تمام فایلهای حاوی اطلاعات کروموزوم مورد نظر خوانده میشوند، سپس این دادهها در سراسر رشتهها توزیع میشوند که فایلها را در گروههای مناسب (bin) توزیع میکنند. برای جلوگیری از شرایط مسابقه زمانی که چندین رشته در یک فایل می نویسند، AWK نام فایل ها را برای نوشتن داده ها به مکان های مختلف ارسال می کند، به عنوان مثال. chr_10_bin_52_batch_2_aa.csv. در نتیجه، بسیاری از فایلهای کوچک روی دیسک ایجاد میشوند (برای این کار از حجمهای EBS ترابایتی استفاده کردم).
نوار نقاله از بخش دوم parallel از گروه ها (bin) عبور می کند و فایل های فردی آنها را در CSV c مشترک ترکیب می کند catو سپس آنها را برای صادرات ارسال می کند.
پخش در R؟
چه آموخته ام: شما می توانید تماس بگیرید stdin и stdout از یک اسکریپت R، و بنابراین از آن در خط لوله استفاده کنید.
ممکن است متوجه این خط در اسکریپت Bash شده باشید: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... تمام فایل های گروهی به هم پیوسته (bin) را به اسکریپت R زیر ترجمه می کند. {} یک تکنیک خاص است parallel، که هر داده ای را که به جریان مشخص شده ارسال می کند مستقیماً در خود فرمان وارد می کند. گزینه {#} یک شناسه موضوع منحصر به فرد را ارائه می دهد و {%} شماره شکاف شغلی را نشان می دهد (تکرار شده، اما هرگز به طور همزمان). لیستی از همه گزینه ها را می توان در آن یافت مستندات.
وقتی یک متغیر file("stdin") منتقل شده به readr::read_csv، داده های ترجمه شده به اسکریپت R در یک فریم بارگذاری می شود که سپس در فرم قرار می گیرد .rds-فایل با استفاده از aws.s3 مستقیماً روی S3 نوشته شده است.
RDS چیزی شبیه یک نسخه جوان پارکت است، بدون زواید فضای ذخیرهسازی بلندگو.
پس از اتمام اسکریپت Bash من یک بسته نرم افزاری دریافت کردم .rds-فایل های واقع در S3، که به من امکان استفاده از فشرده سازی کارآمد و انواع داخلی را می دهد.
با وجود استفاده از ترمز R، همه چیز خیلی سریع کار کرد. جای تعجب نیست که بخش هایی از R که داده ها را می خوانند و می نویسند بسیار بهینه شده اند. پس از آزمایش بر روی یک کروموزوم با اندازه متوسط، کار در یک نمونه C5n.4xl در حدود دو ساعت تکمیل شد.
محدودیت های S3
چه آموخته ام: به لطف پیاده سازی مسیر هوشمند، S3 می تواند فایل های زیادی را مدیریت کند.
من نگران بودم که آیا S3 بتواند بسیاری از فایلهایی را که به آن منتقل میشوند مدیریت کند. من میتوانم نام فایلها را معنا کنم، اما S3 چگونه آنها را جستجو میکند؟
پوشه ها در S3 فقط برای نمایش هستند، در واقع سیستم علاقه ای به نماد ندارد /. از صفحه پرسش و پاسخ S3.
به نظر می رسد که S3 مسیر یک فایل خاص را به عنوان یک کلید ساده در نوعی جدول هش یا پایگاه داده مبتنی بر سند نشان می دهد. یک سطل را می توان به عنوان یک جدول در نظر گرفت و فایل ها را می توان رکوردهای موجود در آن جدول در نظر گرفت.
از آنجایی که سرعت و کارایی برای کسب درآمد در آمازون مهم است، جای تعجب نیست که این سیستم مسیر کلیدی بهعنوان یک فایل بهشدت بهینهسازی شده است. سعی کردم تعادلی پیدا کنم: به طوری که مجبور نباشم درخواست های دریافت زیادی داشته باشم، اما درخواست ها به سرعت اجرا شوند. معلوم شد که بهتر است حدود 20 هزار فایل bin درست کنید. فکر میکنم اگر به بهینهسازی ادامه دهیم، میتوانیم به افزایش سرعت برسیم (مثلاً یک سطل مخصوص فقط برای دادهها بسازیم و در نتیجه اندازه جدول جستجو را کاهش دهیم). اما زمان و هزینه ای برای آزمایش های بیشتر وجود نداشت.
در مورد سازگاری متقابل چطور؟
چیزی که یاد گرفتم: علت شماره یک اتلاف وقت، بهینه سازی روش ذخیره سازی پیش از موعد است.
در این مرحله، بسیار مهم است که از خود بپرسید: "چرا از یک فرمت فایل اختصاصی استفاده کنیم؟" دلیل آن در سرعت بارگذاری (بارگیری فایل های CSV زیپ شده ۷ برابر زمان بیشتری طول کشید) و سازگاری با گردش کار ما نهفته است. ممکن است در مورد اینکه R بتواند به راحتی فایل های Parquet (یا Arrow) را بدون بارگذاری Spark بارگذاری کند، تجدید نظر کنم. همه افراد در آزمایشگاه ما از R استفاده می کنند، و اگر من نیاز به تبدیل داده ها به فرمت دیگری داشته باشم، هنوز داده های متن اصلی را دارم، بنابراین می توانم دوباره خط لوله را اجرا کنم.
تقسیم کار
چه آموخته ام: سعی نکنید کارها را به صورت دستی بهینه کنید، بگذارید رایانه این کار را انجام دهد.
من جریان کار را در یک کروموزوم اشکال زدایی کرده ام، اکنون باید تمام داده های دیگر را پردازش کنم.
من میخواستم چندین نمونه EC2 را برای تبدیل بالا ببرم، اما در همان زمان میترسیدم بار بسیار نامتعادلی را در کارهای پردازشی مختلف دریافت کنم (همانطور که Spark از پارتیشنهای نامتعادل رنج میبرد). علاوه بر این، من علاقه ای به افزایش یک نمونه در هر کروموزوم نداشتم، زیرا برای حساب های AWS محدودیت پیش فرض 10 نمونه وجود دارد.
سپس تصمیم گرفتم یک اسکریپت به زبان R بنویسم تا کارهای پردازشی را بهینه کنم.
ابتدا از S3 خواستم محاسبه کند که هر کروموزوم چقدر فضای ذخیره سازی را اشغال کرده است.
library(aws.s3)
library(tidyverse)
chr_sizes <- get_bucket_df(
bucket = '...', prefix = '...', max = Inf
) %>%
mutate(Size = as.numeric(Size)) %>%
filter(Size != 0) %>%
mutate(
# Extract chromosome from the file name
chr = str_extract(Key, 'chr.{1,4}.csv') %>%
str_remove_all('chr|.csv')
) %>%
group_by(chr) %>%
summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB
# A tibble: 27 x 2
chr total_size
<chr> <dbl>
1 0 163.
2 1 967.
3 10 541.
4 11 611.
5 12 542.
6 13 364.
7 14 375.
8 15 372.
9 16 434.
10 17 443.
# … with 17 more rows
سپس تابعی نوشتم که اندازه کل را می گیرد، ترتیب کروموزوم ها را به هم می زند، آنها را به گروه ها تقسیم می کند. num_jobs و به شما می گوید که اندازه همه کارهای پردازش چقدر متفاوت است.
num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7
shuffle_job <- function(i){
chr_sizes %>%
sample_frac() %>%
mutate(
cum_size = cumsum(total_size),
job_num = ceiling(cum_size/job_size)
) %>%
group_by(job_num) %>%
summarise(
job_chrs = paste(chr, collapse = ','),
total_job_size = sum(total_size)
) %>%
mutate(sd = sd(total_job_size)) %>%
nest(-sd)
}
shuffle_job(1)
# A tibble: 1 x 2
sd data
<dbl> <list>
1 153. <tibble [7 × 3]>
سپس هزار بار با استفاده از purrr دویدم و بهترین را انتخاب کردم.
بنابراین من با مجموعه ای از کارها که از نظر اندازه بسیار شبیه بودند، به پایان رسیدم. سپس تنها چیزی که باقی ماند این بود که اسکریپت قبلی Bash را در یک حلقه بزرگ بپیچانم for. نوشتن این بهینه سازی حدود 10 دقیقه طول کشید. و این بسیار کمتر از آن چیزی است که من برای ایجاد دستی کارها در صورت نامتعادل بودن هزینه می کنم. بنابراین، من فکر می کنم که حق با این بهینه سازی اولیه بود.
for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi
در پایان دستور shutdown را اضافه می کنم:
sudo shutdown -h now
... و همه چیز درست شد! با استفاده از AWS CLI، نمونه هایی را با استفاده از گزینه مطرح کردم user_data اسکریپت های Bash از وظایفشان را برای پردازش به آنها داد. آنها به طور خودکار اجرا می شوند و خاموش می شوند، بنابراین من برای قدرت پردازش اضافی پولی نمی پرداختم.
چه آموخته ام: برای سهولت و انعطاف پذیری استفاده، API باید ساده باشد.
در نهایت من داده ها را در مکان و فرم مناسب دریافت کردم. تنها چیزی که باقی ماند این بود که فرآیند استفاده از داده ها را تا حد امکان ساده کنم تا کار را برای همکارانم آسان کنم. من می خواستم یک API ساده برای ایجاد پرس و جو بسازم. اگر در آینده تصمیم بگیرم از .rds به فایل های پارکت، پس این باید برای من مشکل داشته باشد، نه برای همکارانم. برای این کار تصمیم گرفتم یک بسته R داخلی بسازم.
یک بسته بسیار ساده را بسازید و مستند کنید که فقط شامل چند تابع دسترسی به داده است که حول یک تابع سازماندهی شده اند get_snp. یک سایت هم برای همکارانم درست کردم pkgdown، بنابراین آنها می توانند به راحتی نمونه ها و مستندات را ببینند.
حافظه پنهان هوشمند
چه آموخته ام: اگر داده های شما به خوبی آماده شده باشد، کش کردن آسان خواهد بود!
از آنجایی که یکی از گردشهای کاری اصلی همان مدل تحلیلی را در بسته SNP اعمال کرد، تصمیم گرفتم از binning به نفع خود استفاده کنم. هنگام انتقال داده ها از طریق SNP، تمام اطلاعات گروه (bin) به شی برگشتی متصل می شود. یعنی پرس و جوهای قدیمی می توانند (در تئوری) پردازش پرس و جوهای جدید را سرعت بخشند.
# Part of get_snp()
...
# Test if our current snp data has the desired snp.
already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin
if(!already_have_snp){
# Grab info on the bin of the desired snp
snp_results <- get_snp_bin(desired_snp)
# Download the snp's bin data
snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
} else {
# The previous snp data contained the right bin so just use it
snp_results <- prev_snp_results
}
...
هنگام ساخت بسته، معیارهای زیادی را برای مقایسه سرعت هنگام استفاده از روشهای مختلف اجرا کردم. توصیه می کنم از این موضوع غافل نشوید، زیرا گاهی اوقات نتایج غیرمنتظره است. مثلا، dplyr::filter بسیار سریعتر از گرفتن ردیفها با استفاده از فیلترینگ مبتنی بر نمایهسازی بود، و بازیابی یک ستون از یک قاب داده فیلتر شده بسیار سریعتر از استفاده از نحو نمایهسازی بود.
لطفا توجه داشته باشید که شی prev_snp_results حاوی کلید است snps_in_bin. این آرایه ای از تمام SNP های منحصر به فرد در یک گروه (bin) است که به شما این امکان را می دهد تا به سرعت بررسی کنید که آیا قبلاً داده هایی از یک پرس و جو قبلی دارید یا خیر. همچنین حلقه زدن تمام SNP های یک گروه (bin) را با این کد آسان می کند:
# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin
for(current_snp in snps_in_bin){
my_snp_results <- get_snp(current_snp, my_snp_results)
# Do something with results
}
یافته ها
اکنون می توانیم (و به طور جدی شروع به اجرای آن کرده ایم) مدل ها و سناریوهایی را اجرا کنیم که قبلاً برای ما غیرقابل دسترسی بودند. بهترین چیز این است که همکاران آزمایشگاهی من مجبور نیستند به هیچ عارضه ای فکر کنند. آنها فقط عملکردی دارند که کار می کند.
و اگرچه بسته جزئیات را در اختیار آنها قرار نمی دهد، من سعی کردم قالب داده را به اندازه کافی ساده کنم که اگر فردا ناگهان ناپدید شدم، بتوانند آن را بفهمند...
سرعت به طرز محسوسی افزایش یافته است. ما معمولاً قطعات ژنوم از نظر عملکردی مهم را اسکن می کنیم. قبلاً نمیتوانستیم این کار را انجام دهیم (معلوم شد که خیلی گران است)، اما اکنون، به لطف ساختار گروه (bin) و حافظه پنهان، درخواست یک SNP به طور متوسط کمتر از 0,1 ثانیه طول میکشد و میزان استفاده از دادهها بسیار زیاد است. کم است که هزینه های S3 بادام زمینی است.
اخیراً 25+ TB داده خام ژنوتایپینگ را برای آزمایشگاهم تغییر دادم. وقتی شروع کردم، استفاده از اسپارک 8 دقیقه طول کشید و 20 دلار برای درخواست SNP هزینه داشت. پس از استفاده از AWK+ #rstats برای پردازش، اکنون کمتر از یک دهم ثانیه طول می کشد و 10 دلار هزینه دارد. شخصی من #اطلاعات بزرگ پیروزی. pic.twitter.com/ANOXVGrmkk
این مقاله اصلاً راهنما نیست. معلوم شد که راه حل فردی است و تقریباً مطمئناً مطلوب نیست. بلکه سفرنامه است. میخواهم دیگران بفهمند که چنین تصمیمهایی در ذهن کاملاً شکلگرفته به نظر نمیرسند، آنها نتیجه آزمون و خطا هستند. همچنین، اگر به دنبال یک دانشمند داده هستید، به خاطر داشته باشید که استفاده از این ابزارها به طور موثر نیاز به تجربه دارد و تجربه هزینه دارد. من خوشحالم که امکانات پرداخت را داشتم، اما بسیاری دیگر که می توانند همان کار را بهتر از من انجام دهند، به دلیل کمبود پول هرگز فرصتی برای تلاش ندارند.
ابزارهای کلان داده همه کاره هستند. اگر وقت دارید، تقریباً مطمئناً می توانید با استفاده از تکنیک های تمیز کردن، ذخیره سازی و استخراج داده های هوشمند راه حل سریع تری بنویسید. در نهایت به تجزیه و تحلیل هزینه و فایده می رسد.
چیزی که یاد گرفتم:
هیچ راه ارزانی برای تجزیه 25 ترابایت در یک زمان وجود ندارد.
مراقب حجم فایل های پارکت و سازماندهی آنها باشید.
پارتیشن ها در Spark باید متعادل باشند.
به طور کلی هرگز سعی نکنید 2,5 میلیون پارتیشن بسازید.
مرتب سازی همچنان دشوار است، مانند راه اندازی Spark.
گاهی اوقات داده های خاص به راه حل های خاصی نیاز دارند.
تجمع جرقه سریع است، اما پارتیشن بندی هنوز گران است.
وقتی اصول را به شما می آموزند نخوابید، احتمالاً کسی قبلاً مشکل شما را در دهه 1980 حل کرده است.
gnu parallel - این یک چیز جادویی است، همه باید از آن استفاده کنند.
Spark داده های فشرده نشده را دوست دارد و از ترکیب پارتیشن ها خوشش نمی آید.
اسپارک هنگام حل مسائل ساده، سربار زیادی دارد.
آرایه های انجمنی AWK بسیار کارآمد هستند.
شما می توانید تماس بگیرید stdin и stdout از یک اسکریپت R، و بنابراین از آن در خط لوله استفاده کنید.
به لطف پیاده سازی مسیر هوشمند، S3 می تواند بسیاری از فایل ها را پردازش کند.
دلیل اصلی اتلاف وقت، بهینه سازی زودرس روش ذخیره سازی است.
سعی نکنید کارها را به صورت دستی بهینه کنید، اجازه دهید رایانه این کار را انجام دهد.
API باید برای سهولت و انعطاف پذیری استفاده ساده باشد.
اگر داده های شما به خوبی آماده شده باشد، ذخیره سازی در حافظه پنهان آسان خواهد بود!