Хабре, привіт! Вчора на
Невелика вступна - як ми використовуємо Spark. У нас є тримісячна програма
Особливість нашого використання полягає в тому, що кількість людей, що одночасно працюють на Spark, може бути однаковою всій групі. Наприклад, на семінарі, коли всі одночасно щось пробують та повторюють за нашим викладачем. А це небагато — під 40 осіб часом. Напевно, не так багато компаній у світі, які стикаються із таким сценарієм використання.
Далі я розповім, як і чому ми вибирали ті чи інші параметри конфігу.
Почнемо із самого початку. У Spark є 3 варіанти працювати на кластері: standalone, з використанням Mesos та з використанням YARN. Ми вирішили вибрати третій варіант, тому що для нас він був логічним. Ми вже маємо hadoop-кластер. Наші учасники вже добре знайомі з його архітектурою. Давайте юзати YARN.
spark.master=yarn
Далі цікавіше. У кожного з цих трьох варіантів розгортання є два варіанти деплою: client і cluster. Виходячи з
spark.deploy-mode=client
Загалом із цього моменту Spark вже якось працюватиме на YARN, але нам цього не було достатньо. Оскільки ми маємо програму про великі дані, то часом учасникам не вистачало того, що виходило в рамках рівномірної нарізки ресурсів. І тут ми знайшли цікаву річ — динамічну алокацію ресурсів. Якщо коротко, то суть в наступному: якщо у вас важке завдання і кластер вільний (наприклад, з ранку), то за допомогою цієї опції Spark може видати додаткові ресурси. Необхідність вважається там за хитрою формулою. Вдаватися до подробиць не будемо — вона непогано працює.
spark.dynamicAllocation.enabled=true
Ми поставили цей параметр, і при запуску Spark лаявся і не запустився. Правильно, бо треба було читати
spark.shuffle.service.enabled=true
Навіщо він потрібен? Коли наш джоб більше не потребує такої кількості ресурсів, то Spark має повернути їх до загального пулу. Найбільш трудомістка стадія майже в будь-якій MapReduce задачі - це стадія Shuffle. Цей параметр дозволяє зберігати дані, які утворюються на цій стадії і відповідно звільняти executors. А executor це процес, який на воркері все обраховує. Він має якусь кількість процесорних ядер і якусь кількість пам'яті.
Додали цей параметр. Все начебто заробило б. Стало помітно, що учасникам реально почало видаватися більше ресурсів, коли їм було потрібно. Але виникла інша проблема — колись інші учасники прокидалися і теж хотіли використовувати Spark, а там все зайнято, і вони були незадоволені. Їх можна збагнути. Почали дивитись у документацію. Там виявилося, що є ще певна кількість параметрів, за допомогою яких можна вплинути на процес. Наприклад, якщо executor перебуває в режимі очікування, через який час у нього можна забрати ресурси?
spark.dynamicAllocation.executorIdleTimeout=120s
У нашому випадку — якщо ваші executors нічого не роблять протягом двох хвилин, то будьте ласкаві, поверніть їх у загальний пул. Але й цього параметра не завжди вистачало. Було видно, що людина вже давно нічого не робить, а ресурси не звільняються. Виявилося, що є ще спеціальний параметр — після якогось часу відбирати executors, які містять закешовані дані. По дефолту цей параметр стояв – infinity! Ми його виправили.
spark.dynamicAllocation.cachedExecutorIdleTimeout=600s
Тобто якщо протягом 5 хвилин ваші executors нічого не роблять, віддайте їх у загальний пул. У такому режимі швидкість звільнення та видачі ресурсів для великої кількості користувачів стала гідною. Кількість незадоволення скоротилася. Але ми вирішили піти далі і обмежити максимальну кількість executors на один application - по суті на одного учасника програми.
spark.dynamicAllocation.maxExecutors=19
Тепер, звичайно, з'явилися невдоволені з іншого боку - "кластер простоює, а в мене всього лише 19 executors", але що вдієш - потрібен якийсь правильний баланс. Усіх зробити щасливими не вдасться.
І ще одна невелика історія, пов'язана із специфікою нашого кейсу. Якось на практичне заняття запізнилися кілька людей, і вони Spark чомусь не стартував. Ми подивилися на кількість вільних ресурсів — начебто є. Spark має стартувати. Добре, що на той момент документація вже десь записалася на підкірку, і ми згадали, що при запуску Spark шукає собі порт, на якому стартуватиме. Якщо перший порт з діапазону зайнятий, він переходить до наступного по порядку. Якщо вона вільна, то захоплює. І є параметр, який свідчить про максимальну кількість спроб цього. За замовчуванням це 16. Число менше, ніж людей у нашій групі на занятті. Відповідно, після 16 спроб Spark кидав цю справу та казав, що не можу стартанути. Ми виправили цей параметр.
spark.port.maxRetries=50
Далі розповім про деякі налаштування, які вже не сильно пов'язані зі специфікою нашого кейсу.
Для швидшого старту Spark є рекомендація папку jars, що лежить у домашній директорії SPARK_HOME, заархівувати та покласти на HDFS. Тоді він не витрачатиме часу на завантаження цих джарників за воркерами.
spark.yarn.archive=hdfs:///tmp/spark-archive.zip
Також для більш швидкої роботи рекомендується як серіалайзер використовувати kryo. Він більш оптимізований, ніж той, що за умовчанням.
spark.serializer=org.apache.spark.serializer.KryoSerializer
І є ще давня проблема Spark, що він часто валиться по пам'яті. Часто це відбувається тоді, коли воркери все порахували і відправляють результат на драйвер. Ми зробили собі цей параметр більше. За замовчуванням він 1Гб, ми зробили - 3.
spark.driver.maxResultSize=3072
І останнє, як десерт. Як оновити Spark до версії 2.1 на HortonWorks дистрибутиві HDP 2.5.3.0. Ця версія HDP містить у собі встановлену версію 2.0, але ми одного разу для себе вирішили, що Spark досить активно розвивається, і кожна нова версія фіксує якісь баги плюс дає додаткові можливості, в тому числі і для python API, тому вирішили що потрібно робити апдейт.
Завантажили версію з офіційного сайту під Hadoop 2.7. Розархівували, закинули до папки з HDP. Поставили симлінки як слід. Запускаємо – не стартує. Пише дуже незрозумілу помилку.
java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
Гуляючи, з'ясували, що Spark вирішив не чекати поки Hadoop розродиться, і вирішили використовувати нову версію jersey. Вони самі там один з одним лаються на цю тему у JIRA. Рішенням було скачати
Цю помилку ми обійшли, але виникла нова і досить-таки обтічна.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master
При цьому пробуємо запускати версію 2.0 - все прибл. Спробуй здогадайся, в чому річ. Ми залізли у логі цього application та побачили щось таке:
/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar
Загалом, через якісь причини hdp.version не резолвілася. Погугливши, знайшли рішення. Потрібно в Ambari зайти в налаштування YARN і додати там параметр у custom yarn-site:
hdp.version=2.5.3.0-37
Ця магія допомогла, і Spark злетів. Протестили декілька наших jupyter-ноутбуків. Все працює. До першого заняття з Spark у суботу (вже завтра) ми готові!
UPD. На занятті з'ясувалась ще одна проблема. Якогось моменту YARN перестав видавати контейнери для Spark. У YARN потрібно було виправити параметр, який по дефолту стояв 0.2:
yarn.scheduler.capacity.maximum-am-resource-percent=0.8
Тобто лише 20% ресурсів брали участь у роздачі ресурсів. Змінивши параметри, перезавантажили YARN. Проблема була вирішена, і решта учасників також змогли запустити spark context.
Джерело: habr.com