تحليل 25 تيرابايت باستخدام AWK وR

تحليل 25 تيرابايت باستخدام AWK وR
كيف تقرأ هذا المقال: أعتذر عن كون النص طويلًا وفوضويًا. لتوفير الوقت، أبدأ كل فصل بمقدمة "ما تعلمته"، والتي تلخص جوهر الفصل في جملة أو جملتين.

"فقط أرني الحل!" إذا كنت تريد فقط أن تعرف من أين أتيت، فانتقل إلى الفصل "أن تصبح أكثر ابتكارًا"، لكنني أعتقد أنه من المثير للاهتمام والمفيد أن تقرأ عن الفشل.

تم تكليفي مؤخرًا بإعداد عملية لمعالجة كمية كبيرة من تسلسلات الحمض النووي الخام (من الناحية الفنية شريحة SNP). كانت الحاجة هي الحصول بسرعة على بيانات حول موقع جيني معين (يسمى SNP) للنمذجة اللاحقة والمهام الأخرى. باستخدام R وAWK، تمكنت من تنظيف البيانات وتنظيمها بطريقة طبيعية، مما أدى إلى تسريع عملية معالجة الاستعلامات بشكل كبير. لم يكن الأمر سهلاً بالنسبة لي وتطلب العديد من التكرارات. ستساعدك هذه المقالة على تجنب بعض أخطائي وستوضح لك ما انتهى بي الأمر إليه.

أولاً، بعض التوضيحات التمهيدية.

معطيات

زودنا مركز معالجة المعلومات الوراثية في جامعتنا ببيانات على شكل 25 تيرابايت من TSV. لقد استلمتها مقسمة إلى 5 حزم مضغوطة بصيغة Gzip، تحتوي كل منها على حوالي 240 ملفًا بحجم أربعة جيجابايت. يحتوي كل صف على بيانات لـ SNP واحد من فرد واحد. في المجمل، تم إرسال بيانات حول 2,5 مليون SNPs وحوالي 60 ألف شخص. بالإضافة إلى معلومات SNP، تحتوي الملفات على العديد من الأعمدة بأرقام تعكس خصائص مختلفة، مثل كثافة القراءة، وتكرار الأليلات المختلفة، وما إلى ذلك. في المجمل كان هناك حوالي 30 عمودًا بقيم فريدة.

الهدف

كما هو الحال مع أي مشروع لإدارة البيانات، كان الشيء الأكثر أهمية هو تحديد كيفية استخدام البيانات. في هذه الحالة سنقوم في الغالب باختيار النماذج وسير العمل لـ SNP بناءً على SNP. وهذا يعني أننا سنحتاج فقط إلى بيانات حول SNP واحد في كل مرة. كان عليّ أن أتعلم كيفية استرجاع جميع السجلات المرتبطة بواحد من 2,5 مليون من أشكال SNPs بسهولة وسرعة وبتكلفة منخفضة قدر الإمكان.

كيف لا تفعل هذا

على حد تعبير كليشيهات مناسبة:

لم أفشل ألف مرة، بل اكتشفت للتو ألف طريقة لتجنب تحليل مجموعة من البيانات بتنسيق سهل الاستخدام للاستعلام.

أول محاولة

ماذا تعلمت: لا توجد طريقة رخيصة لتحليل 25 تيرابايت في المرة الواحدة.

بعد أن التحقت بدورة "الأساليب المتقدمة لمعالجة البيانات الضخمة" في جامعة فاندربيلت، كنت على يقين من أن الخدعة كانت في متناول اليد. من المحتمل أن يستغرق الأمر ساعة أو ساعتين لإعداد خادم Hive ليتم تشغيله عبر جميع البيانات والإبلاغ عن النتيجة. وبما أن بياناتنا مخزنة في AWS S3، فقد استخدمت الخدمة أثينا، والذي يسمح لك بتطبيق استعلامات Hive SQL على بيانات S3. لا تحتاج إلى إعداد/رفع مجموعة Hive، كما أنك تدفع فقط مقابل البيانات التي تبحث عنها.

بعد أن عرضت على أثينا بياناتي وتنسيقها، أجريت بعض الاختبارات باستعلامات مثل هذا:

select * from intensityData limit 10;

وسرعان ما حصلت على نتائج جيدة التنظيم. مستعد.

حتى حاولنا استخدام البيانات في عملنا...

لقد طُلب مني سحب جميع معلومات SNP لاختبار النموذج عليه. قمت بتشغيل الاستعلام:


select * from intensityData 
where snp = 'rs123456';

...وبدأ بالانتظار. وبعد ثماني دقائق وأكثر من 4 تيرابايت من البيانات المطلوبة، تلقيت النتيجة. تتقاضى Athena رسومًا وفقًا لحجم البيانات التي يتم العثور عليها، وهي 5 دولارات لكل تيرابايت. لذا فإن هذا الطلب الواحد يكلف 20 دولارًا وثماني دقائق من الانتظار. ولتشغيل النموذج على كافة البيانات، كان علينا أن ننتظر 38 عاماً وندفع 50 مليون دولار. ومن الواضح أن هذا لم يكن مناسباً لنا.

وكان من الضروري استخدام الباركيه...

ماذا تعلمت: كن حذرا مع حجم ملفات الباركيه وتنظيمها.

لقد حاولت أولاً إصلاح الموقف عن طريق تحويل جميع ملفات TSV إلى ملفات الباركيه. وهي مناسبة للعمل مع مجموعات كبيرة من البيانات لأن المعلومات الموجودة فيها يتم تخزينها في شكل عمودي: كل عمود يقع في شريحة الذاكرة/القرص الخاصة به، على عكس الملفات النصية، حيث تحتوي الصفوف على عناصر من كل عمود. وإذا كنت بحاجة إلى العثور على شيء ما، فما عليك سوى قراءة العمود المطلوب. بالإضافة إلى ذلك، يقوم كل ملف بتخزين نطاق من القيم في عمود، لذلك إذا كانت القيمة التي تبحث عنها ليست في نطاق العمود، فلن يضيع Spark الوقت في فحص الملف بأكمله.

لقد قمت بمهمة بسيطة غراء AWS لتحويل ملفات TSV الخاصة بنا إلى Parquet وإسقاط الملفات الجديدة في Athena. استغرق الأمر حوالي 5 ساعات. ولكن عندما قمت بتشغيل الطلب، استغرق إكماله نفس القدر من الوقت وقليلًا من المال. الحقيقة هي أن Spark، في محاولة لتحسين المهمة، قامت ببساطة بتفكيك قطعة TSV واحدة ووضعها في قطعة الباركيه الخاصة بها. ولأن كل قطعة كانت كبيرة بما يكفي لاحتواء السجلات الكاملة للعديد من الأشخاص، كان كل ملف يحتوي على جميع أشكال SNP، لذلك كان على Spark فتح جميع الملفات لاستخراج المعلومات التي تحتاجها.

ومن المثير للاهتمام، أن نوع الضغط الافتراضي (والموصى به) لـ Parquet، وهو سريع، غير قابل للتقسيم. ولذلك، كان كل منفذ عالقًا في مهمة تفريغ وتنزيل مجموعة البيانات الكاملة البالغة 3,5 جيجابايت.

تحليل 25 تيرابايت باستخدام AWK وR

دعونا نفهم المشكلة

ماذا تعلمت: الفرز أمر صعب، خاصة إذا كانت البيانات موزعة.

بدا لي أنني فهمت الآن جوهر المشكلة. كنت بحاجة فقط لفرز البيانات حسب عمود SNP، وليس حسب الأشخاص. بعد ذلك، سيتم تخزين العديد من أشكال SNP في مجموعة بيانات منفصلة، ​​وبعد ذلك ستظهر وظيفة Parquet "الذكية" "التي تفتح فقط إذا كانت القيمة في النطاق" نفسها بكل مجدها. لسوء الحظ، ثبت أن فرز مليارات الصفوف المنتشرة عبر مجموعة ما كان مهمة صعبة.

من المؤكد أن AWS لا ترغب في إصدار استرداد بسبب السبب "أنا طالب مشتت". بعد أن قمت بإجراء الفرز على Amazon Glue، استمر لمدة يومين وتعطل.

ماذا عن التقسيم؟

ماذا تعلمت: يجب أن تكون الأقسام في Spark متوازنة.

ثم خطرت ببالي فكرة تقسيم البيانات في الكروموسومات. هناك 23 منها (والعديد منها إذا أخذت في الاعتبار الحمض النووي للميتوكوندريا والمناطق غير المعينة).
سيسمح لك ذلك بتقسيم البيانات إلى أجزاء أصغر. إذا قمت بإضافة سطر واحد فقط إلى وظيفة التصدير Spark في البرنامج النصي Glue partition_by = "chr"، فيجب تقسيم البيانات إلى مجموعات.

تحليل 25 تيرابايت باستخدام AWK وR
يتكون الجينوم من أجزاء عديدة تسمى الكروموسومات.

لسوء الحظ، لم ينجح الأمر. للكروموسومات أحجام مختلفة، مما يعني كميات مختلفة من المعلومات. وهذا يعني أن المهام التي أرسلتها Spark إلى العمال لم تكن متوازنة وتم إكمالها ببطء لأن بعض العقد انتهت مبكرًا وكانت خاملة. ومع ذلك، تم الانتهاء من المهام. ولكن عند طلب SNP واحد، تسبب عدم التوازن في حدوث مشكلات مرة أخرى. لقد انخفضت تكلفة معالجة تعدد الأشكال على الكروموسومات الأكبر حجمًا (أي المكان الذي نريد الحصول على البيانات منه) بمقدار 10 فقط. الكثير، ولكن ليس بما فيه الكفاية.

ماذا لو قسمناها إلى أجزاء أصغر؟

ماذا تعلمت: لا تحاول أبدًا عمل 2,5 مليون قسم على الإطلاق.

قررت أن أبذل قصارى جهدي وأقسم كل SNP. هذا يضمن أن الأقسام كانت متساوية الحجم. لقد كانت فكرة سيئة. لقد استخدمت الغراء وأضفت خطًا بريئًا partition_by = 'snp'. بدأت المهمة وبدأت في تنفيذها. وبعد يوم واحد، تحققت ورأيت أنه لا يوجد أي شيء مكتوب إلى S3، لذلك أنهيت المهمة. يبدو أن Glue كان يكتب ملفات وسيطة إلى موقع مخفي في S3، الكثير من الملفات، ربما بضعة ملايين. ونتيجة لذلك، كلف خطأي أكثر من ألف دولار ولم يرضي معلمي.

التقسيم + الفرز

ماذا تعلمت: لا يزال الفرز صعبًا، كما هو الحال مع ضبط سبارك.

محاولتي الأخيرة للتقسيم تضمنت تقسيم الكروموسومات ثم فرز كل قسم. من الناحية النظرية، سيؤدي هذا إلى تسريع كل استعلام لأن بيانات SNP المطلوبة يجب أن تكون ضمن عدد قليل من قطع الباركيه ضمن نطاق معين. لسوء الحظ، تبين أن فرز البيانات المقسمة مهمة صعبة. ونتيجة لذلك، قمت بالتبديل إلى EMR لمجموعة مخصصة واستخدمت ثماني مثيلات قوية (C5.4xl) وSparklyr لإنشاء سير عمل أكثر مرونة...

# Sparklyr snippet to partition by chr and sort w/in partition
# Join the raw data with the snp bins
raw_data
  group_by(chr) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr')
  )

...ومع ذلك، فإن المهمة لم تكتمل بعد. لقد قمت بتكوينه بطرق مختلفة: زيادة تخصيص الذاكرة لكل منفذ استعلام، واستخدام العقد ذات كمية كبيرة من الذاكرة، واستخدام متغيرات البث (متغيرات البث)، ولكن في كل مرة تبين أن هذه أنصاف التدابير، وبدأ المنفذون تدريجيًا بالفشل حتى يتوقف كل شيء.

لقد أصبحت أكثر إبداعًا

ماذا تعلمت: في بعض الأحيان تتطلب البيانات الخاصة حلولاً خاصة.

كل SNP له قيمة موضع. وهذا رقم يتوافق مع عدد القواعد الموجودة على طول الكروموسوم الخاص به. هذه طريقة لطيفة وطبيعية لتنظيم بياناتنا. في البداية كنت أرغب في التقسيم حسب مناطق كل كروموسوم. على سبيل المثال، المواضع 1 - 2000، 2001 - 4000، إلخ. لكن المشكلة هي أن تعدد الأشكال (SNPs) لا يتم توزيعه بالتساوي عبر الكروموسومات، وبالتالي فإن أحجام المجموعة ستختلف بشكل كبير.

تحليل 25 تيرابايت باستخدام AWK وR

ونتيجة لذلك، توصلت إلى تقسيم المناصب إلى فئات (رتبة). باستخدام البيانات التي تم تنزيلها بالفعل، قمت بتنفيذ طلب للحصول على قائمة بأشكال SNP الفريدة ومواقعها وكروموسوماتها. ثم قمت بفرز البيانات الموجودة داخل كل كروموسوم وجمعت أشكال SNP في مجموعات (صندوق) بحجم معين. لنفترض أن كل 1000 SNPs. لقد أعطاني هذا العلاقة بين SNP والمجموعة لكل كروموسوم.

في النهاية، قمت بإنشاء مجموعات (bin) مكونة من 75 SNPs، وسيتم شرح السبب أدناه.

snp_to_bin <- unique_snps %>% 
  group_by(chr) %>% 
  arrange(position) %>% 
  mutate(
    rank = 1:n()
    bin = floor(rank/snps_per_bin)
  ) %>% 
  ungroup()

حاول أولاً مع Spark

ماذا تعلمت: تجميع الشرارة سريع، ولكن التقسيم لا يزال مكلفًا.

كنت أرغب في قراءة إطار البيانات الصغير هذا (2,5 مليون صف) في Spark، ودمجه مع البيانات الأولية، ثم تقسيمه حسب العمود المضاف حديثًا bin.


# Join the raw data with the snp bins
data_w_bin <- raw_data %>%
  left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%
  group_by(chr_bin) %>%
  arrange(Position) %>% 
  Spark_write_Parquet(
    path = DUMP_LOC,
    mode = 'overwrite',
    partition_by = c('chr_bin')
  )

انا إستعملت sdf_broadcast()لذلك تعلم Spark أنه يجب عليها إرسال إطار البيانات إلى جميع العقد. يكون هذا مفيدًا إذا كانت البيانات صغيرة الحجم ومطلوبة لجميع المهام. وبخلاف ذلك، تحاول Spark أن تكون ذكية وتقوم بتوزيع البيانات حسب الحاجة، مما قد يتسبب في حدوث تباطؤ.

ومرة أخرى، لم تنجح فكرتي: لقد عملت المهام لبعض الوقت، وأكملت الاتحاد، وبعد ذلك، مثل المنفذين الذين أطلقوا التقسيم، بدأوا بالفشل.

إضافة أوك

ماذا تعلمت: لا تنام عندما تتعلم الأساسيات. من المؤكد أن شخصًا ما قد حل مشكلتك بالفعل في الثمانينات.

حتى هذه اللحظة، كان السبب وراء كل إخفاقاتي مع Spark هو خليط البيانات في المجموعة. ربما يمكن تحسين الوضع مع العلاج المسبق. قررت أن أحاول تقسيم بيانات النص الخام إلى أعمدة من الكروموسومات، لذلك كنت آمل أن أزود Spark ببيانات "مقسمة مسبقًا".

لقد بحثت في StackOverflow عن كيفية التقسيم حسب قيم الأعمدة ووجدت هذه إجابة عظيمة. باستخدام AWK يمكنك تقسيم ملف نصي حسب قيم الأعمدة عن طريق كتابته في برنامج نصي بدلاً من إرسال النتائج إليه stdout.

لقد كتبت نص Bash لتجربته. قمت بتنزيل أحد حزم TSV، ثم قمت بفك ضغطها باستخدام gzip وأرسلت إلى awk.

gzip -dc path/to/chunk/file.gz |
awk -F 't' 
'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

انها عملت!

ملء النوى

ماذا تعلمت: gnu parallel - إنه شيء سحري، يجب على الجميع استخدامه.

كان الانفصال بطيئًا جدًا وعندما بدأت htopللتحقق من استخدام مثيل EC2 القوي (والمكلف)، اتضح أنني كنت أستخدم نواة واحدة فقط وحوالي 200 ميجابايت من الذاكرة. لحل المشكلة وعدم خسارة الكثير من المال، كان علينا معرفة كيفية موازنة العمل. ولحسن الحظ، في كتاب رائع للغاية علم البيانات في سطر الأوامر لقد وجدت فصلاً لجيرون يانسنس حول الموازاة. منه تعلمت عنه gnu parallel، طريقة مرنة جدًا لتنفيذ تعدد العمليات في Unix.

تحليل 25 تيرابايت باستخدام AWK وR
عندما بدأت التقسيم باستخدام العملية الجديدة، كان كل شيء على ما يرام، ولكن لا يزال هناك عنق الزجاجة - لم يكن تنزيل كائنات S3 على القرص سريعًا جدًا ولم يكن متوازيًا تمامًا. لإصلاح هذا، فعلت هذا:

  1. اكتشفت أنه من الممكن تنفيذ مرحلة تنزيل S3 مباشرة في المسار، مما يؤدي إلى التخلص تمامًا من التخزين الوسيط على القرص. وهذا يعني أنه يمكنني تجنب كتابة البيانات الأولية على القرص واستخدام مساحة تخزين أصغر، وبالتالي أرخص، على AWS.
  2. فريق aws configure set default.s3.max_concurrent_requests 50 زاد بشكل كبير عدد سلاسل الرسائل التي يستخدمها AWS CLI (افتراضيًا هناك 10).
  3. لقد قمت بالتبديل إلى مثيل EC2 المحسّن لسرعة الشبكة، مع الحرف n في الاسم. لقد وجدت أن فقدان قوة المعالجة عند استخدام مثيلات n يتم تعويضه عن طريق زيادة سرعة التحميل. بالنسبة لمعظم المهام، استخدمت c5n.4xl.
  4. تغير gzip في pigz، هذه أداة gzip يمكنها القيام بأشياء رائعة لموازاة المهمة غير المتوازية في البداية المتمثلة في فك ضغط الملفات (وهذا ساعد على الأقل).

# Let S3 use as many threads as it wants
aws configure set default.s3.max_concurrent_requests 50

for chunk_file in $(aws s3 ls $DATA_LOC | awk '{print $4}' | grep 'chr'$DESIRED_CHR'.csv') ; do

        aws s3 cp s3://$batch_loc$chunk_file - |
        pigz -dc |
        parallel --block 100M --pipe  
        "awk -F 't' '{print $1",..."$30">"chunked/{#}_chr"$15".csv"}'"

       # Combine all the parallel process chunks to single files
        ls chunked/ |
        cut -d '_' -f 2 |
        sort -u |
        parallel 'cat chunked/*_{} | sort -k5 -n -S 80% -t, | aws s3 cp - '$s3_dest'/batch_'$batch_num'_{}'
        
         # Clean up intermediate data
       rm chunked/*
done

يتم دمج هذه الخطوات مع بعضها البعض لجعل كل شيء يعمل بسرعة كبيرة. ومن خلال زيادة سرعات التنزيل والقضاء على عمليات الكتابة على القرص، أصبح بإمكاني الآن معالجة حزمة بحجم 5 تيرابايت في غضون ساعات قليلة.

كان ينبغي لهذه التغريدة أن تذكر "TSV". واحسرتاه.

باستخدام البيانات التي تم تحليلها حديثا

ماذا تعلمت: يحب Spark البيانات غير المضغوطة ولا يحب دمج الأقسام.

أصبحت البيانات الآن في S3 بتنسيق غير مضغوط (اقرأ: مشترك) وشبه مرتب، ويمكنني العودة إلى Spark مرة أخرى. وكانت المفاجأة تنتظرني: لقد فشلت مرة أخرى في تحقيق ما أردت! كان من الصعب جدًا إخبار Spark بالضبط بكيفية تقسيم البيانات. وحتى عندما فعلت ذلك، اتضح أن هناك الكثير من الأقسام (95 ألف)، وعندما استخدمتها coalesce خفضت عددهم إلى حدود معقولة، وهذا دمر تقسيمي. أنا متأكد من إمكانية حل هذه المشكلة، ولكن بعد يومين من البحث لم أتمكن من العثور على حل. لقد انتهيت أخيرًا من جميع المهام في Spark، على الرغم من أن الأمر استغرق بعض الوقت ولم تكن ملفات Parquet المنقسمة صغيرة جدًا (حوالي 200 كيلوبايت). ومع ذلك، كانت البيانات حيث كانت هناك حاجة إليها.

تحليل 25 تيرابايت باستخدام AWK وR
صغيرة جدًا وغير متساوية، رائعة!

اختبار استعلامات Spark المحلية

ماذا تعلمت: لدى Spark الكثير من النفقات العامة عند حل المشكلات البسيطة.

ومن خلال تنزيل البيانات بتنسيق ذكي، تمكنت من اختبار السرعة. قم بإعداد برنامج نصي R لتشغيل خادم Spark محلي، ثم قم بتحميل إطار بيانات Spark من مخزن مجموعة Parquet المحدد (صندوق). لقد حاولت تحميل جميع البيانات ولكن لم أتمكن من جعل Sparklyr يتعرف على التقسيم.

sc <- Spark_connect(master = "local")

desired_snp <- 'rs34771739'

# Start a timer
start_time <- Sys.time()

# Load the desired bin into Spark
intensity_data <- sc %>% 
  Spark_read_Parquet(
    name = 'intensity_data', 
    path = get_snp_location(desired_snp),
    memory = FALSE )

# Subset bin to snp and then collect to local
test_subset <- intensity_data %>% 
  filter(SNP_Name == desired_snp) %>% 
  collect()

print(Sys.time() - start_time)

استغرق التنفيذ 29,415 ثانية. أفضل بكثير، لكنه ليس جيدًا للاختبار الشامل لأي شيء. بالإضافة إلى ذلك، لم أتمكن من تسريع الأمور باستخدام التخزين المؤقت لأنه عندما حاولت تخزين إطار بيانات مؤقتًا في الذاكرة، كان Spark يتعطل دائمًا، حتى عندما خصصت أكثر من 50 جيجابايت من الذاكرة لمجموعة بيانات يقل وزنها عن 15.

العودة إلى أوك

ماذا تعلمت: المصفوفات الترابطية في AWK فعالة للغاية.

أدركت أنه يمكنني تحقيق سرعات أعلى. تذكرت ذلك بطريقة رائعة البرنامج التعليمي AWK بواسطة بروس بارنيت قرأت عن ميزة رائعة تسمى "المصفوفات الترابطية" في الأساس، هذه هي أزواج القيمة الرئيسية، والتي لسبب ما تم استدعاؤها بشكل مختلف في AWK، وبالتالي لم أفكر كثيرًا فيها بطريقة أو بأخرى. رومان تشيبلياكا أشار إلى أن مصطلح "المصفوفات الترابطية" أقدم بكثير من مصطلح "زوج القيمة الرئيسية". حتى لو كنت ابحث عن القيمة الرئيسية في Google Ngram، لن ترى هذا المصطلح هناك، ولكنك ستجد صفائف ترابطية! بالإضافة إلى ذلك، غالبًا ما يرتبط "زوج القيمة الرئيسية" بقواعد البيانات، لذلك من المنطقي أكثر مقارنته بالهاشماب. أدركت أنه يمكنني استخدام هذه المصفوفات الترابطية لربط تعدد الأشكال (SNP) الخاص بي بجدول سلة وبيانات أولية دون استخدام Spark.

للقيام بذلك، في البرنامج النصي AWK استخدمت الكتلة BEGIN. هذا جزء من التعليمات البرمجية يتم تنفيذه قبل تمرير السطر الأول من البيانات إلى النص الرئيسي للبرنامج النصي.

join_data.awk
BEGIN {
  FS=",";
  batch_num=substr(chunk,7,1);
  chunk_id=substr(chunk,15,2);
  while(getline < "snp_to_bin.csv") {bin[$1] = $2}
}
{
  print $0 > "chunked/chr_"chr"_bin_"bin[$1]"_"batch_num"_"chunk_id".csv"
}

فريق while(getline...) تحميل جميع الصفوف من مجموعة CSV (bin)، قم بتعيين العمود الأول (اسم SNP) كمفتاح للمصفوفة النقابية bin والقيمة الثانية (المجموعة) كقيمة. ثم في الكتلة { }، والذي يتم تنفيذه على جميع أسطر الملف الرئيسي، يتم إرسال كل سطر إلى ملف الإخراج، الذي يتلقى اسمًا فريدًا اعتمادًا على مجموعته (bin): ..._bin_"bin[$1]"_....

المتغيرات batch_num и chunk_id مطابقة البيانات المقدمة من خط الأنابيب، وتجنب حالة السباق، وتشغيل كل مؤشر ترابط تنفيذ parallel، كتب إلى ملف فريد خاص به.

نظرًا لأنني قمت بتوزيع جميع البيانات الأولية في مجلدات على الكروموسومات المتبقية من تجربتي السابقة مع AWK، يمكنني الآن كتابة نص Bash آخر لمعالجة كروموسوم واحد في كل مرة وإرسال بيانات مقسمة أعمق إلى S3.

DESIRED_CHR='13'

# Download chromosome data from s3 and split into bins
aws s3 ls $DATA_LOC |
awk '{print $4}' |
grep 'chr'$DESIRED_CHR'.csv' |
parallel "echo 'reading {}'; aws s3 cp "$DATA_LOC"{} - | awk -v chr=""$DESIRED_CHR"" -v chunk="{}" -f split_on_chr_bin.awk"

# Combine all the parallel process chunks to single files and upload to rds using R
ls chunked/ |
cut -d '_' -f 4 |
sort -u |
parallel "echo 'zipping bin {}'; cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R '$S3_DEST'/chr_'$DESIRED_CHR'_bin_{}.rds"
rm chunked/*

يحتوي البرنامج النصي على قسمين parallel.

في القسم الأول تتم قراءة البيانات من كافة الملفات التي تحتوي على معلومات عن الكروموسوم المطلوب، ثم يتم توزيع هذه البيانات عبر الخيوط التي تقوم بتوزيع الملفات على المجموعات المناسبة (bin). لتجنب حالات السباق عندما تكتب عدة سلاسل رسائل إلى نفس الملف، يقوم AWK بتمرير أسماء الملفات لكتابة البيانات إلى أماكن مختلفة، على سبيل المثال. chr_10_bin_52_batch_2_aa.csv. نتيجة لذلك، يتم إنشاء العديد من الملفات الصغيرة على القرص (لهذا استخدمت وحدات تخزين EBS تيرابايت).

الناقل من القسم الثاني parallel يمر عبر المجموعات (bin) ويجمع ملفاتها الفردية في ملف CSV مشترك catومن ثم يرسلها للتصدير.

البث في R؟

ماذا تعلمت: بامكانك الاتصال stdin и stdout من برنامج نصي R، وبالتالي استخدامه في خط الأنابيب.

ربما لاحظت هذا السطر في برنامج Bash النصي الخاص بك: ...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R.... يقوم بترجمة جميع ملفات المجموعة المتسلسلة (bin) إلى البرنامج النصي R أدناه. {} هي تقنية خاصة parallel، والذي يقوم بإدراج أي بيانات يرسلها إلى الدفق المحدد مباشرة في الأمر نفسه. خيار {#} يوفر معرف مؤشر ترابط فريد، و {%} يمثل رقم فتحة المهمة (متكرر، ولكن ليس في وقت واحد أبدًا). يمكن العثور على قائمة بجميع الخيارات في توثيق.

#!/usr/bin/env Rscript
library(readr)
library(aws.s3)

# Read first command line argument
data_destination <- commandArgs(trailingOnly = TRUE)[1]

data_cols <- list(SNP_Name = 'c', ...)

s3saveRDS(
  read_csv(
        file("stdin"), 
        col_names = names(data_cols),
        col_types = data_cols 
    ),
  object = data_destination
)

عندما يكون متغير file("stdin") تنتقل إلى readr::read_csv، يتم تحميل البيانات المترجمة إلى البرنامج النصي R في إطار، ثم يتم وضعها في النموذج .rds-ملف باستخدام aws.s3 مكتوبة مباشرة إلى S3.

يشبه RDS إصدارًا صغيرًا من Parquet، بدون زخرفة تخزين السماعات.

بعد الانتهاء من نص Bash حصلت على حزمة .rds-الملفات الموجودة في S3، والتي سمحت لي باستخدام الضغط الفعال والأنواع المضمنة.

على الرغم من استخدام الفرامل R، كل شيء يعمل بسرعة كبيرة. ليس من المستغرب أن تكون أجزاء R التي تقرأ البيانات وتكتبها محسنة للغاية. بعد الاختبار على كروموسوم واحد متوسط ​​الحجم، اكتملت المهمة على مثيل C5n.4xl في حوالي ساعتين.

قيود S3

ماذا تعلمت: بفضل تطبيق المسار الذكي، يستطيع S3 التعامل مع العديد من الملفات.

كنت قلقًا بشأن ما إذا كان S3 سيكون قادرًا على التعامل مع الملفات العديدة التي تم نقلها إليه. يمكنني أن أجعل أسماء الملفات منطقية، ولكن كيف سيبحث عنها S3؟

تحليل 25 تيرابايت باستخدام AWK وR
المجلدات الموجودة في S3 مخصصة للعرض فقط، وفي الواقع فإن النظام غير مهتم بالرمز /. من صفحة الأسئلة الشائعة الخاصة بـ S3.

يبدو أن S3 يمثل المسار إلى ملف معين كمفتاح بسيط في نوع من جدول التجزئة أو قاعدة البيانات المستندة إلى المستندات. يمكن اعتبار الدلو بمثابة جدول، ويمكن اعتبار الملفات سجلات في هذا الجدول.

نظرًا لأن السرعة والكفاءة مهمتان لتحقيق الربح في أمازون، فليس من المستغرب أن يكون نظام مسار المفتاح كملف هذا محسّنًا بشكل غريب. حاولت إيجاد التوازن: بحيث لا أضطر إلى تقديم الكثير من طلبات الحصول، ولكن يتم تنفيذ الطلبات بسرعة. اتضح أنه من الأفضل إنشاء حوالي 20 ألف ملف بن. أعتقد أنه إذا واصلنا التحسين، فيمكننا تحقيق زيادة في السرعة (على سبيل المثال، إنشاء مجموعة خاصة للبيانات فقط، وبالتالي تقليل حجم جدول البحث). لكن لم يكن هناك وقت أو مال لإجراء المزيد من التجارب.

ماذا عن التوافق المتقاطع؟

ما تعلمته: السبب الأول لإهدار الوقت هو تحسين طريقة التخزين لديك قبل الأوان.

في هذه المرحلة، من المهم جدًا أن تسأل نفسك: "لماذا نستخدم تنسيق ملف خاصًا؟" ويكمن السبب في سرعة التحميل (استغرق تحميل ملفات بتنسيق gzipped CSV وقتًا أطول بـ 7 مرات) والتوافق مع سير العمل لدينا. قد أعيد النظر فيما إذا كان بإمكان R تحميل ملفات Parquet (أو Arrow) بسهولة دون تحميل Spark. يستخدم كل شخص في مختبرنا لغة R، وإذا كنت بحاجة إلى تحويل البيانات إلى تنسيق آخر، فلا يزال لدي البيانات النصية الأصلية، لذا يمكنني تشغيل المسار مرة أخرى.

تقسيم العمل

ماذا تعلمت: لا تحاول تحسين المهام يدويًا، دع الكمبيوتر يقوم بذلك.

لقد قمت بتصحيح سير العمل على كروموسوم واحد، والآن أحتاج إلى معالجة جميع البيانات الأخرى.
كنت أرغب في رفع العديد من مثيلات EC2 للتحويل، ولكن في الوقت نفسه كنت خائفًا من الحصول على حمل غير متوازن للغاية عبر مهام المعالجة المختلفة (تمامًا كما عانت Spark من الأقسام غير المتوازنة). بالإضافة إلى ذلك، لم أكن مهتمًا برفع مثيل واحد لكل كروموسوم، لأنه بالنسبة لحسابات AWS هناك حد افتراضي يبلغ 10 مثيلات.

ثم قررت أن أكتب نصًا بلغة R لتحسين مهام المعالجة.

أولاً، طلبت من S3 حساب مقدار مساحة التخزين التي يشغلها كل كروموسوم.

library(aws.s3)
library(tidyverse)

chr_sizes <- get_bucket_df(
  bucket = '...', prefix = '...', max = Inf
) %>% 
  mutate(Size = as.numeric(Size)) %>% 
  filter(Size != 0) %>% 
  mutate(
    # Extract chromosome from the file name 
    chr = str_extract(Key, 'chr.{1,4}.csv') %>%
             str_remove_all('chr|.csv')
  ) %>% 
  group_by(chr) %>% 
  summarise(total_size = sum(Size)/1e+9) # Divide to get value in GB



# A tibble: 27 x 2
   chr   total_size
   <chr>      <dbl>
 1 0           163.
 2 1           967.
 3 10          541.
 4 11          611.
 5 12          542.
 6 13          364.
 7 14          375.
 8 15          372.
 9 16          434.
10 17          443.
# … with 17 more rows

ثم كتبت دالة تأخذ الحجم الإجمالي، وتخلط ترتيب الكروموسومات، وتقسمها إلى مجموعات num_jobs ويخبرك بمدى اختلاف أحجام جميع مهام المعالجة.

num_jobs <- 7
# How big would each job be if perfectly split?
job_size <- sum(chr_sizes$total_size)/7

shuffle_job <- function(i){
  chr_sizes %>%
    sample_frac() %>% 
    mutate(
      cum_size = cumsum(total_size),
      job_num = ceiling(cum_size/job_size)
    ) %>% 
    group_by(job_num) %>% 
    summarise(
      job_chrs = paste(chr, collapse = ','),
      total_job_size = sum(total_size)
    ) %>% 
    mutate(sd = sd(total_job_size)) %>% 
    nest(-sd)
}

shuffle_job(1)



# A tibble: 1 x 2
     sd data            
  <dbl> <list>          
1  153. <tibble [7 × 3]>

ثم أجريت آلاف المراوغات باستخدام Purrr واخترت الأفضل.

1:1000 %>% 
  map_df(shuffle_job) %>% 
  filter(sd == min(sd)) %>% 
  pull(data) %>% 
  pluck(1)

لذلك انتهى بي الأمر بمجموعة من المهام المتشابهة جدًا في الحجم. ثم كل ما تبقى هو تغليف نص Bash السابق الخاص بي في حلقة كبيرة for. استغرق هذا التحسين حوالي 10 دقائق للكتابة. وهذا أقل بكثير مما كنت سأنفقه على إنشاء المهام يدويًا إذا كانت غير متوازنة. ولذلك، أعتقد أنني كنت على حق في هذا التحسين الأولي.

for DESIRED_CHR in "16" "9" "7" "21" "MT"
do
# Code for processing a single chromosome
fi

في النهاية أقوم بإضافة أمر إيقاف التشغيل:

sudo shutdown -h now

... وكل شيء سار على ما يرام! باستخدام AWS CLI، قمت برفع المثيلات باستخدام الخيار user_data أعطاهم باش سكريبتات لمهامهم للمعالجة. لقد تم تشغيلها وإيقاف تشغيلها تلقائيًا، لذلك لم أدفع مقابل قوة المعالجة الإضافية.

aws ec2 run-instances ...
--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" 
--user-data file://<<job_script_loc>>

دعونا حزمة!

ماذا تعلمت: يجب أن تكون واجهة برمجة التطبيقات (API) بسيطة من أجل سهولة ومرونة الاستخدام.

وأخيراً حصلت على البيانات في المكان والشكل المناسبين. كل ما بقي هو تبسيط عملية استخدام البيانات قدر الإمكان لتسهيل الأمر على زملائي. كنت أرغب في إنشاء واجهة برمجة تطبيقات بسيطة لإنشاء الطلبات. إذا قررت في المستقبل التبديل من .rds لملفات الباركيه، فيجب أن تكون هذه مشكلة بالنسبة لي، وليس لزملائي. لهذا قررت إنشاء حزمة R داخلية.

قم ببناء وتوثيق حزمة بسيطة جدًا تحتوي فقط على عدد قليل من وظائف الوصول إلى البيانات المنظمة حول وظيفة ما get_snp. لقد قمت أيضًا بإنشاء موقع ويب لزملائي com.pkgdown، حتى يتمكنوا من رؤية الأمثلة والوثائق بسهولة.

تحليل 25 تيرابايت باستخدام AWK وR

التخزين المؤقت الذكي

ماذا تعلمت: إذا تم إعداد بياناتك بشكل جيد، فسيكون التخزين المؤقت سهلاً!

نظرًا لأن أحد مسارات العمل الرئيسية طبق نفس نموذج التحليل على حزمة SNP، فقد قررت استخدام binning لصالحي. عند إرسال البيانات عبر SNP، يتم إرفاق جميع المعلومات من المجموعة (bin) بالكائن الذي تم إرجاعه. أي أن الاستعلامات القديمة يمكنها (نظريًا) تسريع معالجة الاستعلامات الجديدة.

# Part of get_snp()
...
  # Test if our current snp data has the desired snp.
  already_have_snp <- desired_snp %in% prev_snp_results$snps_in_bin

  if(!already_have_snp){
    # Grab info on the bin of the desired snp
    snp_results <- get_snp_bin(desired_snp)

    # Download the snp's bin data
    snp_results$bin_data <- aws.s3::s3readRDS(object = snp_results$data_loc)
  } else {
    # The previous snp data contained the right bin so just use it
    snp_results <- prev_snp_results
  }
...

عند إنشاء الحزمة، قمت بإجراء العديد من المعايير لمقارنة السرعة عند استخدام طرق مختلفة. وأنصح بعدم إهمال ذلك، لأن النتائج في بعض الأحيان تكون غير متوقعة. على سبيل المثال، dplyr::filter كان أسرع بكثير من التقاط الصفوف باستخدام التصفية القائمة على الفهرسة، وكان استرداد عمود واحد من إطار بيانات تمت تصفيته أسرع بكثير من استخدام بناء جملة الفهرسة.

يرجى ملاحظة أن الكائن prev_snp_results يحتوي على المفتاح snps_in_bin. هذه عبارة عن مصفوفة من جميع أشكال SNP الفريدة في مجموعة (سلة)، مما يسمح لك بالتحقق بسرعة مما إذا كان لديك بالفعل بيانات من استعلام سابق. كما أنه يجعل من السهل تكرار جميع SNPs في مجموعة (bin) باستخدام هذا الرمز:

# Get bin-mates
snps_in_bin <- my_snp_results$snps_in_bin

for(current_snp in snps_in_bin){
  my_snp_results <- get_snp(current_snp, my_snp_results)
  # Do something with results 
}

النتائج

الآن يمكننا (وبدأنا بجدية) تشغيل النماذج والسيناريوهات التي لم يكن من الممكن لنا الوصول إليها في السابق. أفضل شيء هو أن زملائي في المختبر لا يضطرون إلى التفكير في أي مضاعفات. لديهم فقط وظيفة تعمل.

وعلى الرغم من أن الحزمة توفر لهم التفاصيل، إلا أنني حاولت أن أجعل تنسيق البيانات بسيطًا بما يكفي حتى يتمكنوا من اكتشافه إذا اختفيت فجأة غدًا...

وزادت السرعة بشكل ملحوظ. نقوم عادة بمسح أجزاء الجينوم ذات الأهمية الوظيفية. في السابق، لم نتمكن من القيام بذلك (اتضح أنه مكلف للغاية)، ولكن الآن، بفضل بنية المجموعة (bin) والتخزين المؤقت، يستغرق طلب SNP واحد في المتوسط ​​أقل من 0,1 ثانية، واستخدام البيانات كذلك منخفض أن تكاليف S3 هي الفول السوداني.

اختتام

هذه المقالة ليست دليلا على الإطلاق. وتبين أن الحل كان فرديًا، ومن المؤكد تقريبًا أنه ليس الحل الأمثل. بل هي بالأحرى رواية رحلة. أريد أن يفهم الآخرون أن مثل هذه القرارات لا تبدو مكتملة في الرأس، بل هي نتيجة التجربة والخطأ. أيضًا، إذا كنت تبحث عن عالم بيانات، فضع في اعتبارك أن استخدام هذه الأدوات بشكل فعال يتطلب خبرة، والخبرة تكلف أموالاً. أنا سعيد لأنه كان لدي القدرة على الدفع، لكن العديد من الأشخاص الآخرين الذين يمكنهم القيام بنفس العمل بشكل أفضل مني لن تتاح لهم الفرصة أبدًا بسبب نقص المال حتى للمحاولة.

أدوات البيانات الضخمة متعددة الاستخدامات. إذا كان لديك الوقت، فمن المؤكد تقريبًا أنه يمكنك كتابة حل أسرع باستخدام تقنيات تنظيف البيانات وتخزينها واستخراجها الذكية. في النهاية يتعلق الأمر بتحليل التكلفة والعائد.

ما تعلمته:

  • ولا توجد طريقة رخيصة لتحليل 25 تيرابايت في المرة الواحدة؛
  • كن حذرًا بشأن حجم ملفات الباركيه وتنظيمها؛
  • يجب أن تكون الأقسام في Spark متوازنة؛
  • بشكل عام، لا تحاول أبدًا إنشاء 2,5 مليون قسم؛
  • لا يزال الفرز صعبًا، كما هو الحال مع إعداد Spark؛
  • في بعض الأحيان تتطلب البيانات الخاصة حلولاً خاصة؛
  • يعد تجميع الشرارة سريعًا، لكن التقسيم لا يزال مكلفًا؛
  • لا تنام عندما يعلمونك الأساسيات، ربما قام شخص ما بحل مشكلتك بالفعل في الثمانينات؛
  • gnu parallel - هذا شيء سحري، يجب على الجميع استخدامه؛
  • يحب Spark البيانات غير المضغوطة ولا يحب دمج الأقسام؛
  • لدى Spark الكثير من النفقات العامة عند حل المشكلات البسيطة؛
  • تعتبر المصفوفات الترابطية الخاصة بـ AWK فعالة للغاية؛
  • بامكانك الاتصال stdin и stdout من برنامج نصي R، وبالتالي استخدامه في المسار؛
  • بفضل تنفيذ المسار الذكي، يستطيع S3 معالجة العديد من الملفات؛
  • السبب الرئيسي لإضاعة الوقت هو تحسين طريقة التخزين الخاصة بك قبل الأوان؛
  • لا تحاول تحسين المهام يدويًا، دع الكمبيوتر يقوم بذلك؛
  • يجب أن تكون واجهة برمجة التطبيقات (API) بسيطة من أجل سهولة ومرونة الاستخدام؛
  • إذا تم إعداد بياناتك بشكل جيد، فسيكون التخزين المؤقت سهلاً!

المصدر: www.habr.com

إضافة تعليق