เบงเบดเบ—เบตเบเบฒเบ™เบชเป‰เบฒเบ‡เบœเบปเบ™เบเบฐเบ—เบปเบšเบ•เปเปˆ DAG เปƒเบ™ Airflow เป‚เบ”เบเปƒเบŠเป‰ API เบ—เบปเบ”เบฅเบญเบ‡

เปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบเบฒเบ™เบเบฐเบเบฝเบกเป‚เบ„เบ‡เบเบฒเบ™เบเบฒเบ™เบชเบถเบเบชเบฒเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เบžเบงเบเป€เบฎเบปเบฒเปเบ•เปˆเบฅเบฐเป„เบฅเบเบฐเบžเบปเบšเบ„เบงเบฒเบกเบซเบเบธเป‰เบ‡เบเบฒเบเปƒเบ™เบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบเบฑเบšเป€เบ„เบทเปˆเบญเบ‡เบกเบทเบชเบฐเป€เบžเบฒเบฐเปƒเบ”เบซเบ™เบถเปˆเบ‡. เปเบฅเบฐเปƒเบ™เป€เบงเบฅเบฒเบ™เบตเป‰เปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบžเบปเบšเบžเบงเบเป€เบ‚เบปเบฒ, เบšเปเปˆเบกเบตเป€เบญเบเบฐเบชเบฒเบ™เปเบฅเบฐเบšเบปเบ”เบ„เบงเบฒเบกเบžเบฝเบ‡เบžเปเบ—เบตเปˆเบˆเบฐเบŠเปˆเบงเบเปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเบฎเบฑเบšเบกเบทเบเบฑเบšเบšเบฑเบ™เบซเบฒเบ™เบตเป‰.

เบ™เบตเป‰เปเบกเปˆเบ™เบเปเบฅเบฐเบ™เบต, เบ•เบปเบงเบขเปˆเบฒเบ‡, เปƒเบ™เบ›เบต 2015, เปเบฅเบฐเปƒเบ™เบฅเบฐเบซเบงเปˆเบฒเบ‡เป‚เบ„เบ‡เบเบฒเบ™ "Big Data Specialist" เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เปƒเบŠเป‰เบเบธเปˆเบก Hadoop เบเบฑเบš Spark เบชเปเบฒเบฅเบฑเบš 35 เบœเบนเป‰เปƒเบŠเป‰เบžเป‰เบญเบกเป†เบเบฑเบ™. เบกเบฑเบ™เบšเปเปˆเบˆเบฐเปเบˆเป‰เบ‡เบเปˆเบฝเบงเบเบฑเบšเบงเบดเบ—เบตเบเบฒเบ™เบเบฐเบเบฝเบกเบกเบฑเบ™เบชเปเบฒเบฅเบฑเบšเบเปเบฅเบฐเบ™เบตเบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเป‚เบ”เบเปƒเบŠเป‰ YARN. เปƒเบ™โ€‹เบ—เบตเปˆโ€‹เบชเบธเบ”, เป„เบ”เป‰โ€‹เบ„เบดเบ”โ€‹เบญเบญเบโ€‹เบกเบฑเบ™โ€‹เบญเบญเบโ€‹เปเบฅเบฐโ€‹เป€เบ”เบตเบ™โ€‹เบ—เบฒเบ‡โ€‹เบ‚เบญเบ‡โ€‹เบ•เบปเบ™โ€‹เป€เบญเบ‡, เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เป„เบ”เป‰โ€‹เป€เบฎเบฑเบ” เป‚เบžเบ”เปƒเบ™ Habre เปเบฅเบฐเบเบฑเบ‡เบ›เบฐเบ•เบดเบšเบฑเบ”เบขเบนเปˆ เบกเบญเบ”เป‚เบ Spark Meetup.

เบ›เบฐเบงเบฑเบ”เบชเบฒเบ”

เป€เบงเบฅเบฒเบ™เบตเป‰เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบงเบปเป‰เบฒเบเปˆเบฝเบงเบเบฑเบšเป‚เบ„เบ‡เบเบฒเบ™เบ—เบตเปˆเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™ - เบ™เบฑเบเบงเบดเบชเบฐเบงเบฐเบเบญเบ™เบ‚เปเป‰เบกเบนเบ™. เบœเบนเป‰เป€เบ‚เบปเป‰เบฒเบฎเปˆเบงเบกเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบเปเปˆเบชเป‰เบฒเบ‡เบชเบฐเบ–เบฒเบ›เบฑเบ”เบ•เบฐเบเบฐเบเปเบฒเบชเบญเบ‡เบ›เบฐเป€เบžเบ”เบขเบนเปˆเป€เบ—เบดเบ‡เบกเบฑเบ™: lambda เปเบฅเบฐ kappa. เปเบฅเบฐเปƒเบ™เบชเบฐเบ–เบฒเบ›เบฑเบ”เบ•เบฐเบเบฐเบเปเบฒ lamdba, เป€เบ›เบฑเบ™เบชเปˆเบงเบ™เบซเบ™เบถเปˆเบ‡เบ‚เบญเบ‡เบเบฒเบ™เบ›เบธเบ‡เปเบ•เปˆเบ‡ batch, Airflow เบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰เป€เบžเบทเปˆเบญเป‚เบญเบ™เบšเบฑเบ™เบ—เบถเบเบˆเบฒเบ HDFS เป„เบ› ClickHouse.

เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เป‚เบ”เบเบ—เบปเปˆเบงเป„เบ›เปเบกเปˆเบ™เบ”เบต. เปƒเบซเป‰เบžเบงเบเป€เบ‚เบปเบฒเบชเป‰เบฒเบ‡เบ—เปเปˆเบ‚เบญเบ‡เบ•เบปเบ™เป€เบญเบ‡. เบขเปˆเบฒเบ‡เปƒเบ”เบเปเปˆเบ•เบฒเบก, เบกเบต "เปเบ•เปˆ": เป‚เบ„เบ‡เบเบฒเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบ—เบฑเบ‡เบซเบกเบปเบ”เปเบกเปˆเบ™เบเป‰เบฒเบงเบซเบ™เป‰เบฒเบ—เบฒเบ‡เบ”เป‰เบฒเบ™เป€เบ•เบฑเบเป‚เบ™เป‚เบฅเบขเบตเบˆเบฒเบเบ—เบฑเบ”เบชเบฐเบ™เบฐเบ‚เบญเบ‡เบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบฎเบฝเบ™เบฎเบนเป‰เบ•เบปเบงเบกเบฑเบ™เป€เบญเบ‡. เป€เบžเบทเปˆเบญเบเบงเบ”เป€เบšเบดเปˆเบ‡เบซเป‰เบญเบ‡เบ—เบปเบ”เบฅเบญเบ‡, เบžเบงเบเป€เบฎเบปเบฒเปƒเบŠเป‰เป€เบ„เบทเปˆเบญเบ‡เบเบงเบ”เบญเบฑเบ”เบ•เบฐเป‚เบ™เบกเบฑเบ”: เบœเบนเป‰เป€เบ‚เบปเป‰เบฒเบฎเปˆเบงเบกเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เป„เบ”เป‰เป„เบ›เบซเบฒเบšเบฑเบ™เบŠเบตเบชเปˆเบงเบ™เบ•เบปเบงเบ‚เบญเบ‡เบฅเบฒเบง, เบเบปเบ”เบ›เบธเปˆเบก "เบเบงเบ”เป€เบšเบดเปˆเบ‡", เปเบฅเบฐเบซเบผเบฑเบ‡เบˆเบฒเบเป€เบงเบฅเบฒเปƒเบ”เบซเบ™เบถเปˆเบ‡เบฅเบฒเบงเป€เบซเบฑเบ™เบšเบฒเบ‡เบ„เปเบฒเบ•เบดเบŠเบปเบกเป€เบžเบตเปˆเบกเป€เบ•เบตเบกเบเปˆเบฝเบงเบเบฑเบšเบชเบดเปˆเบ‡เบ—เบตเปˆเบฅเบฒเบงเป„เบ”เป‰เป€เบฎเบฑเบ”. เปเบฅเบฐเบกเบฑเบ™เปเบกเปˆเบ™เปƒเบ™เป€เบงเบฅเบฒเบ™เบตเป‰เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเป€เบฅเบตเปˆเบกเป€เบ‚เบปเป‰เบฒเบซเบฒเบšเบฑเบ™เบซเบฒเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ.

เบเบฒเบ™เบขเบฑเป‰เบ‡เบขเบทเบ™เบ‚เบญเบ‡เบซเป‰เบญเบ‡เบ—เบปเบ”เบฅเบญเบ‡เบ™เบตเป‰เปเบกเปˆเบ™เบกเบตเป‚เบ„เบ‡เบชเป‰เบฒเบ‡เป€เบŠเบฑเปˆเบ™เบ™เบตเป‰: เบžเบงเบเป€เบฎเบปเบฒเบชเบปเปˆเบ‡เบŠเบธเบ”เบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ„เบงเบšเบ„เบธเบกเป„เบ›เบซเบฒ Kafka เบ‚เบญเบ‡เบœเบนเป‰เป€เบ‚เบปเป‰เบฒเบฎเปˆเบงเบก, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™ Gobblin เป‚เบญเบ™เบŠเบธเบ”เบ‚เปเป‰เบกเบนเบ™เบ™เบตเป‰เป„เบ›เบซเบฒ HDFS, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™ Airflow เป€เบญเบปเบฒเบŠเบธเบ”เบ‚เปเป‰เบกเบนเบ™เบ™เบตเป‰เปเบฅเบฐเบงเบฒเบ‡เป„เบงเป‰เปƒเบ™ ClickHouse. เป€เบ„เบฑเบ”เบฅเบฑเบšเปเบกเปˆเบ™เบงเปˆเบฒ Airflow เบšเปเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เป€เบฎเบฑเบ”เปเบšเบšเบ™เบตเป‰เปƒเบ™เป€เบงเบฅเบฒเบˆเบดเบ‡, เบกเบฑเบ™เป€เบฎเบฑเบ”เบ•เบฒเบกเบ•เบฒเบ•เบฐเบฅเบฒเบ‡: เบ—เบธเบเป† 15 เบ™เบฒเบ—เบตเบกเบฑเบ™เปƒเบŠเป‰เป€เบงเบฅเบฒเบซเบผเบฒเบเป„เบŸเบฅเปŒเปเบฅเบฐเบญเบฑเบšเป‚เบซเบฅเบ”เบžเบงเบเบกเบฑเบ™.

เบกเบฑเบ™เบ›เบฐเบเบปเบ”เบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เป„เบ”เป‰เบเบฐเบ•เบธเป‰เบ™ DAG เบ‚เบญเบ‡เป€เบ‚เบปเบฒเป€เบˆเบปเป‰เบฒเป€เบญเบ‡เบ•เบฒเบกเบ„เปเบฒเบฎเป‰เบญเบ‡เบ‚เปเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเปƒเบ™เบ‚เบฐเบ™เบฐเบ—เบตเปˆเบ•เบปเบงเบเบงเบ”เบชเบญเบšเบเปเบฒเบฅเบฑเบ‡เปเบฅเปˆเบ™เบขเบนเปˆเบ—เบตเปˆเบ™เบตเป‰เปเบฅเบฐเปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™. เบซเบผเบฑเบ‡เบˆเบฒเบ googling, เบžเบงเบเป€เบฎเบปเบฒเบžเบปเบšเป€เบซเบฑเบ™เบงเปˆเบฒเบชเปเบฒเบฅเบฑเบšเบชเบฐเบšเบฑเบšเบ•เปเปˆเบกเบฒเบ‚เบญเบ‡ Airflow เบกเบตเบญเบฑเบ™เบ—เบตเปˆเป€เบญเบตเป‰เบ™เบงเปˆเบฒ API เบ—เบปเบ”เบฅเบญเบ‡เบ—เบตเปˆเบขเบนเปˆ เบ„เปเบฒเบงเปˆเบฒ experimental, เปเบ™เปˆเบ™เบญเบ™, เบกเบฑเบ™เบŸเบฑเบ‡เปเบฅเป‰เบงเป€เบ›เบฑเบ™เบ•เบฒเบขเป‰เบฒเบ™, เปเบ•เปˆเบˆเบฐเป€เบฎเบฑเบ”เปเบ™เบงเปƒเบ”... เบ—เบฑเบ™เปƒเบ”เบ™เบฑเป‰เบ™เบกเบฑเบ™เบเปเบญเบญเบเป„เบ›.

เบ•เปเปˆเป„เบ›, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบญเบฐเบ—เบดเบšเบฒเบเป€เบชเบฑเป‰เบ™เบ—เบฒเบ‡เบ—เบฑเบ‡เบซเบกเบปเบ”: เบˆเบฒเบเบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡ Airflow เป€เบžเบทเปˆเบญเบชเป‰เบฒเบ‡เบ„เปเบฒเบฎเป‰เบญเบ‡เบ‚เป POST เบ—เบตเปˆเบเบฐเบ•เบธเป‰เบ™ DAG เป‚เบ”เบเปƒเบŠเป‰ API เบ—เบปเบ”เบฅเบญเบ‡. เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบฎเบฑเบ”เบงเบฝเบเบเบฑเบš Ubuntu 16.04.

1. เบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡เบเบฐเปเบชเบฅเบปเบก

เปƒเบซเป‰เบเบงเบ”เป€เบšเบดเปˆเบ‡เบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเบกเบต Python 3 เปเบฅเบฐ virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

เบ–เป‰เบฒเบญเบฑเบ™เปƒเบ”เบญเบฑเบ™เปœเบถเปˆเบ‡เบ‚เบฒเบ”เบซเบฒเบเป„เบ›, เบˆเบฒเบเบ™เบฑเป‰เบ™เปƒเบซเป‰เบ•เบดเบ”เบ•เบฑเป‰เบ‡เบกเบฑเบ™.

เบ•เบญเบ™เบ™เบตเป‰เปƒเบซเป‰เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เป„เบ”เป€เบฅเบเบฐเบ—เปเบฅเบตเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบชเบทเบšเบ•เปเปˆเป€เบฎเบฑเบ”เบงเบฝเบเบเบฑเบš Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

เบ•เบดเบ”เบ•เบฑเป‰เบ‡ Airflow:

(venv) $ pip install airflow

เบชเบฐเบšเบฑเบšเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเป€เบฎเบฑเบ”เบงเบฝเบเบเปˆเบฝเบงเบเบฑเบš: 1.10.

เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบžเบงเบเป€เบฎเบปเบฒเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบชเป‰เบฒเบ‡เป„เบ”เป€เบฅเบเบฐเบ—เปเบฅเบต airflow_home, เบšเปˆเบญเบ™เบ—เบตเปˆเป„เบŸเบฅเปŒ DAG เปเบฅเบฐเบ›เบฅเบฑเบเบญเบดเบ™ Airflow เบˆเบฐเบ•เบฑเป‰เบ‡เบขเบนเปˆ. เบซเบผเบฑเบ‡เบˆเบฒเบเบเบฒเบ™เบชเป‰เบฒเบ‡เป„เบ”เป€เบฅเบเบฐเบ—เปเบฅเบต, เบเปเบฒเบ™เบปเบ”เบ•เบปเบงเปเบ›เบชเบฐเบžเบฒเบšเปเบงเบ”เบฅเป‰เบญเบก AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

เบ‚เบฑเป‰เบ™เบ•เบญเบ™เบ•เปเปˆเป„เบ›เปเบกเปˆเบ™เป€เบžเบทเปˆเบญเบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™เบ„เปเบฒเบชเบฑเปˆเบ‡เบ—เบตเปˆเบˆเบฐเบชเป‰เบฒเบ‡เปเบฅเบฐเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ dataflow เปƒเบ™ SQLite:

(venv) $ airflow initdb

เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเบฐเบ–เบทเบเบชเป‰เบฒเบ‡เบ‚เบทเป‰เบ™เปƒเบ™ airflow.db เบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™.

เปƒเบซเป‰เบเบงเบ”เป€เบšเบดเปˆเบ‡เบงเปˆเบฒ Airflow เบ–เบทเบเบ•เบดเบ”เบ•เบฑเป‰เบ‡:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

เบ–เป‰เบฒเบ„เปเบฒเบชเบฑเปˆเบ‡เป€เบฎเบฑเบ”เบงเบฝเบ, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™ Airflow เบชเป‰เบฒเบ‡เป„เบŸเบฅเปŒเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ‚เบญเบ‡เบ•เบปเบ™เป€เบญเบ‡ airflow.cfg ะฒ AIRFLOW_HOME:

$ tree
.
โ”œโ”€โ”€ airflow.cfg
โ””โ”€โ”€ unittests.cfg

Airflow เบกเบตเบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเป€เบงเบฑเบšเป„เบŠเบ•เปŒ. เบกเบฑเบ™เบชเบฒเบกเบฒเบ”เบ–เบทเบเป€เบ›เบตเบ”เบ•เบปเบงเป‚เบ”เบเบเบฒเบ™เปเบฅเปˆเบ™เบ„เปเบฒเบชเบฑเปˆเบ‡:

(venv) $ airflow webserver --port 8081

เบ•เบญเบ™เบ™เบตเป‰เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ•เบตเบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเป€เบงเบฑเบšเปƒเบ™เบšเบฃเบฒเบงเป€เบŠเบตเป€เบ—เบดเบ‡เบžเบญเบ” 8081 เป€เบ—เบดเบ‡เป‚เบฎเบชเบ—เบตเปˆ Airflow เปเบฅเปˆเบ™เบขเบนเปˆ, เบ•เบปเบงเบขเปˆเบฒเบ‡: <hostname:8081>.

2. เป€เบฎเบฑเบ”เบงเบฝเบเบเบฑเบš API เบ—เบปเบ”เบฅเบญเบ‡

เปƒเบ™เบˆเบธเบ”เบ™เบตเป‰, Airflow เป„เบ”เป‰เบ–เบทเบเบ•เบฑเป‰เบ‡เบ„เปˆเบฒเปเบฅเบฐเบžเป‰เบญเบกเบ—เบตเปˆเบˆเบฐเป„เบ›. เบขเปˆเบฒเบ‡เปƒเบ”เบเปเบ•เบฒเบก, เบžเบงเบเป€เบฎเบปเบฒเบเบฑเบ‡เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™ API เบ—เบปเบ”เบฅเบญเบ‡. checkers เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบ–เบทเบเบ‚เบฝเบ™เป„เบงเป‰เปƒเบ™ Python, เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเบ—เบฑเบ‡เบซเบกเบปเบ”เบˆเบฐเบขเบนเปˆเปƒเบ™เบกเบฑเบ™เป‚เบ”เบเปƒเบŠเป‰เบซเป‰เบญเบ‡เบชเบฐเบซเบกเบธเบ” requests.

เปƒเบ™เบ„เบงเบฒเบกเป€เบ›เบฑเบ™เบˆเบดเบ‡, API เปเบฅเป‰เบงเป€เบฎเบฑเบ”เบงเบฝเบเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเบ‡เปˆเบฒเบเบ”เบฒเบ. เบ•เบปเบงเบขเปˆเบฒเบ‡, เบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเบ™เบตเป‰เบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ—เบปเบ”เบชเบญเบšเบเบฒเบ™เบ”เปเบฒเป€เบ™เบตเบ™เบ‡เบฒเบ™เบ‚เบญเบ‡เบกเบฑเบ™:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #ะฒ ะฝะฐัˆะตะผ ัะปัƒั‡ะฐะต ั‚ะฐะบะพะน, ะฐ ะฟะพ ะดะตั„ะพะปั‚ัƒ 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

เบ–เป‰เบฒเบ—เปˆเบฒเบ™เป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบ„เบงเบฒเบกเบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเปƒเบ™เบเบฒเบ™เบ•เบญเบšเบชเบฐเบซเบ™เบญเบ‡, เบกเบฑเบ™เบซเบกเบฒเบเบ„เบงเบฒเบกเบงเปˆเบฒเบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เป€เบฎเบฑเบ”เบงเบฝเบ.

เบขเปˆเบฒเบ‡เปƒเบ”เบเปเบ•เบฒเบก, เป€เบกเบทเปˆเบญเบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™เบเบฐเบ•เบธเป‰เบ™ DAG, เบžเบงเบเป€เบฎเบปเบฒเบ›เบฐเป€เบŠเบตเบ™เบเบฑเบšเบ„เบงเบฒเบกเบˆเบดเบ‡เบ—เบตเปˆเบงเปˆเบฒเบ„เปเบฒเบฎเป‰เบญเบ‡เบ‚เปเบ›เบฐเป€เบžเบ”เบ™เบตเป‰เบšเปเปˆเบชเบฒเบกเบฒเบ”เป€เบฎเบฑเบ”เป„เบ”เป‰เป‚เบ”เบเบšเปเปˆเบกเบตเบเบฒเบ™เบเบงเบ”เบชเบญเบšเบ„เบงเบฒเบกเบ–เบทเบเบ•เป‰เบญเบ‡.

เป€เบžเบทเปˆเบญเป€เบฎเบฑเบ”เบชเบดเปˆเบ‡เบ™เบตเป‰, เบ—เปˆเบฒเบ™เบˆเบฐเบ•เป‰เบญเบ‡เป€เบฎเบฑเบ”เบซเบผเบฒเบเบ‚เบฑเป‰เบ™เบ•เบญเบ™.

เบเปˆเบญเบ™เบญเบทเปˆเบ™ เปเบปเบ”, เบ—เปˆเบฒเบ™ เบˆเบณ เป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เป€เบžเบตเปˆเบกเบกเบฑเบ™เป€เบ‚เบปเป‰เบฒเปƒเบ™เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒ:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™, เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบชเป‰เบฒเบ‡เบœเบนเป‰เปƒเบŠเป‰เบ‚เบญเบ‡เบ—เปˆเบฒเบ™เบ”เป‰เบงเบเบชเบดเบ”เบ—เบด admin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

เบ•เปเปˆเป„เบ›, เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบชเป‰เบฒเบ‡เบœเบนเป‰เปƒเบŠเป‰เบ—เบตเปˆเบกเบตเบชเบดเบ”เบ—เบดเบ›เบปเบเบเบฐเบ•เบดเบœเบนเป‰เบ—เบตเปˆเบˆเบฐเป„เบ”เป‰เบฎเบฑเบšเบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เบเบฐเบ•เบธเป‰เบ™ DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เปเบกเปˆเบ™เบเบฝเบกเบžเป‰เบญเบก.

3. เป€เบ›เบตเบ”เบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เป POST

เบ„เปเบฒเบฎเป‰เบญเบ‡เบ‚เป POST เบ•เบปเบงเบ‚เบญเบ‡เบกเบฑเบ™เป€เบญเบ‡เบˆเบฐเบกเบตเบฅเบฑเบเบชเบฐเบ™เบฐเบ™เบตเป‰:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

เบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเป„เบ”เป‰เบ–เบทเบเบ”เปเบฒเป€เบ™เบตเบ™เบขเปˆเบฒเบ‡เบชเปเบฒเป€เบฅเบฑเบ”เบœเบปเบ™.

เบ•เบฒเบกเบ™เบฑเป‰เบ™เปเบฅเป‰เบง, เบžเบงเบเป€เบฎเบปเบฒเปƒเบซเป‰เป€เบงเบฅเบฒ DAG เป€เบžเบทเปˆเบญเบ›เบฐเบกเบงเบ™เบœเบปเบ™เปเบฅเบฐเป€เบฎเบฑเบ”เบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเปƒเบซเป‰เบ•เบฒเบ•เบฐเบฅเบฒเบ‡ ClickHouse, เบžเบฐเบเบฒเบเบฒเบกเบˆเบฑเบšเบŠเบธเบ”เบ‚เปเป‰เบกเบนเบ™เบ„เบงเบšเบ„เบธเบก.

เบเบงเบ”เบชเบญเบšเปเบฅเป‰เบง.

เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™: www.habr.com

เป€เบžเบตเปˆเบกเบ„เบงเบฒเบกเบ„เบดเบ”เป€เบซเบฑเบ™