تجزیه 25 ترابایت با استفاده از AWK و R

تجزیه 25 ترابایت با استفاده از AWK و R
چگونه این مقاله را بخوانیم: از طولانی بودن و آشفتگی متن عذرخواهی می کنم. برای صرفه جویی در وقت شما، هر فصل را با مقدمه ای با عنوان «آنچه یاد گرفتم» شروع می کنم که ماهیت فصل را در یک یا دو جمله خلاصه می کند.

"فقط راه حل را به من نشان بده!" اگر فقط می‌خواهید ببینید از کجا آمده‌ام، به بخش «مخترع‌تر شدن» بروید، اما فکر می‌کنم خواندن درباره شکست جالب‌تر و مفیدتر است.

من اخیراً وظیفه راه‌اندازی فرآیندی برای پردازش حجم زیادی از توالی‌های DNA خام (از نظر فنی یک تراشه SNP) را بر عهده داشتم. نیاز به دستیابی سریع داده‌ها در مورد یک مکان ژنتیکی معین (به نام SNP) برای مدل‌سازی بعدی و کارهای دیگر بود. با استفاده از R و AWK، من توانستم داده ها را به روشی طبیعی تمیز و سازماندهی کنم و سرعت پردازش پرس و جو را بسیار افزایش داد. این برای من آسان نبود و نیاز به تکرارهای متعدد داشت. این مقاله به شما کمک می کند تا از برخی اشتباهات من اجتناب کنید و به شما نشان دهد که در نهایت با چه چیزی به پایان رسیدم.

ابتدا چند توضیح مقدماتی.

اطلاعات

مرکز پردازش اطلاعات ژنتیکی دانشگاه ما داده هایی را در قالب یک TSV 25 ترابایتی در اختیار ما قرار داد. من آنها را در 5 بسته فشرده شده توسط Gzip دریافت کردم که هر کدام شامل 240 فایل چهار گیگابایتی بود. هر ردیف حاوی داده هایی برای یک SNP از یک فرد بود. در مجموع، داده‌های 2,5 میلیون SNP و 60 هزار نفر منتقل شد. علاوه بر اطلاعات SNP، فایل ها حاوی ستون های متعددی با اعدادی بودند که ویژگی های مختلفی مانند شدت خواندن، فراوانی آلل های مختلف و غیره را منعکس می کردند. در مجموع حدود 30 ستون با مقادیر منحصر به فرد وجود داشت.

هدف

مانند هر پروژه مدیریت داده، مهمترین چیز تعیین نحوه استفاده از داده ها بود. در این مورد ما بیشتر مدل‌ها و گردش‌های کاری را برای SNP بر اساس SNP انتخاب می‌کنیم. یعنی در هر زمان فقط به داده های یک SNP نیاز خواهیم داشت. من باید یاد می گرفتم که چگونه تمام رکوردهای مرتبط با یکی از 2,5 میلیون SNP را به آسانی، سریع و ارزان ترین حد ممکن بازیابی کنم.

چگونه این کار را نکنیم

برای نقل یک کلیشه مناسب:

من هزاران بار شکست نخوردم، فقط هزار راه برای جلوگیری از تجزیه دسته ای از داده ها در قالبی مناسب برای پرس و جو کشف کردم.

اولین تلاش

چه آموخته ام: هیچ راه ارزانی برای تجزیه 25 ترابایت در یک زمان وجود ندارد.

پس از گذراندن دوره "روش های پیشرفته برای پردازش داده های بزرگ" در دانشگاه واندربیلت، مطمئن شدم که این ترفند در کیف است. راه اندازی سرور Hive احتمالاً یک یا دو ساعت طول می کشد تا تمام داده ها را اجرا کند و نتیجه را گزارش کند. از آنجایی که داده های ما در AWS S3 ذخیره شده است، از این سرویس استفاده کردم الههء عقل و زیبایی، که به شما امکان می دهد کوئری های Hive SQL را روی داده های S3 اعمال کنید. شما نیازی به راه‌اندازی/بالا بردن یک خوشه Hive ندارید، و همچنین فقط برای داده‌هایی که به دنبال آن هستید پول می‌پردازید.

بعد از اینکه داده‌ها و قالب آن را به آتنا نشان دادم، آزمایش‌هایی را با پرس و جوهایی مانند این انجام دادم:

select * from intensityData limit 10;

و به سرعت نتایج با ساختار خوبی دریافت کرد. آماده.

تا اینکه سعی کردیم از داده ها در کارمان استفاده کنیم...

از من خواسته شد که تمام اطلاعات SNP را برای آزمایش مدل بردارم. من پرس و جو را اجرا کردم:


select * from intensityData 
where snp = 'rs123456';

... و شروع به انتظار كردن كرد. پس از هشت دقیقه و بیش از 4 ترابایت اطلاعات درخواستی، نتیجه را دریافت کردم. آتنا بر اساس حجم داده های یافت شده، 5 دلار به ازای هر ترابایت هزینه می گیرد. بنابراین این درخواست تنها 20 دلار و هشت دقیقه انتظار هزینه دارد. برای اجرای مدل بر روی تمام داده ها، باید 38 سال صبر می کردیم و 50 میلیون دلار پرداخت می کردیم، بدیهی است که این برای ما مناسب نبود.

استفاده از پارکت ضروری بود...

چه آموخته ام: مراقب حجم فایل های پارکت و سازماندهی آنها باشید.

من ابتدا سعی کردم با تبدیل همه TSV ها به این وضعیت درست کنم فایل های پارکت. آنها برای کار با مجموعه داده های بزرگ راحت هستند زیرا اطلاعات موجود در آنها به شکل ستونی ذخیره می شود: هر ستون در بخش حافظه / دیسک خود قرار دارد، برخلاف فایل های متنی، که در آن ردیف ها حاوی عناصر هر ستون هستند. و اگر نیاز به پیدا کردن چیزی دارید، فقط ستون مورد نیاز را بخوانید. علاوه بر این، هر فایل محدوده ای از مقادیر را در یک ستون ذخیره می کند، بنابراین اگر مقدار مورد نظر شما در محدوده ستون نباشد، Spark زمان را برای اسکن کل فایل تلف نمی کند.

من یک کار ساده را اجرا کردم چسب AWS تا TSV های خود را به پارکت تبدیل کنیم و فایل های جدید را به آتنا رها کنیم. حدود 5 ساعت طول کشید. اما وقتی درخواست را اجرا کردم، تقریباً به همان مقدار زمان و کمی پول کمتر برای تکمیل نیاز داشت. واقعیت این است که Spark، در تلاش برای بهینه سازی کار، به سادگی یک تکه TSV را باز کرد و آن را در قطعه پارکت خودش قرار داد. و از آنجایی که هر تکه به اندازه کافی بزرگ بود که کل سوابق افراد زیادی را در خود جای دهد، هر فایل حاوی تمام SNP ها بود، بنابراین Spark مجبور بود تمام فایل ها را باز کند تا اطلاعات مورد نیاز خود را استخراج کند.

جالب اینجاست که نوع فشرده سازی پیش فرض (و توصیه شده) پارکت، Snappy، قابل تقسیم نیست. بنابراین، هر مجری در وظیفه باز کردن و دانلود مجموعه داده کامل 3,5 گیگابایتی مانده بود.

تجزیه 25 ترابایت با استفاده از AWK و R

بیایید مشکل را درک کنیم

چه آموخته ام: مرتب سازی دشوار است، به خصوص اگر داده ها توزیع شده باشند.

به نظرم رسید که اکنون اصل مشکل را درک کرده ام. من فقط باید داده ها را بر اساس ستون SNP مرتب کنم، نه بر اساس افراد. سپس چندین SNP در یک تکه داده جداگانه ذخیره می‌شوند و سپس تابع «هوشمند» پارکت «فقط در صورتی باز شود که مقدار در محدوده باشد» با شکوه خود را نشان می‌دهد. متأسفانه، مرتب کردن میلیاردها ردیف پراکنده در یک خوشه کار دشواری بود.

AWS قطعاً به دلیل "من دانش آموز پریشان هستم" نمی خواهد بازپرداختی را صادر کند. بعد از مرتب سازی روی چسب آمازون، 2 روز کار کرد و خراب شد.

در مورد پارتیشن بندی چطور؟

چه آموخته ام: پارتیشن ها در اسپارک باید متعادل باشند.

سپس به ایده پارتیشن بندی داده ها در کروموزوم ها رسیدم. 23 مورد از آنها وجود دارد (و اگر DNA میتوکندری و مناطق نقشه برداری نشده را در نظر بگیرید، چندین مورد دیگر وجود دارد).
این به شما امکان می دهد داده ها را به قطعات کوچکتر تقسیم کنید. اگر فقط یک خط به تابع صادرات Spark در اسکریپت Glue اضافه کنید partition_by = "chr"، سپس داده ها باید به سطل ها تقسیم شوند.

تجزیه 25 ترابایت با استفاده از AWK و R
ژنوم از قطعات متعددی به نام کروموزوم تشکیل شده است.

متاسفانه کار نکرد. کروموزوم ها اندازه های متفاوتی دارند که به معنای مقادیر متفاوتی از اطلاعات است. این بدان معنی است که وظایفی که اسپارک برای کارگران ارسال می کرد متعادل نبودند و به آرامی تکمیل می شدند زیرا برخی از گره ها زود به پایان می رسیدند و بیکار بودند. با این حال، وظایف تکمیل شد. اما هنگام درخواست یک SNP، عدم تعادل دوباره مشکلاتی را ایجاد کرد. هزینه پردازش SNP در کروموزوم های بزرگتر (یعنی جایی که می خواهیم داده ها را دریافت کنیم) تنها حدود 10 برابر کاهش یافته است. زیاد، اما کافی نیست.

اگر آن را به قسمت های کوچکتر تقسیم کنیم چه؟

چه آموخته ام: هرگز سعی نکنید 2,5 میلیون پارتیشن انجام دهید.

تصمیم گرفتم همه چیز را انجام دهم و هر SNP را پارتیشن بندی کنم. این اطمینان حاصل می کند که پارتیشن ها اندازه یکسانی دارند. این یک ایده بد بود. من از چسب استفاده کردم و یک خط بی گناه اضافه کردم partition_by = 'snp'. کار شروع شد و شروع به اجرا شد. یک روز بعد چک کردم دیدم هنوز چیزی روی S3 نوشته نشده است، بنابراین کار را کشتم. به نظر می رسد که Glue در حال نوشتن فایل های میانی در یک مکان مخفی در S3 بوده است، فایل های زیادی، شاید چند میلیون. در نتیجه اشتباه من بیش از هزار دلار هزینه داشت و مربی من را خوشحال نکرد.

پارتیشن بندی + مرتب سازی

چه آموخته ام: مرتب سازی همچنان دشوار است، مانند تنظیم Spark.

آخرین تلاش من برای پارتیشن بندی شامل تقسیم کروموزوم ها و مرتب سازی هر پارتیشن بود. در تئوری، این امر هر پرس و جو را سرعت می بخشد زیرا داده های SNP مورد نظر باید در چند قطعه پارکت در یک محدوده معین قرار می گرفتند. متأسفانه، مرتب سازی حتی داده های پارتیشن بندی شده کار دشواری بود. در نتیجه، من برای یک کلاستر سفارشی به EMR تغییر مکان دادم و از هشت نمونه قدرتمند (C5.4xl) و Sparklyr برای ایجاد یک گردش کاری انعطاف‌پذیرتر استفاده کردم.

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

... با این حال، کار هنوز تکمیل نشده است. من آن را به روش‌های مختلفی پیکربندی کردم: تخصیص حافظه را برای هر اجراکننده پرس‌وجو افزایش دادم، از گره‌هایی با مقدار زیادی حافظه استفاده کردم، از متغیرهای پخش (متغیرهای پخش) استفاده کردم، اما هر بار اینها نیمه اندازه‌گیری شدند و به تدریج مجری‌ها شروع به کار کردند. شکست خوردن تا زمانی که همه چیز متوقف شود.

دارم خلاق تر میشم

چه آموخته ام: گاهی اوقات داده های خاص راه حل های خاصی را می طلبد.

هر SNP یک مقدار موقعیت دارد. این عدد مربوط به تعداد بازها در امتداد کروموزوم آن است. این یک روش خوب و طبیعی برای سازماندهی داده های ما است. در ابتدا می خواستم بر اساس مناطق هر کروموزوم تقسیم بندی کنم. به عنوان مثال، موقعیت های 1 - 2000، 2001 - 4000 و غیره. اما مشکل این است که SNP ها به طور مساوی در بین کروموزوم ها توزیع نمی شوند، بنابراین اندازه گروه ها بسیار متفاوت خواهد بود.

تجزیه 25 ترابایت با استفاده از AWK و R

در نتیجه، من به تقسیم موقعیت ها به دسته ها (رتبه) رسیدم. با استفاده از داده‌های دانلود شده قبلی، درخواستی برای به دست آوردن فهرستی از SNP‌های منحصربه‌فرد، موقعیت‌ها و کروموزوم‌های آن‌ها ارائه کردم. سپس داده‌های درون هر کروموزوم را مرتب کردم و SNP‌ها را در گروه‌هایی با اندازه معین جمع‌آوری کردم. فرض کنید 1000 SNP هر کدام. این به من رابطه SNP به گروه در کروموزوم داد.

در پایان گروه های (bin) 75 SNP را ساختم که دلیل آن در ادامه توضیح داده می شود.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

ابتدا با اسپارک امتحان کنید

چه آموخته ام: تجمع جرقه سریع است، اما پارتیشن بندی هنوز گران است.

من می‌خواستم این قاب داده کوچک (2,5 میلیون ردیف) را در Spark بخوانم، آن را با داده‌های خام ترکیب کنم و سپس آن را با ستون جدید اضافه شده تقسیم کنم. bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

من استفاده کردم sdf_broadcast()، بنابراین Spark می داند که باید چارچوب داده را به همه گره ها ارسال کند. این در صورتی مفید است که داده ها از نظر اندازه کوچک باشند و برای همه وظایف مورد نیاز باشند. در غیر این صورت، Spark سعی می کند هوشمند باشد و داده ها را در صورت نیاز توزیع می کند که می تواند باعث کاهش سرعت شود.

و دوباره، ایده من کار نکرد: وظایف برای مدتی کار کرد، اتحادیه را تکمیل کرد، و سپس، مانند مجریانی که با پارتیشن بندی راه اندازی شدند، شروع به شکست کردند.

اضافه کردن AWK

چه آموخته ام: وقتی اصول اولیه را به شما می آموزند نخوابید. مطمئناً کسی قبلاً مشکل شما را در دهه 1980 حل کرده است.

تا این مرحله، دلیل تمام شکست های من با Spark به هم ریختگی داده ها در خوشه بود. شاید بتوان با پیش درمان وضعیت را بهبود بخشید. تصمیم گرفتم داده‌های متن خام را به ستون‌های کروموزوم تقسیم کنم، بنابراین امیدوار بودم که داده‌های «پیش‌پارتیشن‌شده» را به Spark ارائه دهم.

من در StackOverflow برای نحوه تقسیم بر اساس مقادیر ستون جستجو کردم و پیدا کردم چنین پاسخ عالی با AWK می توانید یک فایل متنی را با نوشتن آن در یک اسکریپت به جای ارسال نتایج به مقادیر ستون تقسیم کنید. stdout.

من یک اسکریپت Bash نوشتم تا آن را امتحان کنم. یکی از TSV های بسته بندی شده را دانلود کرد، سپس آن را با استفاده از بسته بندی باز کرد gzip و ارسال شد به awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

کار کرد!

پر کردن هسته ها

چه آموخته ام: gnu parallel - این یک چیز جادویی است، همه باید از آن استفاده کنند.

جدایی خیلی کند بود و وقتی شروع کردم htopبرای بررسی استفاده از یک نمونه قدرتمند (و گران قیمت) EC2، معلوم شد که من فقط از یک هسته و حدود 200 مگابایت حافظه استفاده می کردم. برای حل مشکل و از دست ندادن پول زیاد، باید دریابیم که چگونه کار را موازی کنیم. خوشبختانه، در یک کتاب کاملا شگفت انگیز علم داده در خط فرمان من فصلی از جرون یانسنز در مورد موازی سازی پیدا کردم. از آن یاد گرفتم gnu parallel، یک روش بسیار منعطف برای پیاده سازی multithreading در یونیکس.

تجزیه 25 ترابایت با استفاده از AWK و R
وقتی پارتیشن بندی را با استفاده از فرآیند جدید شروع کردم، همه چیز خوب بود، اما هنوز یک تنگنا وجود داشت - دانلود اشیاء S3 روی دیسک خیلی سریع نبود و کاملاً موازی نبود. برای رفع این مشکل، این کار را انجام دادم:

  1. متوجه شدم که می‌توان مرحله دانلود S3 را مستقیماً در خط لوله پیاده‌سازی کرد و فضای ذخیره‌سازی میانی روی دیسک را کاملاً حذف کرد. این بدان معنی است که می توانم از نوشتن داده های خام روی دیسک اجتناب کنم و از فضای ذخیره سازی کوچکتر و در نتیجه ارزانتر در AWS استفاده کنم.
  2. تیم aws configure set default.s3.max_concurrent_requests 50 تعداد رشته هایی را که AWS CLI استفاده می کند بسیار افزایش داد (به طور پیش فرض 10 عدد وجود دارد).
  3. من به یک نمونه EC2 که برای سرعت شبکه بهینه شده بود، با حرف n در نام تغییر کردم. من متوجه شده ام که از دست دادن قدرت پردازش در هنگام استفاده از n نمونه بیشتر از افزایش سرعت بارگذاری جبران می شود. برای اکثر کارها از c5n.4xl استفاده کردم.
  4. تغییر کرد gzip بر pigz، این یک ابزار gzip است که می تواند کارهای جالبی را برای موازی کردن کار اولیه غیر موازی فشرده سازی فایل ها انجام دهد (این کمترین کمک را به شما کمک کرد).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

این مراحل با یکدیگر ترکیب می شوند تا همه چیز خیلی سریع کار کند. با افزایش سرعت دانلود و حذف نوشتن دیسک، اکنون می توانم یک بسته 5 ترابایتی را تنها در چند ساعت پردازش کنم.

این توییت باید «TSV» را ذکر می کرد. افسوس.

استفاده از داده های تازه تجزیه شده

چه آموخته ام: Spark داده های فشرده نشده را دوست دارد و از ترکیب پارتیشن ها خوشش نمی آید.

اکنون داده ها در S3 در قالب بدون بسته (بخوانید: اشتراک گذاری شده) و نیمه مرتب شده بودند و من می توانستم دوباره به Spark برگردم. غافلگیری در انتظارم بود: باز هم نتوانستم به آنچه می خواستم برسم! بسیار دشوار بود که به Spark دقیقاً نحوه پارتیشن بندی داده ها را بگویید. و حتی وقتی این کار را انجام دادم، معلوم شد که پارتیشن های زیادی وجود دارد (95 هزار)، و زمانی که استفاده کردم coalesce تعداد آنها را به حد معقول کاهش داد، این پارتیشن بندی من را از بین برد. من مطمئن هستم که می توان این مشکل را برطرف کرد، اما بعد از چند روز جستجو نتوانستم راه حلی پیدا کنم. من در نهایت تمام وظایف را در Spark به پایان رساندم، اگرچه مدتی طول کشید و فایل های اسپلیت پارکت من خیلی کم نبود (~200 کیلوبایت). با این حال، داده ها در جایی بود که لازم بود.

تجزیه 25 ترابایت با استفاده از AWK و R
خیلی کوچک و ناهموار، فوق العاده است!

تست پرس و جوهای محلی Spark

چه آموخته ام: اسپارک هنگام حل مسائل ساده سربار زیادی دارد.

با دانلود داده ها در قالبی هوشمندانه، توانستم سرعت را آزمایش کنم. یک اسکریپت R را برای اجرای یک سرور محلی Spark تنظیم کنید و سپس یک قاب داده Spark را از ذخیره سازی گروه پارکت مشخص شده (bin) بارگیری کنید. من سعی کردم همه داده ها را بارگیری کنم اما نتوانستم Sparklyr پارتیشن بندی را تشخیص دهد.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

اعدام 29,415 ثانیه طول کشید. خیلی بهتر است، اما برای آزمایش انبوه هر چیزی خیلی خوب نیست. علاوه بر این، من نمی‌توانستم کارها را با کش سرعت دهم، زیرا وقتی سعی می‌کردم یک قاب داده را در حافظه پنهان کنم، Spark همیشه خراب می‌شد، حتی زمانی که بیش از 50 گیگابایت حافظه را به مجموعه داده‌ای با وزن کمتر از 15 اختصاص دادم.

بازگشت به AWK

چه آموخته ام: آرایه های انجمنی در AWK بسیار کارآمد هستند.

متوجه شدم که می توانم به سرعت های بالاتری برسم. من که در یک فوق العاده به یاد آورد آموزش AWK توسط بروس بارنت من در مورد یک ویژگی جالب به نام "آرایه های انجمنی" در اصل ، اینها جفت های کلید-مقدار هستند که به دلایلی در AWK به طور متفاوتی نامیده می شدند و بنابراین من به نوعی به آنها زیاد فکر نکردم. رومن چپلیاکا به یاد آورد که اصطلاح "آرایه های انجمنی" بسیار قدیمی تر از اصطلاح "جفت کلید-مقدار" است. حتی اگر شما کلید-مقدار را در Google Ngram جستجو کنید، این عبارت را در آنجا نخواهید دید، اما آرایه های انجمنی را خواهید یافت! علاوه بر این، "جفت کلید-مقدار" اغلب با پایگاه های داده مرتبط است، بنابراین مقایسه آن با نقشه هشم بسیار منطقی تر است. متوجه شدم که می‌توانم از این آرایه‌های انجمنی برای مرتبط کردن SNP‌های خود با جدول bin و داده‌های خام بدون استفاده از Spark استفاده کنم.

برای انجام این کار، در اسکریپت AWK از بلوک استفاده کردم BEGIN. این قطعه کدی است که قبل از ارسال اولین خط داده به بدنه اصلی اسکریپت اجرا می شود.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

تیم while(getline...) همه سطرها را از گروه CSV بارگیری کرد (bin)، ستون اول (نام SNP) را به عنوان کلید آرایه انجمنی تنظیم کرد. bin و مقدار دوم (گروه) به عنوان مقدار. سپس در بلوک { }، که در تمام خطوط فایل اصلی اجرا می شود، هر خط به فایل خروجی ارسال می شود که بسته به گروه خود (bin) یک نام منحصر به فرد دریافت می کند: ..._bin_"bin[$1]"_....

متغیرها batch_num и chunk_id با داده های ارائه شده توسط خط لوله مطابقت داشت، از شرایط مسابقه اجتناب کرد، و هر رشته اجرایی در حال اجرا بود parallel، در فایل منحصر به فرد خود نوشت.

از آنجایی که من تمام داده‌های خام را در پوشه‌های کروموزوم‌های باقی‌مانده از آزمایش قبلی خود با AWK پراکنده کردم، اکنون می‌توانم اسکریپت Bash دیگری برای پردازش یک کروموزوم در یک زمان بنویسم و ​​داده‌های پارتیشن بندی شده عمیق‌تری را به S3 ارسال کنم.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

فیلمنامه دو بخش دارد parallel.

در بخش اول، داده‌ها از تمام فایل‌های حاوی اطلاعات کروموزوم مورد نظر خوانده می‌شوند، سپس این داده‌ها در سراسر رشته‌ها توزیع می‌شوند که فایل‌ها را در گروه‌های مناسب (bin) توزیع می‌کنند. برای جلوگیری از شرایط مسابقه زمانی که چندین رشته در یک فایل می نویسند، AWK نام فایل ها را برای نوشتن داده ها به مکان های مختلف ارسال می کند، به عنوان مثال. chr_10_bin_52_batch_2_aa.csv. در نتیجه، بسیاری از فایل‌های کوچک روی دیسک ایجاد می‌شوند (برای این کار از حجم‌های EBS ترابایتی استفاده کردم).

نوار نقاله از بخش دوم parallel از گروه ها (bin) عبور می کند و فایل های فردی آنها را در CSV c مشترک ترکیب می کند catو سپس آنها را برای صادرات ارسال می کند.

پخش در R؟

چه آموخته ام: شما می توانید تماس بگیرید stdin и stdout از یک اسکریپت R، و بنابراین از آن در خط لوله استفاده کنید.

ممکن است متوجه این خط در اسکریپت Bash شده باشید: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... تمام فایل های گروهی به هم پیوسته (bin) را به اسکریپت R زیر ترجمه می کند. {} یک تکنیک خاص است parallel، که هر داده ای را که به جریان مشخص شده ارسال می کند مستقیماً در خود فرمان وارد می کند. گزینه {#} یک شناسه موضوع منحصر به فرد را ارائه می دهد و {%} شماره شکاف شغلی را نشان می دهد (تکرار شده، اما هرگز به طور همزمان). لیستی از همه گزینه ها را می توان در آن یافت مستندات.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

وقتی یک متغیر file("stdin") منتقل شده به readr::read_csv، داده های ترجمه شده به اسکریپت R در یک فریم بارگذاری می شود که سپس در فرم قرار می گیرد .rds-فایل با استفاده از aws.s3 مستقیماً روی S3 نوشته شده است.

RDS چیزی شبیه یک نسخه جوان پارکت است، بدون زواید فضای ذخیره‌سازی بلندگو.

پس از اتمام اسکریپت Bash من یک بسته نرم افزاری دریافت کردم .rds-فایل های واقع در S3، که به من امکان استفاده از فشرده سازی کارآمد و انواع داخلی را می دهد.

با وجود استفاده از ترمز R، همه چیز خیلی سریع کار کرد. جای تعجب نیست که بخش هایی از R که داده ها را می خوانند و می نویسند بسیار بهینه شده اند. پس از آزمایش بر روی یک کروموزوم با اندازه متوسط، کار در یک نمونه C5n.4xl در حدود دو ساعت تکمیل شد.

محدودیت های S3

چه آموخته ام: به لطف پیاده سازی مسیر هوشمند، S3 می تواند فایل های زیادی را مدیریت کند.

من نگران بودم که آیا S3 بتواند بسیاری از فایل‌هایی را که به آن منتقل می‌شوند مدیریت کند. من می‌توانم نام فایل‌ها را معنا کنم، اما S3 چگونه آنها را جستجو می‌کند؟

تجزیه 25 ترابایت با استفاده از AWK و R
پوشه ها در S3 فقط برای نمایش هستند، در واقع سیستم علاقه ای به نماد ندارد /. از صفحه پرسش و پاسخ S3.

به نظر می رسد که S3 مسیر یک فایل خاص را به عنوان یک کلید ساده در نوعی جدول هش یا پایگاه داده مبتنی بر سند نشان می دهد. یک سطل را می توان به عنوان یک جدول در نظر گرفت و فایل ها را می توان رکوردهای موجود در آن جدول در نظر گرفت.

از آنجایی که سرعت و کارایی برای کسب درآمد در آمازون مهم است، جای تعجب نیست که این سیستم مسیر کلیدی به‌عنوان یک فایل به‌شدت بهینه‌سازی شده است. سعی کردم تعادلی پیدا کنم: به طوری که مجبور نباشم درخواست های دریافت زیادی داشته باشم، اما درخواست ها به سرعت اجرا شوند. معلوم شد که بهتر است حدود 20 هزار فایل bin درست کنید. فکر می‌کنم اگر به بهینه‌سازی ادامه دهیم، می‌توانیم به افزایش سرعت برسیم (مثلاً یک سطل مخصوص فقط برای داده‌ها بسازیم و در نتیجه اندازه جدول جستجو را کاهش دهیم). اما زمان و هزینه ای برای آزمایش های بیشتر وجود نداشت.

در مورد سازگاری متقابل چطور؟

چیزی که یاد گرفتم: علت شماره یک اتلاف وقت، بهینه سازی روش ذخیره سازی پیش از موعد است.

در این مرحله، بسیار مهم است که از خود بپرسید: "چرا از یک فرمت فایل اختصاصی استفاده کنیم؟" دلیل آن در سرعت بارگذاری (بارگیری فایل های CSV زیپ شده ۷ برابر زمان بیشتری طول کشید) و سازگاری با گردش کار ما نهفته است. ممکن است در مورد اینکه R بتواند به راحتی فایل های Parquet (یا Arrow) را بدون بارگذاری Spark بارگذاری کند، تجدید نظر کنم. همه افراد در آزمایشگاه ما از R استفاده می کنند، و اگر من نیاز به تبدیل داده ها به فرمت دیگری داشته باشم، هنوز داده های متن اصلی را دارم، بنابراین می توانم دوباره خط لوله را اجرا کنم.

تقسیم کار

چه آموخته ام: سعی نکنید کارها را به صورت دستی بهینه کنید، بگذارید رایانه این کار را انجام دهد.

من جریان کار را در یک کروموزوم اشکال زدایی کرده ام، اکنون باید تمام داده های دیگر را پردازش کنم.
من می‌خواستم چندین نمونه EC2 را برای تبدیل بالا ببرم، اما در همان زمان می‌ترسیدم بار بسیار نامتعادلی را در کارهای پردازشی مختلف دریافت کنم (همانطور که Spark از پارتیشن‌های نامتعادل رنج می‌برد). علاوه بر این، من علاقه ای به افزایش یک نمونه در هر کروموزوم نداشتم، زیرا برای حساب های AWS محدودیت پیش فرض 10 نمونه وجود دارد.

سپس تصمیم گرفتم یک اسکریپت به زبان R بنویسم تا کارهای پردازشی را بهینه کنم.

ابتدا از S3 خواستم محاسبه کند که هر کروموزوم چقدر فضای ذخیره سازی را اشغال کرده است.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

سپس تابعی نوشتم که اندازه کل را می گیرد، ترتیب کروموزوم ها را به هم می زند، آنها را به گروه ها تقسیم می کند. num_jobs و به شما می گوید که اندازه همه کارهای پردازش چقدر متفاوت است.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

سپس هزار بار با استفاده از purrr دویدم و بهترین را انتخاب کردم.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

بنابراین من با مجموعه ای از کارها که از نظر اندازه بسیار شبیه بودند، به پایان رسیدم. سپس تنها چیزی که باقی ماند این بود که اسکریپت قبلی Bash را در یک حلقه بزرگ بپیچانم for. نوشتن این بهینه سازی حدود 10 دقیقه طول کشید. و این بسیار کمتر از آن چیزی است که من برای ایجاد دستی کارها در صورت نامتعادل بودن هزینه می کنم. بنابراین، من فکر می کنم که حق با این بهینه سازی اولیه بود.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

در پایان دستور shutdown را اضافه می کنم:

sudo shutdown -h now

... و همه چیز درست شد! با استفاده از AWS CLI، نمونه هایی را با استفاده از گزینه مطرح کردم user_data اسکریپت های Bash از وظایفشان را برای پردازش به آنها داد. آنها به طور خودکار اجرا می شوند و خاموش می شوند، بنابراین من برای قدرت پردازش اضافی پولی نمی پرداختم.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

بیایید بسته بندی کنیم!

چه آموخته ام: برای سهولت و انعطاف پذیری استفاده، API باید ساده باشد.

در نهایت من داده ها را در مکان و فرم مناسب دریافت کردم. تنها چیزی که باقی ماند این بود که فرآیند استفاده از داده ها را تا حد امکان ساده کنم تا کار را برای همکارانم آسان کنم. من می خواستم یک API ساده برای ایجاد پرس و جو بسازم. اگر در آینده تصمیم بگیرم از .rds به فایل های پارکت، پس این باید برای من مشکل داشته باشد، نه برای همکارانم. برای این کار تصمیم گرفتم یک بسته R داخلی بسازم.

یک بسته بسیار ساده را بسازید و مستند کنید که فقط شامل چند تابع دسترسی به داده است که حول یک تابع سازماندهی شده اند get_snp. یک سایت هم برای همکارانم درست کردم pkgdown، بنابراین آنها می توانند به راحتی نمونه ها و مستندات را ببینند.

تجزیه 25 ترابایت با استفاده از AWK و R

حافظه پنهان هوشمند

چه آموخته ام: اگر داده های شما به خوبی آماده شده باشد، کش کردن آسان خواهد بود!

از آنجایی که یکی از گردش‌های کاری اصلی همان مدل تحلیلی را در بسته SNP اعمال کرد، تصمیم گرفتم از binning به نفع خود استفاده کنم. هنگام انتقال داده ها از طریق SNP، تمام اطلاعات گروه (bin) به شی برگشتی متصل می شود. یعنی پرس و جوهای قدیمی می توانند (در تئوری) پردازش پرس و جوهای جدید را سرعت بخشند.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

هنگام ساخت بسته، معیارهای زیادی را برای مقایسه سرعت هنگام استفاده از روش‌های مختلف اجرا کردم. توصیه می کنم از این موضوع غافل نشوید، زیرا گاهی اوقات نتایج غیرمنتظره است. مثلا، dplyr::filter بسیار سریع‌تر از گرفتن ردیف‌ها با استفاده از فیلترینگ مبتنی بر نمایه‌سازی بود، و بازیابی یک ستون از یک قاب داده فیلتر شده بسیار سریع‌تر از استفاده از نحو نمایه‌سازی بود.

لطفا توجه داشته باشید که شی prev_snp_results حاوی کلید است snps_in_bin. این آرایه ای از تمام SNP های منحصر به فرد در یک گروه (bin) است که به شما این امکان را می دهد تا به سرعت بررسی کنید که آیا قبلاً داده هایی از یک پرس و جو قبلی دارید یا خیر. همچنین حلقه زدن تمام SNP های یک گروه (bin) را با این کد آسان می کند:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

یافته ها

اکنون می توانیم (و به طور جدی شروع به اجرای آن کرده ایم) مدل ها و سناریوهایی را اجرا کنیم که قبلاً برای ما غیرقابل دسترسی بودند. بهترین چیز این است که همکاران آزمایشگاهی من مجبور نیستند به هیچ عارضه ای فکر کنند. آنها فقط عملکردی دارند که کار می کند.

و اگرچه بسته جزئیات را در اختیار آنها قرار نمی دهد، من سعی کردم قالب داده را به اندازه کافی ساده کنم که اگر فردا ناگهان ناپدید شدم، بتوانند آن را بفهمند...

سرعت به طرز محسوسی افزایش یافته است. ما معمولاً قطعات ژنوم از نظر عملکردی مهم را اسکن می کنیم. قبلاً نمی‌توانستیم این کار را انجام دهیم (معلوم شد که خیلی گران است)، اما اکنون، به لطف ساختار گروه (bin) و حافظه پنهان، درخواست یک SNP به طور متوسط ​​کمتر از 0,1 ثانیه طول می‌کشد و میزان استفاده از داده‌ها بسیار زیاد است. کم است که هزینه های S3 بادام زمینی است.

نتیجه

این مقاله اصلاً راهنما نیست. معلوم شد که راه حل فردی است و تقریباً مطمئناً مطلوب نیست. بلکه سفرنامه است. می‌خواهم دیگران بفهمند که چنین تصمیم‌هایی در ذهن کاملاً شکل‌گرفته به نظر نمی‌رسند، آنها نتیجه آزمون و خطا هستند. همچنین، اگر به دنبال یک دانشمند داده هستید، به خاطر داشته باشید که استفاده از این ابزارها به طور موثر نیاز به تجربه دارد و تجربه هزینه دارد. من خوشحالم که امکانات پرداخت را داشتم، اما بسیاری دیگر که می توانند همان کار را بهتر از من انجام دهند، به دلیل کمبود پول هرگز فرصتی برای تلاش ندارند.

ابزارهای کلان داده همه کاره هستند. اگر وقت دارید، تقریباً مطمئناً می توانید با استفاده از تکنیک های تمیز کردن، ذخیره سازی و استخراج داده های هوشمند راه حل سریع تری بنویسید. در نهایت به تجزیه و تحلیل هزینه و فایده می رسد.

چیزی که یاد گرفتم:

  • هیچ راه ارزانی برای تجزیه 25 ترابایت در یک زمان وجود ندارد.
  • مراقب حجم فایل های پارکت و سازماندهی آنها باشید.
  • پارتیشن ها در Spark باید متعادل باشند.
  • به طور کلی هرگز سعی نکنید 2,5 میلیون پارتیشن بسازید.
  • مرتب سازی همچنان دشوار است، مانند راه اندازی Spark.
  • گاهی اوقات داده های خاص به راه حل های خاصی نیاز دارند.
  • تجمع جرقه سریع است، اما پارتیشن بندی هنوز گران است.
  • وقتی اصول را به شما می آموزند نخوابید، احتمالاً کسی قبلاً مشکل شما را در دهه 1980 حل کرده است.
  • gnu parallel - این یک چیز جادویی است، همه باید از آن استفاده کنند.
  • Spark داده های فشرده نشده را دوست دارد و از ترکیب پارتیشن ها خوشش نمی آید.
  • اسپارک هنگام حل مسائل ساده، سربار زیادی دارد.
  • آرایه های انجمنی AWK بسیار کارآمد هستند.
  • شما می توانید تماس بگیرید stdin и stdout از یک اسکریپت R، و بنابراین از آن در خط لوله استفاده کنید.
  • به لطف پیاده سازی مسیر هوشمند، S3 می تواند بسیاری از فایل ها را پردازش کند.
  • دلیل اصلی اتلاف وقت، بهینه سازی زودرس روش ذخیره سازی است.
  • سعی نکنید کارها را به صورت دستی بهینه کنید، اجازه دهید رایانه این کار را انجام دهد.
  • API باید برای سهولت و انعطاف پذیری استفاده ساده باشد.
  • اگر داده های شما به خوبی آماده شده باشد، ذخیره سازی در حافظه پنهان آسان خواهد بود!

منبع: www.habr.com

اضافه کردن نظر