Como fazer um gatilho DAG no Airflow usando a API Experimental

Na preparação dos nossos programas educativos, periodicamente encontramos dificuldades no trabalho com algumas ferramentas. E no momento em que os encontramos, nem sempre há documentação e artigos suficientes que ajudem a lidar com este problema.

Assim foi, por exemplo, em 2015, e utilizamos o cluster Hadoop com Spark para 35 usuários simultâneos no programa Big Data Specialist. Não ficou claro como prepará-lo para esse caso de usuário usando o YARN. Como resultado, tendo descoberto e trilhado o caminho por conta própria, eles fizeram postar no Habré e também realizou Encontro do Spark em Moscou.

Pré-história

Desta vez falaremos de um programa diferente - Engenheiro de Dados. Nele, nossos participantes constroem dois tipos de arquitetura: lambda e kappa. E na arquitetura lamdba, o Airflow é usado como parte do processamento em lote para transferir logs do HDFS para o ClickHouse.

Geralmente tudo é bom. Deixe-os construir seus pipelines. Porém, existe um “mas”: todos os nossos programas são tecnologicamente avançados em termos do próprio processo de aprendizagem. Para verificar o laboratório, usamos verificadores automáticos: o participante precisa acessar sua conta pessoal, clicar no botão “Verificar” e depois de um tempo ele vê algum tipo de feedback estendido sobre o que fez. E é neste ponto que começamos a abordar o nosso problema.

A verificação deste laboratório é organizada da seguinte forma: enviamos um pacote de dados de controle para o Kafka do participante, então o Gobblin transfere esse pacote de dados para o HDFS, então o Airflow pega esse pacote de dados e o coloca no ClickHouse. O truque é que o Airflow não precisa fazer isso em tempo real, ele faz isso dentro do cronograma: uma vez a cada 15 minutos ele pega um monte de arquivos e os carrega.

Acontece que precisamos de alguma forma acionar seu DAG por conta própria, a nosso pedido, enquanto o verificador está sendo executado aqui e agora. Pesquisando no Google, descobrimos que para versões posteriores do Airflow existe um chamado API experimental. A palavra experimental, claro, parece assustador, mas o que fazer... De repente, ele decola.

A seguir, descreveremos todo o caminho: desde a instalação do Airflow até a geração de uma solicitação POST que aciona um DAG usando a API Experimental. Trabalharemos com Ubuntu 16.04.

1. Instalação de fluxo de ar

Vamos verificar se temos Python 3 e virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Se um deles estiver faltando, instale-o.

Agora vamos criar um diretório no qual continuaremos trabalhando com o Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instale o fluxo de ar:

(venv) $ pip install airflow

Versão em que trabalhamos: 1.10.

Agora precisamos criar um diretório airflow_home, onde os arquivos DAG e plug-ins do Airflow estarão localizados. Após criar o diretório, defina a variável de ambiente AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

A próxima etapa é executar o comando que irá criar e inicializar o banco de dados de fluxo de dados no SQLite:

(venv) $ airflow initdb

O banco de dados será criado em airflow.db por padrão.

Verifique se o Airflow está instalado:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Se o comando funcionou, o Airflow criou seu próprio arquivo de configuração airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

O Airflow possui uma interface web. Ele pode ser iniciado executando o comando:

(venv) $ airflow webserver --port 8081

Agora você pode acessar a interface da web em um navegador na porta 8081 do host onde o Airflow estava sendo executado, assim: <hostname:8081>.

2. Trabalhando com a API Experimental

Neste Airflow está configurado e pronto para uso. No entanto, também precisamos executar a API Experimental. Nossos verificadores são escritos em Python, então todas as solicitações estarão nele usando a biblioteca requests.

Na verdade a API já está funcionando para solicitações simples. Por exemplo, tal solicitação permite testar seu funcionamento:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Se você recebeu essa mensagem em resposta, significa que tudo está funcionando.

Porém, quando queremos acionar um DAG, nos deparamos com o fato de que esse tipo de solicitação não pode ser feita sem autenticação.

Para fazer isso, você precisará realizar uma série de ações.

Primeiro, você precisa adicionar isto à configuração:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Então, você precisa criar seu usuário com direitos de administrador:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Em seguida, você precisa criar um usuário com direitos normais que terá permissão para disparar um DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Agora tudo está pronto.

3. Lançando uma solicitação POST

A própria solicitação POST ficará assim:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Solicitação processada com sucesso.

Dessa forma, damos algum tempo ao DAG para processar e fazer uma solicitação à tabela ClickHouse, tentando capturar o pacote de dados de controle.

Verificação concluída.

Fonte: habr.com

Adicionar um comentário