Kuidas käivitada DAG-i käivitaja Airflow'is, kasutades eksperimentaalset API-d

Haridusprogramme koostades puutume perioodiliselt kokku raskustega teatud tööriistadega töötamisel. Ja hetkel, kui me nendega kokku puutume, ei ole alati piisavalt dokumentatsiooni ja artikleid, mis aitaksid meil selle probleemiga toime tulla.

Nii oli see näiteks 2015. aastal ning programmi “Big Data Specialist” käigus kasutasime Sparkiga Hadoopi klastrit 35 samaaegsele kasutajale. Ei olnud selge, kuidas seda LÕNGA abil selliseks kasutusjuhtumiks ette valmistada. Lõpuks, olles selle selgeks saanud ja omal käel raja läbi käinud, tegimegi seda postitus Habré kohta ja esines ka kl Moskva sädeme kohtumine.

eelajalugu

Seekord räägime teistsugusest programmist - Andmete insener. Meie osalejad ehitavad sellele kahte tüüpi arhitektuuri: lambda ja kappa. Ja lamdba arhitektuuris kasutatakse paketttöötluse osana Airflow'i logide edastamiseks HDFS-ist ClickHouse'i.

Üldiselt on kõik hästi. Las nad ehitavad ise torujuhtmeid. Siiski on üks "aga": kõik meie programmid on õppeprotsessi enda seisukohast tehnoloogiliselt arenenud. Labori kontrollimiseks kasutame automaatseid kontrolle: osaleja peab minema oma isiklikule kontole, klõpsama nuppu "Kontrolli" ja mõne aja pärast näeb ta tehtu kohta mingit laiendatud tagasisidet. Ja just sel hetkel hakkame oma probleemile lähenema.

Selle labori kontrollimine on üles ehitatud järgmiselt: saadame kontrollandmepaketi osaleja Kafkale, seejärel edastab Gobblin selle andmepaketi HDFS-i, seejärel võtab Airflow selle andmepaketi ja paneb selle ClickHouse'i. Nipp seisneb selles, et Airflow ei pea seda tegema reaalajas, ta teeb seda ajakava järgi: iga 15 minuti järel võtab see hulga faile ja laadib need üles.

Selgub, et me peame nende DAG-i meie soovil ise kuidagi käivitama, kui kontrollija töötab siin ja praegu. Guugeldades saime teada, et Airflow hilisemate versioonide jaoks on nn Eksperimentaalne API. Sõna experimental, muidugi, see kõlab hirmutavalt, aga mis teha... Järsku võtab õhku.

Järgmisena kirjeldame kogu teed: alates Airflow installimisest kuni POST-päringu genereerimiseni, mis käivitab DAG-i eksperimentaalse API abil. Töötame Ubuntu 16.04-ga.

1. Õhuvoolu paigaldamine

Kontrollime, kas meil on Python 3 ja virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Kui mõni neist on puudu, installige see.

Nüüd loome kataloogi, kus jätkame Airflowga töötamist.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Paigaldage õhuvool:

(venv) $ pip install airflow

Versioon, mille kallal töötasime: 1.10.

Nüüd peame looma kataloogi airflow_home, kus asuvad DAG-failid ja Airflow-pluginad. Pärast kataloogi loomist määrake keskkonnamuutuja AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Järgmine samm on käivitada käsk, mis loob ja lähtestab SQLite'is andmevoo andmebaasi:

(venv) $ airflow initdb

Andmebaas luuakse aastal airflow.db vaikimisi.

Kontrollime, kas Airflow on installitud:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Kui käsk töötas, lõi Airflow oma konfiguratsioonifaili airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow'l on veebiliides. Selle saab käivitada, käivitades käsu:

(venv) $ airflow webserver --port 8081

Nüüd saate vajutada veebiliidest brauseris pordis 8081 hostis, kus Airflow töötas, näiteks: <hostname:8081>.

2. Eksperimentaalse API-ga töötamine

Sel hetkel on Airflow konfigureeritud ja töövalmis. Siiski peame käivitama ka eksperimentaalse API. Meie kabe on kirjutatud Pythonis, nii et edaspidi on kõik päringud selles raamatukogu kasutades requests.

Tegelikult töötab API juba lihtsate päringute puhul. Näiteks võimaldab see päring testida selle toimimist:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Kui saate vastuseks sellise teate, tähendab see, et kõik töötab.

Kui tahame aga DAG-i käivitada, seisame silmitsi tõsiasjaga, et seda tüüpi päringut ei saa teha ilma autentimiseta.

Selleks peate tegema veel mitmeid samme.

Esiteks peate selle konfiguratsiooni lisama:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Seejärel peate looma oma administraatoriõigustega kasutaja:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Järgmiseks peate looma tavaliste õigustega kasutaja, kellel on lubatud DAG käivitada.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nüüd on kõik valmis.

3. Käivitage POST-i päring

POST-i päring ise näeb välja selline:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Taotluse töötlemine õnnestus.

Sellest lähtuvalt anname DAG-le veidi aega töötlemiseks ja ClickHouse'i tabelisse päringu esitamiseks, püüdes kontrolli andmepaketti kinni püüda.

Kontroll lõpetatud.

Allikas: www.habr.com

Lisa kommentaar