Configuring Spark on YARN

Habr, hello! Yesterday, at the Apache Spark meetup hosted by the folks from Rambler&Co, participants asked quite a few questions about configuring this tool. We decided to follow up and share our own experience. The topic is not an easy one, so we invite you to share your experience in the comments as well - perhaps we misunderstand or misuse something ourselves.

A small introduction on how we use Spark. We run a multi-month program called “Big Data Specialist”, and our participants spend the entire second module working with this tool. Accordingly, our task as organizers is to prepare the cluster for this kind of use.

What makes our use case unusual is that the number of people working on Spark simultaneously can be the whole group - for example, during a seminar when everyone tries things at the same time, repeating after the instructor. And that is no small number - sometimes up to 40 people. There are probably not many companies in the world that face such a scenario.

Next, I will tell you how and why we selected certain config parameters.

Let's start from the very beginning. Spark has three ways to run on a cluster: standalone mode, on Mesos, and on YARN. We chose the third option because it made sense for us: we already have a Hadoop cluster, and our participants are already familiar with its architecture. So, YARN it is.

spark.master=yarn

It gets more interesting from here. Each of these three cluster managers supports two deploy modes: client and cluster. Based on the documentation and various links around the Internet, we can conclude that client mode suits interactive work - for example, through a Jupyter notebook - while cluster mode is better suited to production jobs. In our case, we were interested in interactive work, therefore:

spark.deploy-mode=client
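These two settings can live in spark-defaults.conf, or be set right when the session is created from a notebook. Below is a minimal sketch of the latter, assuming PySpark is available on the driver machine; note that in current Spark versions the canonical property name for the deploy mode is spark.submit.deployMode, and client mode is in any case the default when the session is created from a local process such as a Jupyter kernel.

from pyspark.sql import SparkSession

# Minimal sketch: the same two settings applied from a notebook.
# "participant-notebook" is just an illustrative application name.
spark = (SparkSession.builder
         .appName("participant-notebook")
         .master("yarn")
         .config("spark.submit.deployMode", "client")
         .getOrCreate())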

At this point Spark will already run on YARN in some fashion, but that was not enough for us. Since our program is about big data, participants sometimes did not get enough out of an even, static slicing of resources. And then we found an interesting thing - dynamic resource allocation. In short, the idea is this: if you have a heavy task and the cluster is free (for example, in the morning), then with this option Spark can grant you additional resources. How much is needed is computed there by a clever formula; we will not go into details - it works well.

spark.dynamicAllocation.enabled=true

We set this parameter, and at startup Spark complained and refused to start. Fair enough - we should have read the documentation more carefully. It says that for everything to work, you also need to enable an additional parameter.

spark.shuffle.service.enabled=true

Why is it needed? When our job no longer requires as many resources, Spark should return them to the common pool. The most time-consuming stage in almost any MapReduce-style job is the shuffle stage. This option makes it possible to keep the data produced at that stage available and to release the executors accordingly. An executor is the process that does the actual computation on a worker node; it gets a certain number of CPU cores and a certain amount of memory.
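On YARN, the shuffle service in question is the external shuffle service that runs inside every NodeManager as an auxiliary service, so there is a NodeManager-side part of the setup as well. As a rough sketch (property names as in the Spark-on-YARN documentation; the existing list of aux-services on your cluster may differ), it comes down to adding something like this to yarn-site.xml, putting the spark-<version>-yarn-shuffle.jar on the NodeManager classpath, and restarting the NodeManagers:

yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService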

With the shuffle service enabled, everything seemed to work: it was noticeable that participants were indeed being given more resources when they needed them. But another problem arose - at some point other participants would wake up, also want to use Spark, and find everything already taken, which made them unhappy. Understandably so. We went back to the documentation and found that there are several more parameters for influencing the process. For example: if an executor is sitting idle, after how long may its resources be taken away?

spark.dynamicAllocation.executorIdleTimeout=120s

In our case, if executors do nothing for two minutes, they are returned to the common pool. But this parameter alone was not always enough: it was clear that someone had not been doing anything for a long time, yet the resources were not being released. It turned out there is another, separate parameter - after what time to reclaim executors that hold cached data. By default it was set to infinity! We corrected it.

spark.dynamicAllocation.cachedExecutorIdleTimeout=600s

That is, if executors holding cached data sit idle for ten minutes, they are handed back to the common pool. In this mode, the speed of releasing and granting resources for a large number of users became decent. The amount of grumbling decreased. But we decided to go further and also cap the maximum number of executors per application - in effect, per participant.

spark.dynamicAllocation.maxExecutors=19

Now, of course, some people are unhappy from the other side - “the cluster is idle and I only get 19 executors” - but what can you do: some kind of balance is needed. You can't make everyone happy.
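For reference, the whole dynamic-allocation block, with the values discussed above, ends up looking roughly like this in spark-defaults.conf:

spark.shuffle.service.enabled=true
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=120s
spark.dynamicAllocation.cachedExecutorIdleTimeout=600s
spark.dynamicAllocation.maxExecutors=19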

And one more small story, tied to the specifics of our case. One day several people were late to a practical session, and for some reason Spark would not start for them. We checked how many free resources there were - plenty, it seemed; Spark should have started. Fortunately, by that time the documentation had already settled somewhere in the back of our minds, and we remembered that when Spark starts, it looks for a port to bind to. If the first port in the range is busy, it moves on to the next, and so on until it finds a free one. There is a parameter that limits the number of attempts, and its default is 16 - less than the number of people in our group. So after 16 attempts Spark gave up on the whole business and said it could not start. We corrected this setting.

spark.port.maxRetries=50
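To make the arithmetic concrete: the driver web UI, for instance, defaults to spark.ui.port=4040, and every application whose first choice is taken simply probes the next port up. With the default of 16 retries, that covers roughly ports 4040 through 4056, i.e. about 17 concurrent applications; a limit of 50 leaves comfortable headroom for a group of 40.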

Next, a few settings that are not so strongly tied to the specifics of our case.

To speed up Spark's startup, it is recommended to archive the jars folder located in the SPARK_HOME directory and put the archive on HDFS. Then Spark will not waste time uploading these jars to the workers every time an application starts.

spark.yarn.archive=hdfs:///tmp/spark-archive.zip

Also, for better performance, it is recommended to use Kryo as the serializer: it is better optimized than the default Java serialization.

spark.serializer=org.apache.spark.serializer.KryoSerializer
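A related knob, in case Kryo complains about buffer overflow when serializing large objects, is the maximum buffer size; we mention it only as a pointer, and the value below is just an example:

spark.kryoserializer.buffer.max=128m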

There is also a long-standing problem with Spark frequently falling over with out-of-memory errors, often at the moment when the workers have computed everything and are sending the result to the driver. We increased the corresponding limit for ourselves: the default is 1 GB, we made it 3.

spark.driver.maxResultSize=3072
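To illustrate where this limit bites, here is a hypothetical snippet (not our actual code): it is collect-style actions, where the results of all partitions are serialized and shipped back to the driver, that run into spark.driver.maxResultSize. The property is typically set before the session is created (in spark-defaults.conf or the builder), and size values may also be written with a unit suffix such as 3g.

from pyspark.sql import SparkSession

# Hypothetical sketch: set the limit before the session (and its driver JVM) is created.
spark = (SparkSession.builder
         .config("spark.driver.maxResultSize", "3g")
         .getOrCreate())

# collect() pulls every partition's result back into the driver process; with the
# default 1 GB limit a large collect may fail with "Total size of serialized results
# ... is bigger than spark.driver.maxResultSize".
rows = spark.range(0, 50_000_000).collect()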

And finally, for dessert: how to update Spark to version 2.1 on the Hortonworks distribution, HDP 2.5.3.0. This version of HDP ships with Spark 2.0 pre-installed, but we had decided for ourselves that Spark is developing quite actively, and every new version fixes bugs and adds features, including in the Python API, so the update was worth doing.

We downloaded the build for Hadoop 2.7 from the official site, unpacked it, and dropped it into the HDP directory. We set up the symlinks as needed. We start it - it does not start, printing a rather cryptic error:

java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig

After some googling we learned that Spark had decided not to wait for Hadoop and had moved to a newer version of Jersey; the two projects argue with each other about this in JIRA. The solution was to download Jersey version 1.17.1, drop it into the jars folder inside SPARK_HOME, zip everything up again, and upload the archive to HDFS.

We got around this error, but a new and rather vague one appeared:

org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master

Meanwhile, version 2.0 runs just fine. Try to guess what the problem is. We dug into the logs of this application and saw something like this:

/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar

In short, for some reason hdp.version was not being resolved. After some googling we found a solution: go to the YARN settings in Ambari and add a property to the custom yarn-site:

hdp.version=2.5.3.0-37
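An alternative (or complementary) workaround that is often described for the same problem is to pass the substitution straight to the JVMs through Spark's own options, for example in spark-defaults.conf; the exact version string is, of course, specific to your cluster:

spark.driver.extraJavaOptions=-Dhdp.version=2.5.3.0-37
spark.yarn.am.extraJavaOptions=-Dhdp.version=2.5.3.0-37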

Adding the property in Ambari did the trick, and Spark took off. We tested several of our Jupyter notebooks - everything works. We are ready for the first Spark class on Saturday (tomorrow, in fact)!

UPD. Another problem surfaced during class: at some point YARN stopped handing out containers to Spark. We had to correct a YARN parameter that defaults to 0.2:

yarn.scheduler.capacity.maximum-am-resource-percent=0.8

That is, by default only 20% of the resources could go to application masters, which caps the number of applications running at once. After changing the parameter we restarted YARN; the problem was resolved, and the rest of the participants were also able to start a Spark context.
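To put rough numbers on it (an illustrative calculation, not our exact cluster figures): every Spark application needs one application master container, and this parameter caps the share of queue resources those containers may occupy. If an AM container takes about 1 GB and the queue has 100 GB of memory, a cap of 0.2 allows roughly 20 concurrent applications - too few for 40 participants - while 0.8 allows roughly 80.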

Source: habr.com
