«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Пропоную ознайомитися з розшифровкою лекції "Hadoop. ZooKeeper" із серії "Методи розподіленої обробки великих обсягів даних у Hadoop"

Що таке ZooKeeper, його місце у екосистемі Hadoop. Неправда про розподілені обчислення. Схема стандартної розподіленої системи. Складність координації розподілених систем. Типові проблеми координації. Принципи, закладені у дизайн ZooKeeper. Модель даних ZooKeeper. Прапори znode. Сесії. Клієнтський API. Примітиви (configuration, group membership, simple locks, leader election, locking без herd effect). Архітектура ZooKeeper. ZooKeeper DB. ZAB. Оброблювач запитів.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Сьогодні поговоримо про ZooKeeper. Ця штука дуже корисна. У нього, як і у будь-якого продукту Apache Hadoop, є логотип. На ньому зображено людину.

До цього ми говорили в основному, як там можна обробляти дані, як їх зберігати, тобто вже ними якось скористатися і якось з ними працювати. А сьогодні хотілося б трохи поговорити про побудову розподілених додатків. І ZooKeeper – це одна з тих речей, яка дозволяє цю справу спростити. Це якийсь сервіс, який призначений для певної координації взаємодії процесів у розподілених системах, у розподілених додатках.

Потреба таких додатків стає дедалі більше з кожним днем, це те, про що наш курс. З одного боку, MapReduce і цей готовий фреймворк дозволяє нівелювати цю складність і звільнити програміста від написання таких примітивів, як взаємодія, координація процесів. Але з іншого боку, ніхто не гарантує, що цього все одно не доведеться робити. Не завжди MapReduce або інші готові фреймворки повністю замінюють якісь випадки, які не можна реалізувати на цьому. У тому числі і сам MapReduce і купа інших апачових проектів, вони, по суті, теж є розподіленими додатками. І щоб писати було простіше, написали ZooKeeper.

Як усі програми, пов'язані з Hadoop, він був розроблений у компанії Yahoo! Зараз це також офіційний додаток Apache. Воно не настільки активно розвивається як HBase. Якщо ви зайдете в JIRA HBase, то там щодня з'являється купа тяган на баги, купа пропозицій щось оптимізувати, тобто постійно йде життя в проекті. А ZooKeeper, з одного боку, є відносно простим продуктом, а з іншого боку, це забезпечує його надійність. І його досить легко використовувати, тому він став стандартом у додатках у рамках екосистеми Hadoop. Тому я подумав, що було б корисно розглянути його, щоб зрозуміти, як він працює і як ним користуватися.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Це картинка з якоїсь лекції, яка у нас була. Можна сміливо сказати, що він ортогональний усьому з того що ми раніше розглядали. І все, що тут зазначено, тією чи іншою мірою працює з ZooKeeper, тобто це сервіс, який використовує всі ці продукти. Ні HDFS, ні MapReduce не пишуть своїх схожих сервісів, які б для них спеціально працювали. Відповідно, використовується ZooKeeper. І це спрощує розробку та якісь речі, пов'язані з помилками.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Звідки це все з'являється? Здавалося б, запустили дві програми паралельно на різних комп'ютерах, з'єднали їх шнурком або в сітку, і все працює. Але проблема в тому, що Мережа ненадійна, а якби ви зніффили трафік або подивилися, що там відбувається на низькому рівні, як взаємодіють клієнти в Мережі, то можна часто побачити, що якісь пакети губляться, пересилаються. Недарма придумали протоколи TCP, які дозволяють встановити сесію, гарантувати доставку повідомлень. Але у будь-якому разі не завжди навіть TCP може рятувати. У всього є таймаут. Мережа може просто на якийсь час відвалюватись. Вона може просто блимати. І це все призводить до того, що не можна покладатися на те, що мережа надійна. Це основна відмінність від написання паралельних додатків, які працюють на одному комп'ютері або на одному суперкомп'ютері, де Мережі немає, де є більш надійна шина обміну даними в пам'яті. І це принципова відмінність.

Крім усього іншого, використовуючи Мережу, є завжди якась латина. У диска вона теж є, але в Мережі вона більша. Latency - це якийсь час затримки, який може бути як невеликим, так і досить суттєвим.

Топологія Мережі змінюється. Що таке топологія - це розміщення нашого мережного обладнання. Є дата-центри, стійки, які там стоять, є свічки. Все це може перепідключатися, переїжджати і т. д. Це теж треба враховувати. Змінюються айпішники, змінюються роутинг, яким у нас ходить трафік. Це також треба враховувати.

Мережа також може змінюватися щодо обладнання. З практики можу сказати, що наші мережеві інженери дуже люблять періодично оновлювати щось на свічках. Несподівано вийшла нова прошивка, і їх не дуже цікавить якийсь кластер Hadoop. Вони мають свою роботу. Для них головне, щоб мережа працювала. Відповідно, вони хочуть там щось перезалити, зробити перепрошивку на своєму залозі, при цьому залозки також періодично змінюються. Все це якимось чином треба враховувати. Все це впливає на наш розподілений додаток.

Зазвичай люди, які починають працювати з великим обсягом даних, чомусь вважають, що Мережа безмежна. Якщо там лежить файл у кілька терабайт, то його можна взяти до себе на сервер або на комп'ютер, відкрити за допомогою як і дивитися. Ще одна з помилок - це в Vim дивитися логи. Ніколи такого не робіть, бо це погано. Тому що Vim все намагається буферизувати, все вантажити на згадку, особливо, коли ми починаємо по цьому логу переміщатися і щось шукати. Це такі речі, про які забувають, але їх варто враховувати.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Простіше написати одну програму, яка працює на одному комп'ютері з одним процесором.

Коли у нас наша система зростає, ми хочемо все це паралелити, причому паралелити не лише на комп'ютері, а й на кластері. Виникає питання – як це діло координувати? Наші програми можуть навіть не взаємодіяти між собою, але ми паралельно на кількох серверах запустили кілька процесів. І як моніторити, що все йде добре? Вони, наприклад, щось відправляють по Мережі. Вони мають кудись писати про свій стан, наприклад, у якусь базу даних або в лог, потім цей лог агрегувати і потім десь його аналізувати. Плюс треба враховувати, що процес працював-працював, раптово в ньому з'явилася якась помилка чи він упав, то як швидко ми про це дізнаємось?

Зрозуміло, що це швидко можна замоніторити. Це теж добре, але моніторинг – це обмежена річ, яка дозволяє моніторити якісь речі на найвищому рівні.

Коли ми хочемо, щоб наші процеси почали взаємодіяти між собою, наприклад, надсилати один одному якісь дані, то тут також постає питання – як це відбуватиметься? Чи не буде якогось race condition, чи не будуть вони перезаписувати один одного, чи доходять дані правильно, чи не втрачається щось дорогою? Потрібно розробляти якийсь протокол тощо.

Координація всіх цих процесів – річ нетривіальна. І змушує розробника опускатися на рівень ще нижче, і писати системи або з нуля або не зовсім з нуля, але це не так просто.

Якщо ви вигадали криптографічний алгоритм або навіть його реалізували, то візьміть і викиньте його відразу, тому що, швидше за все, він у вас не працюватиме. У ньому, найімовірніше, знайдеться купа помилок, які ви забули передбачити. У жодному разі не використовуйте його для чогось серйозного, тому що, швидше за все, він буде нестійким. Тому що всі алгоритми є, вони дуже довго перевіряються часом. У ньому шукаються помилки спільнотою. Це окрема тема. І те саме і тут. Якщо є можливість не реалізовувати якусь синхронізацію процесів самостійно, краще цього не робити, тому що це досить складно і заводить вас на хибний шлях постійних пошуків помилок.

Ми сьогодні говоримо про ZooKeeper. З одного боку – це фреймворк, з іншого боку, це сервіс, який дозволяє полегшити розробнику життя та максимально спростити реалізацію логіки та координацію наших процесів.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Згадуємо, як може бути стандартна розподілена система. Це те, що ми говорили – це HDFS, HBase. Є Майстер-процес, який керує воркерами, slave-процесами. Він займається координацією та розподілом завдань, перезапуском воркерів, запуском нових, розподілом навантаження.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Більш просунута річ – це Coordination Service, тобто саме завдання координації винести в окремий процес плюс паралельно запускати якийсь backup або stanby Майстра, тому що Майстер може впасти. А якщо Майстер впаде, то наша система не працюватиме. Ми запускаємо backup. Якісь стани, які є у Майстра, потрібно реплікувати на backup. Це також можна доручити Coordination Service. Але на цій схемі координацією воркерів займається сам Майстер, тут сервіс займається координацією дій щодо реплікації даних.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Найбільш просунутий варіант - це коли всією координацією займається наш сервіс, як завжди і робиться. Він бере на себе відповідальність за те, що все працює. А якщо щось не працює, ми дізнаємося про це і намагаємося цю ситуацію обійти. У будь-якому випадку у нас залишається Майстер, який якось взаємодіє ще зі slaves, через якийсь сервіс може надсилати дані, інформацію, повідомлення і т.д.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Є ще більш просунута схема, коли у нас немає Майстра, всі ноди є майстер-slaves, різнозначними у своїй поведінці. Але все одно їм потрібно між собою взаємодіяти, тому все одно залишається сервіс для координації цих дій. Напевно під цю схему підходить Cassandra, яка за таким принципом працює.

Важко сказати, яка із цих схем краще працює. У кожної є свої мінуси та плюси.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

І не треба боятися якихось речей з Майстром, тому що, як показує практика, не так він схильний до того, щоб постійно подати. Тут головне – вибирати правильне рішення для розміщення цього сервісу на окремій потужній ноді, щоб у неї було достатньо ресурсів, щоб по можливості у користувачів не було туди доступу, щоб вони випадково не вбили цей процес. Але при цьому в такій схемі набагато простіше з Майстер-процесу управляти воркерами, тобто ця схема простіше з погляду реалізації.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

А ця схема (вище), напевно, складніша, але надійніша.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Основна проблема – це partial failures. Наприклад, коли ми відправляємо повідомлення по Мережі, відбувається якась аварія, і той, хто відправив повідомлення, не дізнається чи дійшло його повідомлення і що сталося на стороні приймача, чи не дізнається, чи правильно повідомлення обробилося, тобто він не отримає ніякого підтвердження.

Відповідно, ми маємо цю ситуацію опрацювати. І найпростіше - це перенаправити це повідомлення і чекати, поки нам прийде відповідь. При цьому ніде не враховується чи змінився стан приймача. Можливо, ми надішлемо повідомлення і двічі додали ті самі дані.

ZooKeeper пропонує способи боротьби з такими відмовими, що також спрощує життя.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Як раніше говорилося, це схоже на написання багатопотокових програм, але основна відмінність у тому, що в розподілених додатках, які ми будуємо на різних машинах, способом взаємодії є лише Мережа. По суті це архітектура shared-nothing. Кожен процес чи сервіс, що працює на одній машині, має свою пам'ять, свій диск, свій процесор, яким він ні з ким не ділиться.

Якщо ми пишемо багатопотокову програму на одному комп'ютері, то ми можемо використовувати shared memory для обміну даними. У нас там є context switch, процеси можуть перемикатися. Це впливає на продуктивність. З одного боку, у програмі на кластері такого немає, але є проблеми із Мережею.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Відповідно основні проблеми, які виникають при написанні розподілених систем, це конфігурація. Ми пишемо якусь програму. Якщо воно просте, то ми хардкодимо всякі циферки в коді, але це незручно, тому що якщо ми вирішуємо, що замість таймууту в півсекунди ми хочемо таймувати в одну секунду, то треба перекомпілювати додаток, все заново розкотити. Одна справа, коли це на одній машині, коли можна просто перезапустити, а коли у нас багато машин, треба все постійно копіювати. Треба намагатися, щоб програма була конфігурована.

Тут йдеться про статичну конфігурацію для системних процесів. Це не зовсім, можливо, з погляду операційної системи, це може бути статична конфігурація для наших процесів, тобто це конфігурація, яку не можна просто взяти та оновити.

Також є динамічна конфігурація. Це параметри, які ми хочемо змінювати на льоту, щоб вони там підхоплювалися.

У чому тут проблема? Поновили ми конфіг, викотили і що? Проблема може бути в тому, що з одного боку ми викотили конфіг, а про обновку забули, там залишився конфіг. По-друге, поки ми викочували, десь конфігурація оновилася, а десь ні. І якісь процеси нашого додатку, які працюють на одній машині, перезапустилися з новим конфігом, а десь зі старим. Це може призвести до того, що наш розподілений додаток буде не в консистентному стані з погляду конфігурації. Ця проблема є загальною. Для динамічної конфігурації вона більш актуальна, тому що мається на увазі, що її можна змінювати на льоту.

Ще одна проблема – це group membership. Завжди ми маємо якийсь набір воркерів, ми завжди хочемо знати – хто з них живий, хто з них мертвий. Якщо є Майстер, то він повинен розуміти, на які воркери можна перенаправляти клієнтів, щоб вони запускали обчислення або працювали з даними, а на які не можна. Постійно постає проблема, що треба знати, хто в нашому кластері працює.

Ще одна типова проблема – це leader election, коли ми хочемо знати, хто є відповідальним. Один із прикладів – це реплікація, коли ми маємо якийсь процес, який приймає операції на запис і потім реплікує їх щодо інших процесів. Він буде лідером, всі інші йому підкорятимуться, будуть за ним іти. Потрібно вибирати такий процес, щоб він був однозначним для всіх, щоб не вийшло так, що вибралося двоє лідерів.

Ще є mutually exclusive access. Тут проблема складніша. Є таке поняття як м'ютекс, коли ви пишите багатопотокові програми і хочете, щоб доступ до якогось ресурсу, наприклад, до комірки пам'яті, був обмежений і здійснювався лише одним потоком. Тут ресурсом може бути щось абстрактніше. І різні програми з різних нод нашої Мережі повинні отримувати лише ексклюзивний доступ для даного ресурсу, а не так, щоб усі могли його змінювати чи писати щось туди. Це так звані locks.

Всі ці проблеми ZooKeeper дозволяє в тій чи іншій мірі вирішити. І я покажу на прикладах, як він дає змогу це зробити.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Немає блокуючих примітивів. Коли ми починаємо щось використовувати, цей примітив не чекатиме настання якоїсь події. Швидше за все, ця штука буде працювати асинхронно, тим самим дозволяючи процесам не зависати в тому, що вони чогось чекають. Це дуже корисна річ.

Всі запити клієнтів обробляються в порядку загальної черги.

І клієнти мають можливість отримувати нотифікацію про зміни якогось стану, зміни даних, перш ніж клієнт побачить самі змінені дані.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

ZooKeeper може працювати у двох режимах. Перший – це standalone, на одній ноді. Це зручно для тестування. Також він може працювати в режимі кластера, на будь-якій кількості серверів. Якщо у нас кластер – 100 машин, то необов'язково, щоби він працював на 100 машинах. Достатньо виділити кілька машинок, де можна запустити ZooKeeper. І сповідує принцип високої можливості. На кожному запущеному інстансі ZooKeeper зберігає всю копію даних. Потім розповім, як він це робить. Він не шардить дані, не партиціонує їх. З одного боку – це мінус, що ми не можемо зберігати багато, з іншого – це робити і не потрібно. Він не для цього призначений, це не база даних.

Дані можна кешувати на стороні клієнта. Це стандартний принцип, щоб не смикати сервіс, не навантажувати його однаковими запитами. Розумний клієнт зазвичай знає про це та кешує у себе.

Наприклад, щось у нас змінилося. Є якась програма. Вибрали нового лідера, який відповідає, наприклад, за обробку операцій запису. І ми хочемо реплікувати дані. Одне із рішень – поставити циклік. І постійно опитуємо наш сервіс – чи щось не змінилося? Другий варіант більш оптимальний. Це watch mechanism, який дозволяє повідомляти клієнтів про те, що щось змінилося. Це менш витратний спосіб ресурсів і зручніший для клієнтів.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Client – ​​це користувач, який користується ZooKeeper.

Server – це процес ZooKeeper.

Znode – це ключова штука у ZooKeeper. Усі znode зберігаються у пам'яті у ZooKeeper і організовуються як ієрархічної схеми, як дерева.

Є два типи операцій. Перший – це update/write, коли якась операція змінює стан нашого дерева. Дерево спільне.

І можливо, що клієнт не виконує один запит і відрубується, а може встановити сесію, за допомогою якої взаємодіє із ZooKeeper.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Модель даних ZooKeeper нагадує файлову систему. Є стандартний корінь і далі ми пішли начебто каталогами, які йдуть від кореня. І далі каталог першого рівня, другого рівня. Це все є znodes.

Кожна znode може зберігати в собі якісь дані, зазвичай, не дуже великого обсягу, наприклад, 10 кілобайт. І у кожної znode може бути якась кількість дітей.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Znode буває кількох типів. Їх можна створювати. І під час створення znode ми вказуємо тип, якого вона має ставитися.

Є два типи. Перший – це ephemeral flag. Znode живе у рамках якоїсь сесії. Наприклад, клієнт встановив сесію. І доки ця сесія жива, вона існуватиме. Це потрібно для того, щоб не плодити чогось зайвого. Це також підходить для таких моментів, коли нам важливо в рамках сесії зберігати примітиви даних.

Другий тип – це sequential flag. Він збільшує лічильник на шляху до znode. Наприклад, у нас був каталог із додатком 1_5. І ми створили першу ноду, вона отримала p_1, друга – p_2. І коли ми щоразу викликаємо цей метод, ми передаємо повний шлях, вказуючи лише частину пусти, а цей номер автоматично инкрементится, оскільки ми вказуємо тип ноди – sequential.

Регулярний знод. Вона житиме завжди і матиме те ім'я, яке ми їй скажемо.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Ще одна корисна річ – це watch flag. Якщо ми його встановлюємо, клієнт може підписуватися на якісь події для певної ноди. Покажу на прикладі, як це робиться. Сам ZooKeeper повідомляє клієнта про те, що дані на ноді змінилися. При цьому, повідомлення не гарантують, що це якісь нові дані приїхали. Вони просто говорять про те, що щось змінилося, тому порівнювати дані потім доведеться однаково окремими викликами.

І, як я вже казав, порядок даних визначається кілобайтами. Там не потрібно зберігати великі текстові дані, тому що це не база даних, це сервер координації дій.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Трохи розповім про сесії. Якщо ми маємо кілька серверів, то ми можемо прозоро переходити від сервера до сервера, використовуючи ідентифікатор сесії. Це досить зручно.

Кожна сесія має якийсь тайм. Сесія визначається тим, чи передає щось клієнт під час цієї сесії серверу. Якщо він за час таймуту нічого не передав, сесія відвалюється або клієнт сам її може закрити.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Він не має так багато функцій, але за допомогою цього API можна робити різні штуки. Той виклик, який ми бачили, create створює znode та приймає три параметри. Це шлях до znode, причому його потрібно вказувати повним від кореня. А також це якісь дані, які ми хочемо передати туди. І тип прапора. І після створення він повертає шлях до znode.

Друге – можна видалити. Тут фішка в тому, що другим параметром, окрім шляху до znode, можна вказати версію. Відповідно буде видалена та znode, якщо її версія, яку ми передали, буде еквівалентна тій, яка насправді.

Якщо ми хочемо не перевіряти цю версію, то просто передаємо аргумент «-1».

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Третє – здійснює перевірку існування znode. Повертає true, якщо існує нода, інакше false.

І тут з'являється flag watch, який дає змогу встановити спостереження за цією нодою.

Можна встановити цей flag навіть на неіснуючу ноду і отримати повідомлення у тому випадку, коли вона з'явиться. Це теж корисно.

Ще парочка викликів – це getData. Зрозуміло, що ми з znode можемо отримати дані. Також можна використовувати flag watch. У разі він не встановиться, якщо ноди немає. Тому треба зрозуміти, що вона існує, а потім уже одержувати дані.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

А також є SetData. Тут ми передаємо версію. І якщо ми це передамо, то буде оновлено дані на znode певної версії.

Також можна вказати «-1», щоб виключити цю перевірку.

Ще один корисний метод – це getChildren. Також ми можемо отримати список всіх znode, які належать їй. Можемо за цим спостерігати, встановивши flag watch.

І метод синхронізувати дозволяє всі зміни відправити одразу, тим самим гарантуючи те, що вони збереглися та всі дані повністю змінилися.

Якщо проводити аналогії зі звичайним програмуванням, то, коли ви використовуєте такі методи, як write, які пишуть щось на диск, і після того як він повернув вам відповідь, немає гарантії, що у вас дані записалися на диск. І навіть коли операційна система впевнена, що все записалося, у самому диску є механізми, де процес проходить через рівні буферів, і лише після цього дані розміщуються на диску.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

В основному використовуються асинхронні дзвінки. Це дозволяє клієнту працювати паралельно із різними запитами. Можна використовувати синхронний підхід, але менш продуктивний.

Дві операції, про які ми говорили, це update/write, які змінюють дані. Це create, setData, sync, delete. І read - це exists, getData, getChildren.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Тепер кілька прикладів, як можна зробити примітиви роботи у розподіленій системі. Наприклад, пов'язаний із конфігурацією чогось. З'явився новий воркер. Ми додали машину, запустили процес. І є такі три питання. Як він запитує конфігурацію у ZooKeeper? І якщо ми хочемо змінити конфігурацію, то як її змінюємо? І після того, як ми її змінили, як ті воркери, які у нас були, одержують її?

У ZooKeeper це можна зробити досить просто. Наприклад, є наше дерево znode. Тут є нода для нашої програми, ми створюємо в ній додаткову ноду, де містяться дані конфігурації. Це може бути як окремі параметри, і немає. Оскільки розмір невеликий, зазвичай розмір конфігурації теж невеликий, тому тут цілком можна її зберігати.

Ви використовуєте метод getData для отримання конфігурації для воркера з ноди. Встановлюємо true. Якщо цієї ноди з якоїсь причини немає, нас про це поінформують, коли вона з'явиться, або коли вона зміниться. Якщо хочемо знати, що щось змінилося, то ставимо true. І у разі зміни даних у цій ноді, ми про це дізнаємося.

SetData. Задаємо дані, встановлюємо "-1", тобто не перевіряємо версію, вважаємо, що конфігурація у нас завжди одна, нам не потрібно зберігати багато конфігурацій. Якщо потрібно багато зберігати, потрібно буде додати ще один рівень. Тут вважаємо, що вона одна, тому оновлюємо лише останню, тож версію не перевіряємо. У цей момент усі клієнти, які перед цим підписалися, вони отримують повідомлення про те, що щось змінилося в цій ноді. І після того, як вони його отримали, вони також повинні запитати дані ще раз. Повідомлення полягає в тому, що вони отримують не самі дані, а лише повідомлення про зміни. Після цього вони мають попросити нові дані.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Другий варіант застосування примітиву – це членство в групі. У нас розподілена програма, є купа воркерів і ми хочемо розуміти, що всі вони на місці. Тому вони повинні зареєструвати себе, що вони працюють у нашому додатку. І також ми хочемо або з Майстер-процесу, або ще десь дізнатися про всіх активних воркерів, які у нас зараз є.

Як ми це робимо? Для програми створюємо ноду workers і додаємо туди рівень за допомогою методу create. У мене є помилка на слайді. Тут потрібно послідовний вказувати, тоді всі воркери створюватимуться по одному. І програма запитуючи всі дані про children цієї ноди, отримує всі активні воркери, які є.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Ось така страшна імплементація того, як це може бути зроблено Java-код. Почнемо з кінця, з методу main. Оце наш клас, створюємо його метод. Як перший аргумент використовуємо host, куди приєднуємося, тобто задаємо його аргументом. А як другий аргумент – ім'я групи.

Як відбувається з'єднання? Це найпростіший приклад API, який використовується. Тут усе відносно просто. Існує стандартний клас ZooKeeper. Ми передаємо йому hosts. І задаємо тайм, наприклад, в 5 секунд. І у нас є такий член, як connectedsignal. По суті, ми створюємо групу по шляху, що передається. Дані туди не пишемо, хоча щось можна було написати. І нода тут має тип persistent. По суті це звичайна регулярна нода, яка існуватиме весь час. Тут утворюється сесія. Це імплементація клієнта. Наш клієнт здійснюватиме періодично повідомлення про те, що сесія жива. А коли ми сесію завершуємо, ми викликаємо close і все, сесія відвалюється. Це на той випадок, якщо у нас щось відвалиться, щоб ZooKeeper дізнався про це і сесію відрубав.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Як залити якийсь ресурс? Тут усе дещо складніше. У нас є набір воркерів, є якийсь ресурс, який хочемо залочити. Для цього ми створюємо окрему ноду, наприклад під назвою lock1. Якщо ми змогли її створити, то ми отримали lock сюди. А якщо ми не змогли її створити, то воркер намагається отримати getData звідси, а тому що нода вже створена, то ми ставимо сюди watcher і в той момент, коли стан цієї ноди зміниться, ми про це дізнаємося. І ми можемо постаратися встигнути перетворити її. Якщо ми забрали цю ноду, забрали цей lock, то після того, як lock нам більше не потрібен, ми від нього відмовимося, тому що нода існує лише в рамках сесії. Відповідно вона зникне. І інший клієнт в рамках іншої сесії зможе забрати lock на цю ноду, точніше він отримає повідомлення про те, що щось змінилося, і він може спробувати встигнути це встигнути зробити.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Інший приклад, як можна обирати головного лідера. Тут трохи складніше, але теж відносно просто. Що тут у нас відбувається? Є основна нода, яка агрегує у собі всіх воркерів. Ми намагаємось отримати дані про лідера. Якщо це успішно сталося, тобто ми отримали якісь дані, то наш воркер починає слідувати за цим лідером. Він вважає, що лідер уже є.

Якщо лідер із якоїсь причини помер, наприклад, відвалився, то ми намагаємося створити нового лідера. І якщо у нас це вийшло, то наш воркер стає лідером. А якщо хтось у цей момент встиг створити нового лідера, то ми намагаємося зрозуміти, хто це і потім за ним слідуватиме.

Тут виникає так званий herd effect, тобто ефект стада, тому що коли лідер помирає, то той, хто перший встигне, той і стане лідером.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

При захопленні ресурсу можна спробувати використати дещо інший підхід, який ось у чому. Наприклад, ми хочемо отримати lock, але без hert effect. Полягатиме він у тому, що наша програма запитує списки всіх id нод для вже існуючої ноди з lock. І якщо перед цим ту ноду, lock для якої ми створили, вона буде мінімальною з того набору, який ми отримали, це означає, що ми захопили lock. Ми перевіряємо, що ми отримали lock. Як перевірка буде умова, що той id, який ми отримали при створенні нового lock, він є мінімальним. І якщо ми отримали, то працюємо далі.

Якщо існує якийсь id, який менший за наш lock, то ми ставимо на цю подію watcher і чекаємо на нотифікацію, поки щось не зміниться. Т. е. ми отримали цей lock. І поки той не відвалиться, ми не станемо мінімальним id і не отримаємо мінімальний lock, і тим самим зможемо залізти. А якщо ця умова не виконується, тоді ми одразу йдемо сюди і намагаємося ще раз отримати цей lock, бо за цей час могло щось змінитись.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

З чого складається ZooKeeper? Є 4 основні речі. Це обробка процесів – Request. А також ZooKeeper Atomic Broadcast. Є Commit Log, куди заносяться всі операції. І сама In-memory Replicated DB, тобто сама база даних, де зберігається все це дерево.

Всі операції на запис проходять через Request Processor. А операції на читання йдуть одразу до In-memory бази.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Сама база повністю реплікована. На всіх параметрах ZooKeeper зберігається повна копія даних.

Для того, щоб відновити бд після падіння, існує Commit log. Стандартна практика, коли дані перш ніж потрапляють у пам'ять, вони пишуться туди, щоб у разі падіння цей log можна було програти та відновити стан системи. І також використовуються періодичні снапшоти бази.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

ZooKeeper Atomic Broadcast – це така штука, яка використовується для підтримки реплікованих даних.

ZAB у собі вибирає лідера з погляду ноди ZooKeeper. Інші ноди стають її фоловерами, чекають від неї якихось дій. Якщо на них приходять записи, то всі їх перенаправляють лідеру. Той попередньо виконує операцію запису і потім розсилає повідомлення, що змінилося своїм фоловерам. Це, по суті, повинно виконуватися атомарно, тобто операція запису і broadcasting всієї цієї справи повинні виконуватися атомарно, тим самим гарантується консистентність даних.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop» Він обробляє лише write request. Основне його завдання полягає в тому, що він трансформує операцію в transactional update. Це спеціально сформований запит.

І тут слід зазначити, що гарантується ідемпотентність апдейтів для однієї і тієї ж операції. Що це таке? Ця річ, якщо яку виконати двічі, вона матиме той самий стан, т. е. сам запит від цього не зміниться. І зробити це потрібно для того, щоб у разі падіння можна було знову запустити операцію, тим самим накотити зміни, які відвалилися на даний момент. При цьому стан системи стане таким же, тобто не повинно бути такого, що низка тих самих, наприклад, процесів оновлень, призводила до різних підсумкових станів системи.

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

«Hadoop. ZooKeeper» із серії Техностріму Mail.Ru Group «Методи розподіленої обробки великих обсягів даних у Hadoop»

Джерело: habr.com

Додати коментар або відгук