Configurando o Spark no YARN

Habr, olá! Ontem em encontro dedicado ao Apache Spark, do pessoal da Rambler&Co, houve muitas dúvidas dos participantes relacionadas à configuração desta ferramenta. Decidimos seguir seus passos e compartilhar nossa experiência. O tema não é fácil - por isso convidamos você a compartilhar sua experiência nos comentários, talvez também entendamos e usemos algo errado.

Uma pequena introdução sobre como usamos o Spark. Temos um programa de três meses “Especialista em Big Data”, e ao longo do segundo módulo nossos participantes trabalham neste instrumento. Conseqüentemente, nossa tarefa, como organizadores, é preparar o cluster para uso nesse caso.

A peculiaridade de nossa utilização é que o número de pessoas trabalhando simultaneamente no Spark pode ser igual a todo o grupo. Por exemplo, em um seminário, quando todos tentam algo ao mesmo tempo e repetem depois do nosso professor. E isso não é muito - às vezes até 40 pessoas. Provavelmente não existem muitas empresas no mundo que enfrentem tal caso de uso.

A seguir, direi como e por que selecionamos determinados parâmetros de configuração.

Vamos começar desde o início. O Spark tem 3 opções para rodar em um cluster: autônomo, usando Mesos e usando YARN. Decidimos escolher a terceira opção porque fazia sentido para nós. Já temos um cluster hadoop. Nossos participantes já conhecem bem sua arquitetura. Vamos usar o FIO.

spark.master=yarn

Ainda mais interessante. Cada uma dessas três opções de implantação possui duas opções de implantação: cliente e cluster. Baseado documentação e vários links na Internet, podemos concluir que o cliente é adequado para trabalhos interativos - por exemplo, através do Jupyter Notebook, e o cluster é mais adequado para soluções de produção. No nosso caso, estávamos interessados ​​em trabalhos interativos, portanto:

spark.deploy-mode=client

Em geral, a partir de agora o Spark funcionará de alguma forma no YARN, mas isso não foi suficiente para nós. Como temos um programa sobre big data, às vezes os participantes não tinham o suficiente do que foi obtido no âmbito de uma divisão uniforme dos recursos. E então descobrimos uma coisa interessante: a alocação dinâmica de recursos. Resumindo, a questão é esta: se você tiver uma tarefa difícil e o cluster estiver livre (por exemplo, pela manhã), usar esta opção do Spark pode fornecer recursos adicionais. A necessidade é calculada ali de acordo com uma fórmula astuta. Não entraremos em detalhes – funciona bem.

spark.dynamicAllocation.enabled=true

Definimos esse parâmetro e, na inicialização, o Spark travou e não iniciou. Isso mesmo, porque eu tive que ler documentação mais cuidadosamente. Afirma que para que tudo corra bem é necessário também habilitar um parâmetro adicional.

spark.shuffle.service.enabled=true

Por que é necessário? Quando nosso trabalho não exigir mais tantos recursos, o Spark deverá devolvê-los ao pool comum. O estágio mais demorado em quase todas as tarefas do MapReduce é o estágio Shuffle. Este parâmetro permite salvar os dados gerados nesta fase e liberar os executores de acordo. E o executor é o processo que calcula tudo sobre o trabalhador. Possui um certo número de núcleos de processador e uma certa quantidade de memória.

Este parâmetro foi adicionado. Tudo parecia funcionar. Tornou-se perceptível que os participantes recebiam mais recursos quando precisavam deles. Mas surgiu outro problema - em algum momento outros participantes acordaram e também queriam usar o Spark, mas estava tudo ocupado ali e eles estavam insatisfeitos. Eles podem ser compreendidos. Começamos a olhar a documentação. Descobriu-se que existem vários outros parâmetros que podem ser usados ​​para influenciar o processo. Por exemplo, se o executor estiver em modo de espera, depois de quanto tempo os recursos poderão ser retirados dele?

spark.dynamicAllocation.executorIdleTimeout=120s

No nosso caso, se seus executores não fizerem nada por dois minutos, por favor, devolva-os ao grupo comum. Mas este parâmetro nem sempre foi suficiente. Ficou claro que a pessoa não fazia nada há muito tempo e os recursos não estavam sendo liberados. Descobriu-se que também existe um parâmetro especial - após quanto tempo selecionar executores que contêm dados em cache. Por padrão, esse parâmetro era infinito! Nós corrigimos isso.

spark.dynamicAllocation.cachedExecutorIdleTimeout=600s

Ou seja, se seus executores não fizerem nada por 5 minutos, entregue-os ao grupo comum. Nesta modalidade, a velocidade de liberação e emissão de recursos para um grande número de usuários tornou-se decente. A quantidade de descontentamento diminuiu. Mas decidimos ir mais longe e limitar o número máximo de executores por aplicação – essencialmente por participante do programa.

spark.dynamicAllocation.maxExecutors=19

Agora, é claro, há pessoas insatisfeitas do outro lado - "o cluster está ocioso e só tenho 19 executores", mas o que você pode fazer? Precisamos de algum tipo de equilíbrio correto. Você não pode fazer todo mundo feliz.

E mais uma pequena história relacionada às especificidades do nosso caso. De alguma forma, várias pessoas se atrasaram para uma aula prática e, por algum motivo, o Spark não começou para elas. Observamos a quantidade de recursos gratuitos - parece que existe. O Spark deve começar. Felizmente, a essa altura a documentação já havia sido adicionada ao subcórtex em algum lugar, e lembramos que ao iniciar o Spark ele procura uma porta para iniciar. Se a primeira porta do intervalo estiver ocupada, ela passa para a próxima na ordem. Se for gratuito, captura. E existe um parâmetro que indica o número máximo de tentativas para isso. O padrão é 16. O número é menor que o número de pessoas do nosso grupo na turma. Assim, após 16 tentativas, Spark desistiu e disse que eu não poderia começar. Corrigimos esta configuração.

spark.port.maxRetries=50

A seguir falarei sobre algumas configurações que não estão muito relacionadas às especificidades do nosso caso.

Para iniciar o Spark mais rapidamente, é recomendado arquivar a pasta jars localizada no diretório inicial SPARK_HOME e colocá-la no HDFS. Então ele não perderá tempo carregando esses jarniks pelos trabalhadores.

spark.yarn.archive=hdfs:///tmp/spark-archive.zip

Também é recomendado usar kryo como serializador para uma operação mais rápida. É mais otimizado que o padrão.

spark.serializer=org.apache.spark.serializer.KryoSerializer

E também há um problema antigo com o Spark: ele frequentemente falha na memória. Muitas vezes isso acontece no momento em que os trabalhadores calculam tudo e enviam o resultado ao motorista. Aumentamos esse parâmetro para nós mesmos. Por padrão, é 1 GB, nós criamos 3.

spark.driver.maxResultSize=3072

E por último, como sobremesa. Como atualizar o Spark para a versão 2.1 na distribuição HortonWorks - HDP 2.5.3.0. Esta versão do HDP contém uma versão 2.0 pré-instalada, mas uma vez decidimos por nós mesmos que o Spark está se desenvolvendo de forma bastante ativa, e cada nova versão corrige alguns bugs e fornece recursos adicionais, inclusive para a API python, então decidimos o que precisa ser ser feito é uma atualização.

Baixei a versão do site oficial do Hadoop 2.7. Descompacte e coloque na pasta HDP. Instalamos os links simbólicos conforme necessário. Nós o lançamos - ele não inicia. Escreve um erro muito estranho.

java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig

Depois de pesquisar no Google, descobrimos que o Spark decidiu não esperar até o nascimento do Hadoop e decidiu usar a nova versão do jersey. Eles próprios discutem entre si sobre esse tópico no JIRA. A solução foi baixar versão da camisa 1.17.1. Coloque-o na pasta jars em SPARK_HOME, compacte-o novamente e carregue-o no HDFS.

Contornamos esse erro, mas surgiu um novo e bastante simplificado.

org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master

Ao mesmo tempo, tentamos rodar a versão 2.0 - está tudo bem. Tente adivinhar o que está acontecendo. Analisamos os logs deste aplicativo e vimos algo assim:

/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar

Em geral, por algum motivo, hdp.version não resolveu. Depois de pesquisar no Google, encontramos uma solução. Você precisa ir para as configurações do YARN no Ambari e adicionar um parâmetro lá para personalizar o site do fio:

hdp.version=2.5.3.0-37

Essa magia ajudou e Spark decolou. Testamos vários de nossos laptops jupyter. Tudo está funcionando. Estamos prontos para a primeira aula do Spark no sábado (amanhã)!

UPD. Durante a aula, outro problema veio à tona. Em algum momento, o YARN parou de fornecer contêineres para o Spark. No YARN foi necessário corrigir o parâmetro, que por padrão era 0.2:

yarn.scheduler.capacity.maximum-am-resource-percent=0.8

Ou seja, apenas 20% dos recursos participaram da distribuição dos recursos. Após alterar os parâmetros, recarregamos o YARN. O problema foi resolvido e o restante dos participantes também conseguiu executar o contexto do Spark.

Fonte: habr.com

Adicionar um comentário