How to trigger a DAG in Airflow using the Experimental API

While preparing our educational programs, we periodically run into difficulties with some of the tools, and at the moment we hit them there is not always enough documentation or articles to help us cope with the problem.

That was the case, for example, in 2015, when we used a Hadoop cluster with Spark for 35 simultaneous users on the Big Data Specialist program. It was not clear how to configure it for such a use case with YARN. In the end, having figured it out and walked the path ourselves, we wrote a post on Habr and also gave a talk at the Moscow Spark Meetup.

Background

This time we will talk about a different program, Data Engineer. On it, our participants build two types of architecture: lambda and kappa. In the lambda architecture, Airflow is used as part of batch processing to transfer logs from HDFS to ClickHouse.

That is all fine in itself: let them build their pipelines. However, there is a “but”: all of our programs automate the learning process itself. To check a lab, we use automatic checkers: the participant goes to their personal account, clicks the “Check” button, and after a while sees extended feedback on what they did. And this is exactly where our problem begins.

Checking this lab works as follows: we send a control data packet to the participant's Kafka, then Gobblin transfers this packet to HDFS, and then Airflow picks it up and puts it into ClickHouse. The catch is that Airflow does not do this in real time; it runs on a schedule: once every 15 minutes it takes a batch of files and uploads them.
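For context, here is a minimal sketch of what such a batch DAG might look like. The operator, task name, start date, and command are our own illustrative assumptions, not the actual pipeline the participants build; only the 15-minute schedule and the DAG id (used later in this post) come from the article.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='newprolab',                  # the participant's DAG id used later in this post
    start_date=datetime(2019, 3, 1),     # illustrative start date
    schedule_interval='*/15 * * * *',    # once every 15 minutes
    catchup=False,
)

load_batch = BashOperator(
    task_id='hdfs_to_clickhouse',        # illustrative task name
    # placeholder command: in reality this step reads the new files from HDFS
    # and inserts them into ClickHouse
    bash_command='echo "load new HDFS files into ClickHouse"',
    dag=dag,
)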

So we need to somehow trigger their DAG ourselves, on demand, while the checker is running here and now. After some googling, we found out that later versions of Airflow have a so-called Experimental API. The word “experimental” sounds scary, of course, but what can you do... what if it works?

Next, we will describe the whole path: from installing Airflow to generating a POST request that triggers a DAG using the Experimental API. We will work with Ubuntu 16.04.

1. Airflow installation

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If one of these is missing, then install it.

Now let's create a directory in which we will continue to work with Airflow.

$ mkdir <your directory name>
$ cd /path/to/your/new/directory
$ virtualenv -p $(which python3) venv
$ source venv/bin/activate
(venv) $

Install Airflow:

(venv) $ pip install apache-airflow

The version we worked with: 1.10.

Now we need to create a directory airflow_home, where the DAG files and Airflow plugins will be located. After creating the directory, set the environment variable AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

The next step is to run the command that will create and initialize Airflow's metadata database in SQLite:

(venv) $ airflow initdb

By default, the database will be created in airflow.db.

Check if Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0

If the command worked, then Airflow has created its own configuration file airflow.cfg in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be launched by running the command:

(venv) $ airflow webserver --port 8081

You can now open the web interface in a browser on port 8081 of the host where Airflow is running: <hostname:8081>.

2. Working with the Experimental API

At this point, Airflow is configured and ready to go. However, we also need to get the Experimental API working. Our checkers are written in Python, so all further requests will be made from Python using the requests library.

In fact, the API already works for simple requests. For example, this request lets you test that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # 8081 in our case; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you received such a message in response, it means that everything is working.

However, when we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

To set up authentication, you will need to take a few steps.

First, add this to airflow.cfg:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Then, you need to create a user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())  # Airflow has no models.Admin; mark the user as superuser instead
>>> user.superuser = True
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, you need to create a user with normal rights that will be allowed to trigger DAGs.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
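After restarting the webserver so it picks up the new auth_backend, you can verify that password authentication works by repeating the test request with the new credentials (a quick sanity check, not part of the original flow); it should again return 'OK':

>>> import requests
>>> auth = ('newprolab', 'Newprolab2019!')
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test'), auth=auth).text
'OK'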

Now everything is ready.

3. Launching a POST request

The POST request itself will look like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Request processed successfully.
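As a side note, the conf dictionary sent in the request body becomes available to the triggered DAG run. Here is a hedged sketch of how a task could read it; the task name and callable are illustrative, and the DAG mirrors the sketch near the top of this post rather than the actual course DAG.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def print_conf(**context):
    # dag_run.conf holds the dictionary sent as "conf" in the POST body
    conf = context['dag_run'].conf or {}
    print('Received conf value:', conf.get('key'))

dag = DAG(
    dag_id='newprolab',              # same DAG id as in the request above
    start_date=datetime(2019, 3, 1),
    schedule_interval='*/15 * * * *',
    catchup=False,
)

read_conf = PythonOperator(
    task_id='read_conf',             # illustrative task name
    python_callable=print_conf,
    provide_context=True,            # Airflow 1.10: pass the context kwargs to the callable
    dag=dag,
)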

After that, we give the DAG some time to run and then query the ClickHouse table, trying to catch the control data packet.
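That final check can be as simple as polling ClickHouse over its HTTP interface (port 8123 by default) until the control records appear. A rough sketch follows; the host, table name, and query are our own assumptions, not the actual checker code.

import time

import requests

clickhouse_url = 'http://{}:8123'.format('<clickhouse hostname>')   # assumed host
query = "SELECT count() FROM logs WHERE batch_id = 'control'"       # illustrative table and filter

for attempt in range(10):
    # ClickHouse's HTTP interface accepts the query in the POST body
    count = int(requests.post(clickhouse_url, data=query).text.strip() or 0)
    if count > 0:
        print('Control data packet found, the lab check passes')
        break
    time.sleep(60)   # give the DAG some time between attempts
else:
    print('Control data packet not found, the lab check fails')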

Verification completed.

Source: habr.com
