Ahoana ny fomba fanaovana trigger DAG amin'ny Airflow amin'ny alΓ lan'ny API Experimental

Eo am-panomanana ny fandaharam-panabeazanay dia sendra olana tsindraindray izahay eo amin'ny lafiny fiasana amin'ny fitaovana sasany. Ary amin'izao fotoana izao rehefa mihaona amin'izy ireo isika, dia tsy ampy foana ny antontan-taratasy sy lahatsoratra hanampy amin'ny fiatrehana ity olana ity.

Toy izany, ohatra, tamin'ny 2015, ary nampiasa ny kluster Hadoop miaraka amin'i Spark ho an'ny mpampiasa 35 miaraka amin'ny programa Big Data Specialist izahay. Tsy mazava ny fomba hanomanana izany amin'ny tranga mpampiasa toy izany amin'ny fampiasana YARN. Vokatr'izany, rehefa nieritreritra sy nandeha irery ny lalana izy ireo dia nanao izany lahatsoratra tao amin'ny HabrΓ© ary natao koa Moscow Spark Meetup.

prehistory

Amin'ity indray mitoraka ity dia hiresaka momba ny programa hafa isika - Data Engineer. Ao anatin'izany, ny mpandray anjara dia manorina karazana maritrano roa: lambda sy kappa. Ary amin'ny maritrano lamdba, Airflow dia ampiasaina ho ampahany amin'ny fanodinana batch mba hamindrana logs avy amin'ny HDFS mankany ClickHouse.

Amin'ny ankapobeny dia tsara ny zava-drehetra. Avelao izy ireo hanamboatra ny fantsona. Na izany aza, misy ny "saingy": ny fandaharan'asantsika rehetra dia mandroso ara-teknolojia amin'ny lafiny fizotry ny fianarana. Mba hanamarinana ny laboratoara dia mampiasa checker mandeha ho azy izahay: mila mandeha any amin'ny kaontiny manokana ny mpandray anjara, tsindrio ny bokotra "Check", ary rehefa afaka kelikely dia mahita karazana fanehoan-kevitra lava momba ny zavatra nataony izy. Ary amin'io fotoana io no manomboka manatona ny olantsika.

Ny fanaraha-maso ity laboratoara ity dia toy izao manaraka izao: mandefa fonosana angon-drakitra fanaraha-maso any amin'ny Kafka an'ny mpandray anjara izahay, avy eo i Gobblin dia mamindra ity fonosana data ity amin'ny HDFS, avy eo ny Airflow dia maka ity fonosana data ity ary mametraka izany ao amin'ny ClickHouse. Ny hafetsena dia ny Airflow dia tsy voatery manao izany amin'ny fotoana tena izy, manao izany amin'ny fandaharam-potoana: indray mandeha isaky ny 15 minitra dia maka rakitra marobe ary mampakatra azy ireo.

Hita fa mila manetsika samirery ny DAG-n'izy ireo isika noho ny fangatahantsika raha toa ka mandeha eto sy ankehitriny ny mpitsikilo. Googling, hitanay fa ho an'ny dikan-teny farany amin'ny Airflow dia misy ilay antsoina hoe API andrana. Ny teny experimental, mazava ho azy, toa mampatahotra, fa inona no hatao ... Miala tampoka.

Manaraka, holazainay ny lalana manontolo: manomboka amin'ny fametrahana Airflow ka hatramin'ny famoronana fangatahana POST izay miteraka DAG amin'ny fampiasana ny API Experimental. Hiara-hiasa amin'ny Ubuntu 16.04 izahay.

1. Fametrahana Airflow

Andeha hojerentsika fa manana Python 3 sy virtualenv isika.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Raha tsy misy ny iray amin'ireo dia apetraho izany.

Andeha isika hamorona lahatahiry izay hitohizantsika amin'ny Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Mametraka Airflow:

(venv) $ pip install airflow

Dikan-teny niasanay: 1.10.

Ankehitriny dia mila mamorona lahatahiry isika airflow_home, izay misy ny rakitra DAG sy ny plugins Airflow. Aorian'ny famoronana ny lahatahiry dia apetraho ny fari-piainan'ny tontolo iainana AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Ny dingana manaraka dia ny fampandehanana ny baiko izay hamorona sy hanombohana ny angona dataflow ao amin'ny SQLite:

(venv) $ airflow initdb

Ny angon-drakitra dia hoforonina ao airflow.db default.

Jereo raha napetraka ny Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Raha niasa ny baiko, dia namorona ny rakitra fikirakirana azy manokana ny Airflow airflow.cfg Π² AIRFLOW_HOME:

$ tree
.
β”œβ”€β”€ airflow.cfg
└── unittests.cfg

Ny Airflow dia manana interface tsara amin'ny Internet. Azo atomboka amin'ny alalan'ny fampandehanana ny baiko:

(venv) $ airflow webserver --port 8081

Azonao atao izao ny miditra amin'ny navigateur amin'ny seranan-tsambo 8081 amin'ny mpampiantrano izay nandehanan'ny Airflow, toy izao: <hostname:8081>.

2. Miasa miaraka amin'ny API Experimental

Amin'ity Airflow ity dia namboarina ary vonona ny handeha. Na izany aza, mila mampiasa ny Experimental API ihany koa isika. Voasoratra amin'ny Python ny mpitsikilo anay, noho izany dia ho eo amin'izany ny fangatahana rehetra amin'ny fampiasana ny tranomboky requests.

Raha ny marina dia efa miasa amin'ny fangatahana tsotra ny API. Ohatra, ny fangatahana toy izany dia ahafahanao manandrana ny asany:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #Π² нашСм случаС Ρ‚Π°ΠΊΠΎΠΉ, Π° ΠΏΠΎ Π΄Π΅Ρ„ΠΎΠ»Ρ‚Ρƒ 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Raha naharay hafatra toy izany ianao ho valin'izany dia midika izany fa mandeha ny zava-drehetra.

Na izany aza, rehefa te-hamoaka DAG isika dia miatrika ny zava-misy fa tsy azo atao ny fangatahana toy izany raha tsy misy ny fanamarinana.

Mba hanaovana izany dia mila manao hetsika maromaro ianao.

Voalohany, mila ampidirinao amin'ny config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Avy eo, mila mamorona mpampiasa anao ianao miaraka amin'ny zon'ny admin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Manaraka, mila mamorona mpampiasa manana zo mahazatra ianao izay avela hanao trigger DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Ankehitriny dia vonona ny zava-drehetra.

3. Famoahana fangatahana POST

Ny fangatahana POST dia ho toy izao:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Vita soa aman-tsara ny fangatahana.

Araka izany, dia omenay fotoana kely ny DAG mba hikarakarana sy hanao fangatahana amin'ny latabatra ClickHouse, manandrana misambotra ny fonosana data fanaraha-maso.

Vita ny fanamarinana.

Source: www.habr.com

Add a comment