Wéi maachen ech en DAG Ausléiser am Airflow mat der Experimentell API

Bei der Virbereedung vun eise pädagogesche Programmer hu mir periodesch Schwieregkeeten am Sënn vun der Aarbecht mat e puer Tools. An de Moment wou mer se begéinen, gëtt et net ëmmer genuch Dokumentatioun an Artikelen, déi hëllefe mat dësem Problem ze këmmeren.

Sou war et zum Beispill am 2015, a mir hunn den Hadoop Cluster mat Spark fir 35 simultan Benotzer am Big Data Specialist Programm benotzt. Et war net kloer wéi een et op sou e Benotzerfall mat YARN virbereet. Als Resultat hunn se erausfonnt an de Wee eleng gaang, si hunn et gemaach Post op Habré an och opgefouert Moskau Spark Meetup.

Virgeschicht

Dës Kéier wäerte mir iwwer en anere Programm schwätzen - Daten Ingenieur. Dorop bauen eis Participanten zwou Aarte vun Architektur: Lambda a Kappa. An an der Lamdba Architektur gëtt Airflow als Deel vun der Batchveraarbechtung benotzt fir Logbicher vun HDFS op ClickHouse ze transferéieren.

Alles ass allgemeng gutt. Loosst se hir Pipelines bauen. Wéi och ëmmer, et gëtt en "awer": all eis Programmer sinn technologesch fortgeschratt wat de Léierprozess selwer ugeet. Fir de Labo z'iwwerpréiwen, benotze mir automatesch Checkers: de Participant muss op säi perséinleche Kont goen, klickt op de "Check" Knäppchen, a no enger Zäit gesäit hien eng Aart vu verlängerten Feedback iwwer dat wat hien gemaach huet. An et ass op dësem Punkt datt mir fänken un eise Problem unzegoen.

Dëse Labo iwwerpréift ass wéi follegt arrangéiert: mir schécken e Kontrolldatenpaket un de Kafka vum Participant, da transferéiert de Gobblin dësen Datepaket op HDFS, da hëlt Airflow dësen Datepaket a setzt se an ClickHouse. Den Trick ass datt Airflow dëst net an Echtzäit muss maachen, et mécht et op Zäitplang: eemol all 15 Minutten dauert et eng Rëtsch Dateien an lued se erop.

Et stellt sech eraus datt mir iergendwéi hiren DAG op eis Ufro mussen ausléisen, während de Checker hei an elo leeft. Googelen hu mir erausfonnt datt fir spéider Versioune vum Airflow e sougenannte Experimentell APIAn. D'Wuert experimental, natierlech, et kléngt grujeleg, mee wat ze maachen ... Et geet op eemol ugefaangen.

Als nächst wäerte mir de ganze Wee beschreiwen: vun der Installatioun vun Airflow bis zur Generatioun vun enger POST-Ufro déi en DAG mat der Experimentell API ausléist. Mir wäerte mat Ubuntu 16.04 schaffen.

1. Airflow Installatioun

Loosst eis kucken datt mir Python 3 a virtualenv hunn.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Wann ee vun dësen fehlt, da installéiere se.

Loosst eis elo e Verzeechnes erstellen an deem mir weider mat Airflow schaffen.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Installéiert Airflow:

(venv) $ pip install airflow

Versioun, déi mir geschafft hunn: 1.10.

Elo musse mir e Verzeechnes erstellen airflow_home, wou d'DAG-Dateien an d'Airflow-Plugins sinn. Nodeems Dir de Verzeichnis erstallt hutt, setzt d'Ëmfeldvariabel AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

De nächste Schrëtt ass de Kommando auszeféieren deen d'Dateflow-Datebank an SQLite erstellt an initialiséiert:

(venv) $ airflow initdb

D'Datebank gëtt am airflow.db Default.

Kuckt ob Airflow installéiert ass:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Wann de Kommando geschafft huet, dann huet Airflow seng eege Konfiguratiounsdatei erstallt airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow huet eng Web Interface. Et kann gestart ginn andeems Dir de Kommando ausféiert:

(venv) $ airflow webserver --port 8081

Dir kënnt elo op d'Web-Interface an engem Browser um Hafen 8081 um Host wou Airflow leeft, sou wéi: <hostname:8081>.

2. Schafft mat der Experimentell API

Op dëser Airflow ass konfiguréiert a prett fir ze goen. Wéi och ëmmer, mir mussen och d'Experimentell API lafen. Eis Checkers sinn am Python geschriwwen, sou datt weider Ufroe mat der Bibliothéik drop sinn requests.

Eigentlech funktionnéiert d'API scho fir einfach Ufroen. Zum Beispill, esou eng Ufro erlaabt Iech seng Aarbecht ze testen:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Wann Dir esou e Message als Äntwert kritt, heescht et datt alles funktionnéiert.

Wéi och ëmmer, wa mir en DAG wëllen ausléisen, komme mir an der Tatsaach, datt dës Zort Ufro net ouni Authentifikatioun gemaach ka ginn.

Fir dëst ze maachen, musst Dir eng Rei vun Aktiounen maachen.

Als éischt musst Dir dëst an d'Konfiguratioun addéieren:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Da musst Dir Äre Benotzer mat Admin Rechter erstellen:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Als nächst musst Dir e Benotzer mat normale Rechter erstellen, deen erlaabt ass en DAG Ausléiser ze maachen.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Elo ass alles prett.

3. Lancéiere engem POST Ufro

D'POST Ufro selwer wäert esou ausgesinn:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Ufro erfollegräich veraarbecht.

Deementspriechend gi mir dem DAG e bëssen Zäit fir ze veraarbecht an eng Ufro un den ClickHouse Dësch ze maachen, probéiert de Kontrolldatenpaket ze fangen.

Verifikatioun fäerdeg.

Source: will.com

Setzt e Commentaire