Habr, olá! Ontem em
Uma pequena introdução sobre como usamos o Spark. Temos um programa de três meses
A peculiaridade de nossa utilização é que o número de pessoas trabalhando simultaneamente no Spark pode ser igual a todo o grupo. Por exemplo, em um seminário, quando todos tentam algo ao mesmo tempo e repetem depois do nosso professor. E isso não é muito - às vezes até 40 pessoas. Provavelmente não existem muitas empresas no mundo que enfrentem tal caso de uso.
A seguir, direi como e por que selecionamos determinados parâmetros de configuração.
Vamos começar desde o início. O Spark tem 3 opções para rodar em um cluster: autônomo, usando Mesos e usando YARN. Decidimos escolher a terceira opção porque fazia sentido para nós. Já temos um cluster hadoop. Nossos participantes já conhecem bem sua arquitetura. Vamos usar o FIO.
spark.master=yarn
Ainda mais interessante. Cada uma dessas três opções de implantação possui duas opções de implantação: cliente e cluster. Baseado
spark.deploy-mode=client
Em geral, a partir de agora o Spark funcionará de alguma forma no YARN, mas isso não foi suficiente para nós. Como temos um programa sobre big data, às vezes os participantes não tinham o suficiente do que foi obtido no âmbito de uma divisão uniforme dos recursos. E então descobrimos uma coisa interessante: a alocação dinâmica de recursos. Resumindo, a questão é esta: se você tiver uma tarefa difícil e o cluster estiver livre (por exemplo, pela manhã), usar esta opção do Spark pode fornecer recursos adicionais. A necessidade é calculada ali de acordo com uma fórmula astuta. Não entraremos em detalhes – funciona bem.
spark.dynamicAllocation.enabled=true
Definimos esse parâmetro e, na inicialização, o Spark travou e não iniciou. Isso mesmo, porque eu tive que ler
spark.shuffle.service.enabled=true
Por que é necessário? Quando nosso trabalho não exigir mais tantos recursos, o Spark deverá devolvê-los ao pool comum. O estágio mais demorado em quase todas as tarefas do MapReduce é o estágio Shuffle. Este parâmetro permite salvar os dados gerados nesta fase e liberar os executores de acordo. E o executor é o processo que calcula tudo sobre o trabalhador. Possui um certo número de núcleos de processador e uma certa quantidade de memória.
Este parâmetro foi adicionado. Tudo parecia funcionar. Tornou-se perceptível que os participantes recebiam mais recursos quando precisavam deles. Mas surgiu outro problema - em algum momento outros participantes acordaram e também queriam usar o Spark, mas estava tudo ocupado ali e eles estavam insatisfeitos. Eles podem ser compreendidos. Começamos a olhar a documentação. Descobriu-se que existem vários outros parâmetros que podem ser usados para influenciar o processo. Por exemplo, se o executor estiver em modo de espera, depois de quanto tempo os recursos poderão ser retirados dele?
spark.dynamicAllocation.executorIdleTimeout=120s
No nosso caso, se seus executores não fizerem nada por dois minutos, por favor, devolva-os ao grupo comum. Mas este parâmetro nem sempre foi suficiente. Ficou claro que a pessoa não fazia nada há muito tempo e os recursos não estavam sendo liberados. Descobriu-se que também existe um parâmetro especial - após quanto tempo selecionar executores que contêm dados em cache. Por padrão, esse parâmetro era infinito! Nós corrigimos isso.
spark.dynamicAllocation.cachedExecutorIdleTimeout=600s
Ou seja, se seus executores não fizerem nada por 5 minutos, entregue-os ao grupo comum. Nesta modalidade, a velocidade de liberação e emissão de recursos para um grande número de usuários tornou-se decente. A quantidade de descontentamento diminuiu. Mas decidimos ir mais longe e limitar o número máximo de executores por aplicação – essencialmente por participante do programa.
spark.dynamicAllocation.maxExecutors=19
Agora, é claro, há pessoas insatisfeitas do outro lado - "o cluster está ocioso e só tenho 19 executores", mas o que você pode fazer? Precisamos de algum tipo de equilíbrio correto. Você não pode fazer todo mundo feliz.
E mais uma pequena história relacionada às especificidades do nosso caso. De alguma forma, várias pessoas se atrasaram para uma aula prática e, por algum motivo, o Spark não começou para elas. Observamos a quantidade de recursos gratuitos - parece que existe. O Spark deve começar. Felizmente, a essa altura a documentação já havia sido adicionada ao subcórtex em algum lugar, e lembramos que ao iniciar o Spark ele procura uma porta para iniciar. Se a primeira porta do intervalo estiver ocupada, ela passa para a próxima na ordem. Se for gratuito, captura. E existe um parâmetro que indica o número máximo de tentativas para isso. O padrão é 16. O número é menor que o número de pessoas do nosso grupo na turma. Assim, após 16 tentativas, Spark desistiu e disse que eu não poderia começar. Corrigimos esta configuração.
spark.port.maxRetries=50
A seguir falarei sobre algumas configurações que não estão muito relacionadas às especificidades do nosso caso.
Para iniciar o Spark mais rapidamente, é recomendado arquivar a pasta jars localizada no diretório inicial SPARK_HOME e colocá-la no HDFS. Então ele não perderá tempo carregando esses jarniks pelos trabalhadores.
spark.yarn.archive=hdfs:///tmp/spark-archive.zip
Também é recomendado usar kryo como serializador para uma operação mais rápida. É mais otimizado que o padrão.
spark.serializer=org.apache.spark.serializer.KryoSerializer
E também há um problema antigo com o Spark: ele frequentemente falha na memória. Muitas vezes isso acontece no momento em que os trabalhadores calculam tudo e enviam o resultado ao motorista. Aumentamos esse parâmetro para nós mesmos. Por padrão, é 1 GB, nós criamos 3.
spark.driver.maxResultSize=3072
E por último, como sobremesa. Como atualizar o Spark para a versão 2.1 na distribuição HortonWorks - HDP 2.5.3.0. Esta versão do HDP contém uma versão 2.0 pré-instalada, mas uma vez decidimos por nós mesmos que o Spark está se desenvolvendo de forma bastante ativa, e cada nova versão corrige alguns bugs e fornece recursos adicionais, inclusive para a API python, então decidimos o que precisa ser ser feito é uma atualização.
Baixei a versão do site oficial do Hadoop 2.7. Descompacte e coloque na pasta HDP. Instalamos os links simbólicos conforme necessário. Nós o lançamos - ele não inicia. Escreve um erro muito estranho.
java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
Depois de pesquisar no Google, descobrimos que o Spark decidiu não esperar até o nascimento do Hadoop e decidiu usar a nova versão do jersey. Eles próprios discutem entre si sobre esse tópico no JIRA. A solução foi baixar
Contornamos esse erro, mas surgiu um novo e bastante simplificado.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master
Ao mesmo tempo, tentamos rodar a versão 2.0 - está tudo bem. Tente adivinhar o que está acontecendo. Analisamos os logs deste aplicativo e vimos algo assim:
/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar
Em geral, por algum motivo, hdp.version não resolveu. Depois de pesquisar no Google, encontramos uma solução. Você precisa ir para as configurações do YARN no Ambari e adicionar um parâmetro lá para personalizar o site do fio:
hdp.version=2.5.3.0-37
Essa magia ajudou e Spark decolou. Testamos vários de nossos laptops jupyter. Tudo está funcionando. Estamos prontos para a primeira aula do Spark no sábado (amanhã)!
UPD. Durante a aula, outro problema veio à tona. Em algum momento, o YARN parou de fornecer contêineres para o Spark. No YARN foi necessário corrigir o parâmetro, que por padrão era 0.2:
yarn.scheduler.capacity.maximum-am-resource-percent=0.8
Ou seja, apenas 20% dos recursos participaram da distribuição dos recursos. Após alterar os parâmetros, recarregamos o YARN. O problema foi resolvido e o restante dos participantes também conseguiu executar o contexto do Spark.
Fonte: habr.com