Πώς να δημιουργήσετε μια ενεργοποίηση DAG στο Airflow χρησιμοποιώντας το Experimental API

Κατά την προετοιμασία των εκπαιδευτικών μας προγραμμάτων, συναντάμε περιοδικά δυσκολίες όσον αφορά την εργασία με συγκεκριμένα εργαλεία. Και τη στιγμή που τα συναντάμε, δεν υπάρχει πάντα αρκετή τεκμηρίωση και άρθρα που θα μας βοηθούσαν να αντιμετωπίσουμε αυτό το πρόβλημα.

Αυτό συνέβη, για παράδειγμα, το 2015 και κατά τη διάρκεια του προγράμματος «Big Data Specialist» χρησιμοποιήσαμε ένα σύμπλεγμα Hadoop με Spark για 35 ταυτόχρονους χρήστες. Δεν ήταν σαφές πώς να το προετοιμάσετε για μια τέτοια περίπτωση χρήσης χρησιμοποιώντας ΝΗΜΑ. Στο τέλος, αφού το καταλάβαμε και περπατήσαμε μόνοι μας το μονοπάτι, το καταφέραμε ανάρτηση στο Habré και επίσης εκτελέστηκε στο Μόσχα Spark Meetup.

Ιστορικό

Αυτή τη φορά θα μιλήσουμε για ένα διαφορετικό πρόγραμμα - Μηχανικός δεδομένων. Οι συμμετέχοντες μας χτίζουν δύο τύπους αρχιτεκτονικής πάνω του: λάμδα και κάπα. Και στην αρχιτεκτονική lamdba, ως μέρος της επεξεργασίας κατά παρτίδες, το Airflow χρησιμοποιείται για τη μεταφορά αρχείων καταγραφής από το HDFS στο ClickHouse.

Όλα είναι γενικά καλά. Αφήστε τους να φτιάξουν τους δικούς τους αγωγούς. Ωστόσο, υπάρχει ένα «αλλά»: όλα τα προγράμματά μας είναι τεχνολογικά προηγμένα από την άποψη της ίδιας της μαθησιακής διαδικασίας. Για να ελέγξουμε το εργαστήριο, χρησιμοποιούμε αυτόματα πούλια: ο συμμετέχων πρέπει να μεταβεί στον προσωπικό του λογαριασμό, να κάνει κλικ στο κουμπί "Έλεγχος" και μετά από κάποιο χρονικό διάστημα βλέπει κάποιου είδους εκτεταμένη ανατροφοδότηση για αυτό που έκανε. Και είναι αυτή τη στιγμή που αρχίζουμε να προσεγγίζουμε το πρόβλημά μας.

Η επαλήθευση αυτού του εργαστηρίου είναι δομημένη ως εξής: στέλνουμε ένα πακέτο δεδομένων ελέγχου στο Kafka του συμμετέχοντος, στη συνέχεια ο Gobblin μεταφέρει αυτό το πακέτο δεδομένων στο HDFS και, στη συνέχεια, το Airflow παίρνει αυτό το πακέτο δεδομένων και το τοποθετεί στο ClickHouse. Το κόλπο είναι ότι το Airflow δεν χρειάζεται να το κάνει αυτό σε πραγματικό χρόνο, το κάνει σύμφωνα με ένα πρόγραμμα: κάθε 15 λεπτά χρειάζεται ένα σωρό αρχεία και τα ανεβάζει.

Αποδεικνύεται ότι πρέπει με κάποιο τρόπο να ενεργοποιήσουμε μόνοι μας το DAG τους κατόπιν αιτήματός μας, ενώ το πούλι εκτελείται εδώ και τώρα. Αφού γκουγκλάραμε, ανακαλύψαμε ότι για μεταγενέστερες εκδόσεις του Airflow υπάρχει το λεγόμενο Πειραματικό API. λέξη experimental, βέβαια, ακούγεται τρομακτικό, αλλά τι να κάνουμε... Ξαφνικά απογειώνεται.

Στη συνέχεια, θα περιγράψουμε ολόκληρη τη διαδρομή: από την εγκατάσταση Airflow έως τη δημιουργία ενός αιτήματος POST που ενεργοποιεί το DAG χρησιμοποιώντας το Experimental API. Θα δουλέψουμε με το Ubuntu 16.04.

1. Εγκατάσταση ροής αέρα

Ας ελέγξουμε ότι έχουμε Python 3 και virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Εάν λείπει κάποιο από αυτά, εγκαταστήστε το.

Τώρα ας δημιουργήσουμε έναν κατάλογο στον οποίο θα συνεχίσουμε να εργαζόμαστε με το Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Εγκαταστήστε τη ροή αέρα:

(venv) $ pip install airflow

Η έκδοση που δουλέψαμε: 1.10.

Τώρα πρέπει να δημιουργήσουμε έναν κατάλογο airflow_home, όπου θα βρίσκονται τα αρχεία DAG και τα πρόσθετα Airflow. Αφού δημιουργήσετε τον κατάλογο, ορίστε τη μεταβλητή περιβάλλοντος AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Το επόμενο βήμα είναι να εκτελέσετε μια εντολή που θα δημιουργήσει και θα προετοιμάσει μια βάση δεδομένων ροής δεδομένων στο SQLite:

(venv) $ airflow initdb

Η βάση δεδομένων θα δημιουργηθεί στο airflow.db Προκαθορισμένο.

Ας ελέγξουμε αν έχει εγκατασταθεί η ροή αέρα:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Εάν η εντολή λειτούργησε, τότε το Airflow δημιούργησε το δικό του αρχείο διαμόρφωσης airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Η ροή αέρα έχει μια διεπαφή ιστού. Μπορεί να ξεκινήσει εκτελώντας την εντολή:

(venv) $ airflow webserver --port 8081

Τώρα μπορείτε να πατήσετε τη διεπαφή ιστού σε ένα πρόγραμμα περιήγησης στη θύρα 8081 στον κεντρικό υπολογιστή όπου εκτελούσε το Airflow, για παράδειγμα: <hostname:8081>.

2. Εργασία με το Experimental API

Σε αυτό το σημείο, η ροή αέρα έχει διαμορφωθεί και είναι έτοιμη να ξεκινήσει. Ωστόσο, πρέπει επίσης να εκτελέσουμε το Experimental API. Τα πούλια μας είναι γραμμένα σε Python, επομένως όλα τα αιτήματα θα βρίσκονται σε αυτό χρησιμοποιώντας τη βιβλιοθήκη requests.

Στην πραγματικότητα, το API λειτουργεί ήδη για απλά αιτήματα. Για παράδειγμα, αυτό το αίτημα σάς επιτρέπει να δοκιμάσετε τη λειτουργία του:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Εάν λάβετε ένα τέτοιο μήνυμα ως απάντηση, σημαίνει ότι όλα λειτουργούν.

Ωστόσο, όταν θέλουμε να ενεργοποιήσουμε ένα DAG, βρισκόμαστε αντιμέτωποι με το γεγονός ότι αυτό το είδος αιτήματος δεν μπορεί να γίνει χωρίς έλεγχο ταυτότητας.

Για να το κάνετε αυτό, θα χρειαστεί να κάνετε μια σειρά ακόμη βημάτων.

Πρώτα, πρέπει να προσθέσετε αυτό στη διαμόρφωση:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Στη συνέχεια, πρέπει να δημιουργήσετε τον χρήστη σας με δικαιώματα διαχειριστή:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Στη συνέχεια, πρέπει να δημιουργήσετε έναν χρήστη με κανονικά δικαιώματα στον οποίο θα επιτρέπεται να ενεργοποιεί το DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Τώρα όλα είναι έτοιμα.

3. Εκκινήστε ένα αίτημα POST

Το ίδιο το αίτημα POST θα μοιάζει με αυτό:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Το αίτημα διεκπεραιώθηκε με επιτυχία.

Αντίστοιχα, δίνουμε στη συνέχεια στο DAG λίγο χρόνο για να επεξεργαστεί και να υποβάλει αίτημα στον πίνακα ClickHouse, προσπαθώντας να συλλάβει το πακέτο δεδομένων ελέγχου.

Ο έλεγχος ολοκληρώθηκε.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο