How to trigger a DAG in Airflow using the Experimental API

When preparing our educational programs, we periodically run into difficulties with certain tools. And at the moment we run into them, there is not always enough documentation or enough articles to help us deal with the problem.

That was the case in 2015, for example, when we ran a Hadoop cluster with Spark for 35 simultaneous users on the Big Data Specialist program. It was not clear how to set it up for such a use case with YARN. In the end, having figured it out and walked the path on our own, we wrote a post on Habr and also gave a talk at the Moscow Spark Meetup.

Background

This time we will talk about another program: Data Engineer. In it, participants build two types of architecture: lambda and kappa. In the lambda architecture, Airflow is used as part of the batch processing layer to move logs from HDFS to ClickHouse.

On the whole, everything is fine. Let them build their pipelines. However, there is a "but": all our programs are technologically advanced in terms of the learning process itself. To check a lab, we use automatic checkers: the participant goes to their personal account, clicks the "Check" button, and after a while sees detailed feedback on what they did. And this is where we start to approach our problem.

Checking this lab works as follows: we send a data packet to the participant's Kafka, then Gobblin transfers this data packet to HDFS, then Airflow picks it up and loads it into ClickHouse. The catch is that Airflow does not have to do this in real time. It runs on a schedule: once every 15 minutes it takes a batch of files and uploads them.

It turns out that we need to trigger their DAG ourselves, on demand, while the checker is running here and now. After some googling, we found that some versions of Airflow have a so-called Experimental API. The word "experimental" sounds scary, of course, but what can you do... Maybe it will just work.

Below we describe the whole path: from installing Airflow to building a POST request that triggers a DAG through the Experimental API. We will work on Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of them is missing, install it.

Now let's create a directory in which we will continue working with Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p "$(which python3)" venv
$ source venv/bin/activate
(venv) $

Install Airflow (on PyPI the package is published as apache-airflow):

(venv) $ pip install apache-airflow

The version we used: 1.10.

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

The next step is to run the command that creates and initializes the Airflow database in SQLite:

(venv) $ airflow initdb

By default, the database is created in airflow.db.

Check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

If the command worked, Airflow has created its configuration file airflow.cfg in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be started with the command:

(venv) $ airflow webserver --port 8081

You can now open the web interface in a browser on port 8081 of the host where Airflow is running, like this: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is configured and ready to go. However, we also need to get the Experimental API working. Our checkers are written in Python, so all requests below are made with the requests library.

In fact, the API already works for simple requests. For example, this request lets you check that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # our port; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you get this message in response, everything is working.
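The same check can be wrapped in a small helper so that a checker can call it before doing anything else. This is only a sketch: the function names and the injectable `fetch` parameter are hypothetical, and it uses the standard library's `urllib` instead of requests so that it runs without extra dependencies. The `api/experimental/test` path and the expected `'OK'` body are taken from the request above.

```python
from urllib.request import urlopen


def experimental_url(host, port, *parts):
    """Build a URL under /api/experimental/ from the given path parts."""
    path = "/".join(str(part).strip("/") for part in parts)
    return "http://{}:{}/api/experimental/{}".format(host, port, path)


def api_is_alive(host, port, fetch=None, timeout=5):
    """Return True if the test endpoint answers with 'OK'.

    `fetch` is a hypothetical hook: any callable that takes a URL and
    returns the response body as a string. By default urllib is used.
    """
    if fetch is None:
        def fetch(url):
            with urlopen(url, timeout=timeout) as resp:
                return resp.read().decode("utf-8")
    try:
        return fetch(experimental_url(host, port, "test")).strip() == "OK"
    except OSError:
        # URLError and HTTPError are subclasses of OSError.
        return False
```

With the setup from this article, the call would look like `api_is_alive('<your hostname>', 8081)`.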

However, when we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

Setting that up takes a few steps.

First, add this to the config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
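A typo in the section or key name is easy to miss, so it can help to read the value back from the file. The sketch below assumes airflow.cfg can be parsed with Python's standard `configparser`; the function name `api_auth_backend` is hypothetical. Interpolation is switched off so that `%` characters elsewhere in the file are not interpreted.

```python
import configparser


def api_auth_backend(cfg_text):
    """Return the auth_backend value from the [api] section, or None."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    # fallback=None covers both a missing section and a missing key.
    return parser.get("api", "auth_backend", fallback=None)
```

To check the real file, read the contents of airflow.cfg from the AIRFLOW_HOME directory and pass the text to the function; with the setting above it should return `airflow.contrib.auth.backends.password_auth`.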

Then create your own user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, create a user with normal rights who will be allowed to trigger the DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.

3. Making the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> response = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> response.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was processed successfully.
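The fiddly part of this request is the payload: in the example above, `conf` is itself a JSON-encoded string nested inside the JSON body. A small helper can take care of the double encoding. This is a sketch that mirrors the article's payload shape; the function name `build_trigger_request` is hypothetical, and it only builds the pieces without sending anything.

```python
import json


def build_trigger_request(host, port, dag_id, conf=None):
    """Return (url, body, headers) for a dag_runs POST request.

    `conf` is a plain dict; it is serialized to a JSON string first and
    then embedded in the JSON body, as in the example above.
    """
    url = "http://{}:{}/api/experimental/dags/{}/dag_runs".format(
        host, port, dag_id
    )
    payload = {}
    if conf is not None:
        payload["conf"] = json.dumps(conf)
    body = json.dumps(payload)
    headers = {"Content-type": "application/json"}
    return url, body, headers
```

The result can then be passed on as `requests.post(url, data=body, headers=headers, auth=auth)`, with `auth` being the username and password of the user created earlier.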

After that, we give the DAG some time to run and then query the ClickHouse table, trying to catch the control data packet.
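"Giving the DAG some time" can be a fixed sleep, but polling with a deadline usually returns sooner. Below is a minimal sketch of such a loop. The name `wait_for` and its parameters are hypothetical, and the actual ClickHouse query is left to the caller as the `check` callable, since the article does not show how the checker talks to ClickHouse.

```python
import time


def wait_for(check, timeout=300.0, interval=10.0,
             sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns a truthy value or the timeout expires.

    Returns True if check() succeeded, False if the deadline passed.
    `sleep` and `clock` are injectable only to make the loop easy to test.
    """
    deadline = clock() + timeout
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

In a checker, `check` would be a function that runs the query against the participant's ClickHouse table and returns True once the control data packet is found.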

The check is complete.

Source: www.habr.com
