DAG-laukaisimen tekeminen Airflow:ssa Experimental API:n avulla

Valmistellessamme koulutusohjelmiamme kohtaamme ajoittain vaikeuksia työskennellä tiettyjen työkalujen kanssa. Ja sillä hetkellä, kun kohtaamme ne, ei aina ole tarpeeksi dokumentaatiota ja artikkeleita, jotka auttaisivat meitä selviytymään tästä ongelmasta.

Näin oli esimerkiksi vuonna 2015, ja Big Data Specialist -ohjelman aikana käytimme Hadoop-klusteria Sparkilla 35 samanaikaiselle käyttäjälle. Ei ollut selvää, kuinka se valmistetaan tällaiseen käyttötapaukseen LANKAa käyttämällä. Lopulta me teimme sen, kun tajusimme sen ja kävelimme polun itse postaus Habressa ja esiintyi myös klo Moscow Spark Meetup.

esihistoria

Tällä kertaa puhumme eri ohjelmasta - Data Engineer. Osallistujamme rakentavat sille kahdenlaista arkkitehtuuria: lambda ja kappa. Ja lamdba-arkkitehtuurissa osana eräkäsittelyä Airflowta käytetään siirtämään lokit HDFS:stä ClickHouseen.

Kaikki on yleensä hyvin. Anna heidän rakentaa omat putkistonsa. On kuitenkin olemassa "mutta": kaikki ohjelmamme ovat teknisesti edistyksellisiä itse oppimisprosessin näkökulmasta. Laboratorion tarkistamiseen käytämme automaattisia tarkistuksia: osallistujan täytyy mennä henkilökohtaiselle tililleen, klikata "Tarkista" -painiketta ja jonkin ajan kuluttua hän näkee jonkinlaisen laajennetun palautteen tekemissään. Ja juuri tällä hetkellä alamme lähestyä ongelmaamme.

Tämän laboratorion varmennus on rakenteeltaan seuraava: lähetämme ohjaustietopaketin osallistujan Kafkaan, sitten Gobblin siirtää tämän datapaketin HDFS:ään, sitten Airflow ottaa tämän datapaketin ja laittaa sen ClickHouseen. Temppu on siinä, että Airflown ei tarvitse tehdä tätä reaaliajassa, se tekee sen aikataulun mukaan: 15 minuutin välein se ottaa joukon tiedostoja ja lataa ne.

Osoittautuu, että meidän täytyy jotenkin laukaista heidän DAG:nsa pyynnöstämme, kun tarkistus on käynnissä tässä ja nyt. Googlaamisen jälkeen saimme selville, että Airflown myöhemmille versioille on olemassa ns Kokeellinen API. Sana experimental, tietysti se kuulostaa pelottavalta, mutta mitä tehdä... Yhtäkkiä se lähtee liikkeelle.

Seuraavaksi kuvaamme koko polun: Airflow-asennuksesta POST-pyynnön luomiseen, joka käynnistää DAG:n kokeellisen API:n avulla. Työskentelemme Ubuntu 16.04:n kanssa.

1. Ilmavirran asennus

Tarkistamme, että meillä on Python 3 ja virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Jos jokin näistä puuttuu, asenna se.

Luodaan nyt hakemisto, jossa jatkamme työskentelyä Airflown kanssa.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Asenna ilmavirta:

(venv) $ pip install airflow

Työskentelymme versio: 1.10.

Nyt meidän on luotava hakemisto airflow_home, jossa DAG-tiedostot ja Airflow-laajennukset sijaitsevat. Kun olet luonut hakemiston, aseta ympäristömuuttuja AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Seuraava vaihe on suorittaa komento, joka luo ja alustaa tietokulkutietokannan SQLitessä:

(venv) $ airflow initdb

Tietokanta luodaan vuonna airflow.db oletusarvo.

Tarkistamme, onko Airflow asennettu:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Jos komento toimi, Airflow loi oman määritystiedoston airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflowlla on verkkokäyttöliittymä. Se voidaan käynnistää suorittamalla komento:

(venv) $ airflow webserver --port 8081

Voit nyt napsauttaa verkkokäyttöliittymää selaimessa portissa 8081 koneessa, jossa Airflow oli käynnissä, esimerkiksi: <hostname:8081>.

2. Työskentely Experimental API:n kanssa

Tässä vaiheessa Airflow on määritetty ja käyttövalmis. Meidän on kuitenkin suoritettava myös Experimental API. Tarkistamme ovat kirjoitettu Pythonilla, joten jatkossa kaikki pyynnöt tulevat siihen kirjaston avulla requests.

Itse asiassa API toimii jo yksinkertaisissa pyynnöissä. Esimerkiksi tämän pyynnön avulla voit testata sen toimintaa:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Jos saat vastauksena tällaisen viestin, se tarkoittaa, että kaikki toimii.

Kuitenkin, kun haluamme laukaista DAG:n, joudumme kohtaamaan sen tosiasian, että tämän tyyppistä pyyntöä ei voida tehdä ilman todennusta.

Tätä varten sinun on suoritettava useita muita vaiheita.

Ensin sinun on lisättävä tämä konfiguraatioon:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Sitten sinun on luotava käyttäjä, jolla on järjestelmänvalvojan oikeudet:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Seuraavaksi sinun on luotava käyttäjä, jolla on normaalit oikeudet ja joka voi käynnistää DAG:n.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nyt kaikki on valmista.

3. Käynnistä POST-pyyntö

Itse POST-pyyntö näyttää tältä:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Pyyntö käsiteltiin onnistuneesti.

Sen mukaisesti annamme DAG:lle jonkin aikaa käsitellä ja tehdä pyyntö ClickHouse-taulukkoon yrittäen saada ohjaustietopaketin kiinni.

Tarkistus suoritettu.

Lähde: will.com

Lisää kommentti