Habr, witaj! Wczoraj dalej
Małe wprowadzenie do tego, jak używamy Sparka. Mamy trzymiesięczny program
Osobliwością naszego zastosowania jest to, że liczba osób pracujących jednocześnie nad Sparkiem może być równa całej grupie. Na przykład na seminarium, kiedy wszyscy próbują czegoś w tym samym czasie i powtarzają za naszym nauczycielem. A to nie jest dużo – czasem nawet 40 osób. Prawdopodobnie niewiele jest firm na świecie, które stoją przed takim przypadkiem użycia.
Następnie opowiem Ci, jak i dlaczego wybraliśmy określone parametry konfiguracyjne.
Zacznijmy od samego początku. Spark ma 3 opcje działania w klastrze: autonomiczny, przy użyciu Mesos i przy użyciu YARN. Zdecydowaliśmy się wybrać trzecią opcję, ponieważ miała ona dla nas sens. Mamy już klaster hadoop. Nasi uczestnicy są już dobrze zaznajomieni z jego architekturą. Użyjmy PRZĘDZY.
spark.master=yarn
Dalej ciekawiej. Każda z tych 3 opcji wdrożenia ma 2 opcje wdrożenia: klient i klaster. Na podstawie
spark.deploy-mode=client
Ogólnie rzecz biorąc, od teraz Spark będzie jakoś działał na YARN, ale to nam nie wystarczyło. Ponieważ mamy program o big data, czasami uczestnikom nie wystarczało to, co udało się uzyskać w ramach równomiernego podziału zasobów. A potem znaleźliśmy interesującą rzecz - dynamiczną alokację zasobów. W skrócie chodzi o to, że jeśli masz trudne zadanie i klaster jest wolny (na przykład rano), to korzystając z tej opcji Spark może zapewnić Ci dodatkowe zasoby. Konieczność oblicza się tam według przebiegłego wzoru. Nie będziemy wdawać się w szczegóły – to działa dobrze.
spark.dynamicAllocation.enabled=true
Ustawiamy ten parametr, a po uruchomieniu Spark uległ awarii i nie uruchomił się. Zgadza się, bo musiałem to przeczytać
spark.shuffle.service.enabled=true
Dlaczego jest to potrzebne? Gdy nasze zadanie nie będzie już wymagało tak wielu zasobów, Spark powinien zwrócić je do wspólnej puli. Najbardziej czasochłonnym etapem w prawie każdym zadaniu MapReduce jest etap losowania. Parametr ten pozwala na zapisanie danych wygenerowanych na tym etapie i odpowiednie zwolnienie executorów. A wykonawca to proces, który oblicza wszystko w procesie roboczym. Ma określoną liczbę rdzeni procesora i określoną ilość pamięci.
Ten parametr został dodany. Wszystko wydawało się działać. Stało się zauważalne, że uczestnicy faktycznie otrzymali więcej zasobów, kiedy ich potrzebowali. Ale pojawił się inny problem - w pewnym momencie obudzili się inni uczestnicy i też chcieli skorzystać ze Sparka, ale tam było wszystko zajęte i byli niezadowoleni. Można je zrozumieć. Zaczęliśmy przeglądać dokumentację. Okazało się, że istnieje szereg innych parametrów, które można wykorzystać, aby wpłynąć na proces. Na przykład, jeśli executor jest w trybie gotowości, po jakim czasie można pobrać z niego zasoby?
spark.dynamicAllocation.executorIdleTimeout=120s
W naszym przypadku, jeśli twoi executorzy nie zrobią nic przez dwie minuty, prosimy o zwrócenie ich do wspólnej puli. Ale ten parametr nie zawsze był wystarczający. Było jasne, że dana osoba przez długi czas nic nie robiła, a zasoby nie były uwalniane. Okazało się, że istnieje również specjalny parametr - po jakim czasie wybrać executory zawierające dane z pamięci podręcznej. Domyślnie parametrem tym była nieskończoność! Poprawiliśmy to.
spark.dynamicAllocation.cachedExecutorIdleTimeout=600s
Oznacza to, że jeśli twoi executorzy nie zrobią nic przez 5 minut, przekaż ich do wspólnej puli. W tym trybie prędkość uwalniania i wydawania zasobów dla dużej liczby użytkowników stała się przyzwoita. Poziom niezadowolenia spadł. Postanowiliśmy jednak pójść dalej i ograniczyć maksymalną liczbę wykonawców na aplikację - zasadniczo na uczestnika programu.
spark.dynamicAllocation.maxExecutors=19
Teraz oczywiście po drugiej stronie są niezadowoleni ludzie – „klaster jest bezczynny, a ja mam tylko 19 wykonawców”, ale co zrobić? Potrzebujemy jakiegoś odpowiedniego balansu. Nie możesz uszczęśliwić wszystkich.
I jeszcze jedna krótka historia związana ze specyfiką naszej sprawy. Jakimś cudem kilka osób spóźniło się na lekcję praktyczną i z jakiegoś powodu Spark nie zaczął dla nich. Przyjrzeliśmy się ilości wolnych zasobów – wydaje się, że tam są. Powinna rozpocząć się iskra. Na szczęście do tego czasu dokumentacja była już gdzieś dodana do podkory i przypomnieliśmy sobie, że po uruchomieniu Spark szuka portu, na którym ma zacząć. Jeśli pierwszy port w zakresie jest zajęty, następuje przejście do następnego w kolejności. Jeśli jest darmowy, przechwytuje. Istnieje parametr wskazujący maksymalną liczbę prób tego. Wartość domyślna to 16. Liczba jest mniejsza niż liczba osób w naszej grupie w klasie. W związku z tym po 16 próbach Spark poddał się i powiedział, że nie mogę zacząć. Poprawiliśmy ten parametr.
spark.port.maxRetries=50
Następnie opowiem o kilku ustawieniach, które nie są zbyt powiązane ze specyfiką naszego przypadku.
Aby szybciej uruchomić Sparka, zaleca się zarchiwizować folder jars znajdujący się w katalogu domowym SPARK_HOME i umieścić go na HDFS. Wtedy nie będzie tracił czasu na ładowanie tych jarników przez robotników.
spark.yarn.archive=hdfs:///tmp/spark-archive.zip
Zaleca się również użycie kryo jako serializatora w celu szybszego działania. Jest bardziej zoptymalizowany niż domyślny.
spark.serializer=org.apache.spark.serializer.KryoSerializer
Istnieje również długotrwały problem ze Sparkiem, polegający na tym, że często ulega awarii z pamięci. Często dzieje się tak w momencie, gdy pracownicy wszystko obliczyli i przesyłają wynik kierowcy. Sami powiększyliśmy ten parametr. Domyślnie jest to 1 GB, my zrobiliśmy 3.
spark.driver.maxResultSize=3072
I na koniec jako deser. Jak zaktualizować Spark do wersji 2.1 na dystrybucji HortonWorks - HDP 2.5.3.0. Ta wersja HDP zawiera preinstalowaną wersję 2.0, ale kiedyś sami zdecydowaliśmy, że Spark rozwija się dość aktywnie, a każda nowa wersja naprawia niektóre błędy i zapewnia dodatkowe funkcje, w tym dla API Pythona, więc zdecydowaliśmy, co musi zostać zrobione, to aktualizacja.
Pobrano wersję z oficjalnej strony internetowej Hadoop 2.7. Rozpakuj go i umieść w folderze HDP. W razie potrzeby zainstalowaliśmy dowiązania symboliczne. Uruchamiamy go - nie uruchamia się. Zapisuje bardzo dziwny błąd.
java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
Po googlowaniu dowiedzieliśmy się, że Spark postanowił nie czekać do narodzin Hadoopa i zdecydował się zastosować nową wersję koszulki. Sami kłócą się ze sobą na ten temat w JIRA. Rozwiązaniem było pobranie
Obeszliśmy ten błąd, ale pojawił się nowy, raczej usprawniony.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master
Jednocześnie staramy się uruchomić wersję 2.0 - wszystko jest w porządku. Spróbuj zgadnąć, co się dzieje. Zajrzeliśmy do dzienników tej aplikacji i zobaczyliśmy coś takiego:
/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar
Ogólnie rzecz biorąc, z jakiegoś powodu plik hdp.version nie został rozwiązany. Po googlowaniu znaleźliśmy rozwiązanie. Musisz przejść do ustawień YARN w Ambari i dodać tam parametr do niestandardowej witryny przędzy:
hdp.version=2.5.3.0-37
Ta magia pomogła i Spark wystartował. Przetestowaliśmy kilka naszych laptopów jupyter. Wszystko działa. Jesteśmy gotowi na pierwszą lekcję Sparka w sobotę (jutro)!
UPD. Podczas lekcji pojawił się kolejny problem. W pewnym momencie YARN przestał dostarczać kontenery dla Sparka. W YARN trzeba było poprawić parametr, który domyślnie wynosił 0.2:
yarn.scheduler.capacity.maximum-am-resource-percent=0.8
Oznacza to, że tylko 20% zasobów uczestniczyło w dystrybucji zasobów. Po zmianie parametrów ponownie załadowaliśmy PRZĘDĘ. Problem został rozwiązany, a pozostali uczestnicy również mogli uruchomić kontekst iskry.
Źródło: www.habr.com