Airflow é uma ferramenta para desenvolver e manter processos de processamento de dados em lote de maneira conveniente e rápida

Airflow é uma ferramenta para desenvolver e manter processos de processamento de dados em lote de maneira conveniente e rápida

Olá, Habr! Neste artigo quero falar sobre uma ótima ferramenta para desenvolver processos de processamento de dados em lote, por exemplo, na infraestrutura de um DWH corporativo ou no seu DataLake. Falaremos sobre Apache Airflow (doravante denominado Airflow). É injustamente privado de atenção em Habré, e na parte principal tentarei convencê-lo de que pelo menos vale a pena dar uma olhada no Airflow ao escolher um agendador para seus processos ETL/ELT.

Anteriormente, escrevi uma série de artigos sobre o tema DWH quando trabalhei no Tinkoff Bank. Agora faço parte da equipe do Grupo Mail.Ru e estou desenvolvendo uma plataforma para análise de dados na área de jogos. Na verdade, à medida que surgirem novidades e soluções interessantes, eu e minha equipe falaremos aqui sobre nossa plataforma de análise de dados.

Prólogo

Então, vamos começar. O que é fluxo de ar? Esta é uma biblioteca (ou conjunto de bibliotecas) para desenvolver, planejar e monitorar processos de trabalho. A principal característica do Airflow: o código Python é usado para descrever (desenvolver) processos. Isso traz muitas vantagens para organizar seu projeto e desenvolvimento: em essência, seu (por exemplo) projeto ETL é apenas um projeto Python, e você pode organizá-lo como desejar, levando em consideração as especificidades da infraestrutura, tamanho da equipe e outros requerimentos. Instrumentalmente tudo é simples. Use por exemplo PyCharm + Git. É maravilhoso e muito prático!

Agora vamos dar uma olhada nas principais entidades do Airflow. Ao compreender sua essência e propósito, você pode organizar de maneira ideal sua arquitetura de processos. Talvez a entidade principal seja o Gráfico Acíclico Direcionado (doravante denominado DAG).

DAG

Um DAG é uma associação significativa de suas tarefas que você deseja concluir em uma sequência estritamente definida de acordo com um cronograma específico. O Airflow fornece uma interface web conveniente para trabalhar com DAGs e outras entidades:

Airflow é uma ferramenta para desenvolver e manter processos de processamento de dados em lote de maneira conveniente e rápida

O DAG pode ser assim:

Airflow é uma ferramenta para desenvolver e manter processos de processamento de dados em lote de maneira conveniente e rápida

O desenvolvedor, ao projetar um DAG, estabelece um conjunto de operadores nos quais as tarefas dentro do DAG serão construídas. Aqui chegamos a outra entidade importante: Operador de Fluxo de Ar.

operadores

Um operador é uma entidade com base na qual as instâncias de trabalho são criadas, que descreve o que acontecerá durante a execução de uma instância de trabalho. Lançamentos do Airflow do GitHub já contém um conjunto de operadores prontos para uso. Exemplos:

  • BashOperator - operador para executar um comando bash.
  • PythonOperator - operador para chamar código Python.
  • EmailOperator — operador para envio de email.
  • HTTPOperator - operador para trabalhar com solicitações http.
  • SqlOperator - operador para execução de código SQL.
  • Sensor é um operador de espera por um evento (chegada do tempo requerido, aparecimento do arquivo requerido, uma linha no banco de dados, uma resposta da API, etc., etc.).

Existem operadores mais específicos: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Você também pode desenvolver operadores com base em suas próprias características e utilizá-los em seu projeto. Por exemplo, criamos o MongoDBToHiveViaHdfsTransfer, um operador para exportar documentos do MongoDB para o Hive, e vários operadores para trabalhar com clickhouse: CHLoadFromHiveOperator e CHTableLoaderOperator. Essencialmente, assim que um projeto tiver usado frequentemente código baseado em instruções básicas, você poderá pensar em construí-lo em uma nova instrução. Isso simplificará o desenvolvimento adicional e você expandirá sua biblioteca de operadores no projeto.

A seguir, todas essas instâncias de tarefas precisam ser executadas, e agora falaremos sobre o agendador.

Agendador

O agendador de tarefas do Airflow é baseado em Aipo. Celery é uma biblioteca Python que permite organizar uma fila e executar tarefas assíncronas e distribuídas. No lado do Airflow, todas as tarefas são divididas em pools. Os pools são criados manualmente. Normalmente, seu objetivo é limitar a carga de trabalho de trabalho com a fonte ou tipificar tarefas dentro do DWH. Os pools podem ser gerenciados através da interface web:

Airflow é uma ferramenta para desenvolver e manter processos de processamento de dados em lote de maneira conveniente e rápida

Cada pool tem um limite no número de slots. Ao criar um DAG, é fornecido um pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Um pool definido no nível do DAG pode ser substituído no nível da tarefa.
Um processo separado, o Agendador, é responsável por agendar todas as tarefas no Airflow. Na verdade, o Scheduler lida com toda a mecânica de configuração de tarefas para execução. A tarefa passa por vários estágios antes de ser executada:

  1. As tarefas anteriores foram concluídas no DAG; uma nova pode ser colocada na fila.
  2. A fila é ordenada de acordo com a prioridade das tarefas (as prioridades também podem ser controladas), e se houver um slot livre no pool, a tarefa pode ser colocada em operação.
  3. Se houver um aipo trabalhador livre, a tarefa é enviada para ele; começa o trabalho que você programou no problema, usando um ou outro operador.

Simples o suficiente.

O Agendador é executado no conjunto de todos os DAGs e em todas as tarefas dentro dos DAGs.

Para que o Agendador comece a trabalhar com o DAG, o DAG precisa definir um cronograma:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Existe um conjunto de predefinições prontas: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Você também pode usar expressões cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Data de execução

Para entender como funciona o Airflow, é importante entender qual é a data de execução de um DAG. No Airflow, o DAG possui uma dimensão Data de Execução, ou seja, dependendo do cronograma de trabalho do DAG, são criadas instâncias de tarefas para cada Data de Execução. E para cada Data de Execução, as tarefas podem ser reexecutadas – ou, por exemplo, um DAG pode trabalhar simultaneamente em diversas Datas de Execução. Isso é claramente mostrado aqui:

Airflow é uma ferramenta para desenvolver e manter processos de processamento de dados em lote de maneira conveniente e rápida

Infelizmente (ou talvez felizmente: depende da situação), se a implementação da tarefa no DAG for corrigida, a execução na Data de Execução anterior prosseguirá tendo em conta os ajustes. Isso é bom se você precisar recalcular dados de períodos anteriores usando um novo algoritmo, mas é ruim porque a reprodutibilidade do resultado é perdida (claro, ninguém o incomoda em retornar a versão necessária do código-fonte do Git e calcular o que você precisa de uma vez, do jeito que você precisa).

Gerando tarefas

A implementação do DAG é código em Python, então temos uma maneira muito conveniente de reduzir a quantidade de código ao trabalhar, por exemplo, com fontes fragmentadas. Digamos que você tenha três fragmentos do MySQL como fonte, você precisa entrar em cada um deles e coletar alguns dados. Além disso, de forma independente e em paralelo. O código Python no DAG pode ser assim:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

O DAG fica assim:

Airflow é uma ferramenta para desenvolver e manter processos de processamento de dados em lote de maneira conveniente e rápida

Nesse caso, você pode adicionar ou remover um fragmento simplesmente ajustando as configurações e atualizando o DAG. Confortável!

Você também pode usar geração de código mais complexa, por exemplo, trabalhar com fontes na forma de um banco de dados ou descrever uma estrutura de tabela, um algoritmo para trabalhar com uma tabela e, levando em consideração os recursos da infraestrutura DWH, gerar um processo para carregar N tabelas em seu armazenamento. Ou, por exemplo, trabalhando com uma API que não suporta trabalhar com um parâmetro na forma de uma lista, você pode gerar N tarefas em um DAG a partir desta lista, limitar o paralelismo de solicitações na API para um pool e raspar os dados necessários da API. Flexível!

repositório

O Airflow possui seu próprio repositório backend, um banco de dados (pode ser MySQL ou Postgres, temos Postgres), que armazena os estados das tarefas, DAGs, configurações de conexão, variáveis ​​globais, etc., etc. O repositório no Airflow é muito simples (cerca de 20 tabelas) e conveniente se você deseja construir algum de seus próprios processos sobre ele. Lembro-me das 100500 tabelas do repositório da Informatica, que tiveram que ser muito estudadas antes de entender como construir uma consulta.

Monitoramento

Dada a simplicidade do repositório, você pode construir um processo de monitoramento de tarefas que seja conveniente para você. Usamos um bloco de notas no Zeppelin, onde observamos o status das tarefas:

Airflow é uma ferramenta para desenvolver e manter processos de processamento de dados em lote de maneira conveniente e rápida

Esta também pode ser a interface web do próprio Airflow:

Airflow é uma ferramenta para desenvolver e manter processos de processamento de dados em lote de maneira conveniente e rápida

O código do Airflow é de código aberto, por isso adicionamos alertas ao Telegram. Cada instância em execução de uma tarefa, caso ocorra um erro, envia spam para o grupo no Telegram, onde consiste toda a equipe de desenvolvimento e suporte.

Recebemos uma resposta imediata através do Telegram (se necessário) e através do Zeppelin recebemos uma visão geral das tarefas no Airflow.

No total

O Airflow é principalmente de código aberto e você não deve esperar milagres dele. Esteja preparado para investir tempo e esforço para construir uma solução que funcione. O objetivo é alcançável, acredite, vale a pena. Velocidade de desenvolvimento, flexibilidade, facilidade de adição de novos processos - você vai gostar. Claro que é preciso prestar muita atenção na organização do projeto, na estabilidade do próprio Airflow: milagres não acontecem.

Agora temos o Airflow funcionando diariamente cerca de 6,5 mil tarefas. Eles são bem diferentes em caráter. Existem tarefas de carregamento de dados no DWH principal de muitas fontes diferentes e muito específicas, existem tarefas de cálculo de vitrines dentro do DWH principal, existem tarefas de publicação de dados em um DWH rápido, existem muitas, muitas tarefas diferentes - e Airflow mastiga todos eles dia após dia. Falando em números, isso é Milhares de 2,3 Tarefas ELT de complexidade variável dentro de DWH (Hadoop), aprox. 2,5 centenas de bancos de dados fontes, esta é uma equipe de 4 desenvolvedores de ETL, que são divididos em processamento de dados ETL em DWH e processamento de dados ELT dentro de DWH e, claro, mais um administrador, que trata da infraestrutura do serviço.

Planos para o futuro

O número de processos está inevitavelmente crescendo, e a principal coisa que faremos em termos de infraestrutura do Airflow é o dimensionamento. Queremos construir um cluster Airflow, alocar um par de pernas para trabalhadores do Celery e criar um cabeçote autoduplicado com processos de agendamento de trabalho e um repositório.

Epílogo

Claro que isso não é tudo que eu gostaria de contar sobre o Airflow, mas tentei destacar os pontos principais. O apetite vem com a comida, experimente e você vai gostar :)

Fonte: habr.com

Adicionar um comentário