Как ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ Ρ‚Ρ€ΠΈΠ³Π³Π΅Ρ€ DAG’Π° Π² Airflow, ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Experimental API

ΠŸΡ€ΠΈ ΠΏΠΎΠ΄Π³ΠΎΡ‚ΠΎΠ²ΠΊΠ΅ Π½Π°ΡˆΠΈΡ… ΠΎΠ±Ρ€Π°Π·ΠΎΠ²Π°Ρ‚Π΅Π»ΡŒΠ½Ρ‹Ρ… ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠΌ ΠΌΡ‹ пСриодичСски сталкиваСмся со слоТностями с Ρ‚ΠΎΡ‡ΠΊΠΈ зрСния Ρ€Π°Π±ΠΎΡ‚Ρ‹ с Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΌΠΈ инструмСнтами. И Π½Π° Ρ‚ΠΎΡ‚ ΠΌΠΎΠΌΠ΅Π½Ρ‚, ΠΊΠΎΠ³Π΄Π° ΠΌΡ‹ с Π½ΠΈΠΌΠΈ сталикваСмся, Π½Π΅ всСгда Π΅ΡΡ‚ΡŒ достаточно Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚Π°Ρ†ΠΈΠΈ ΠΈ статСй, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΏΠΎΠΌΠΎΠ³Π»ΠΈ Π±Ρ‹ с этой ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΠΎΠΉ ΡΠΏΡ€Π°Π²ΠΈΡ‚ΡŒΡΡ.

Π’Π°ΠΊ Π±Ρ‹Π»ΠΎ, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€, Π² 2015 Π³ΠΎΠ΄Ρƒ ΠΈ ΠΌΡ‹ Π½Π° ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠΌΠ΅ β€œΠ‘ΠΏΠ΅Ρ†ΠΈΠ°Π»ΠΈΡΡ‚ ΠΏΠΎ большим данным” пользовались Hadoop-кластСром со Spark Π½Π° 35 ΠΎΠ΄Π½ΠΎΠ²Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Ρ… ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»Π΅ΠΉ. Как Π΅Π³ΠΎ Π³ΠΎΡ‚ΠΎΠ²ΠΈΡ‚ΡŒ ΠΏΠΎΠ΄ Ρ‚Π°ΠΊΠΎΠΉ юзкСйс с использованиСм YARN, Π±Ρ‹Π»ΠΎ нСпонятно. Π’ ΠΈΡ‚ΠΎΠ³Π΅, Ρ€Π°Π·ΠΎΠ±Ρ€Π°Π²ΡˆΠΈΡΡŒ ΠΈ пройдя ΠΏΡƒΡ‚ΡŒ ΡΠ°ΠΌΠΎΡΡ‚ΠΎΡΡ‚Π΅Π»ΡŒΠ½ΠΎ, сдСлали пост Π½Π° Π₯Π°Π±Ρ€Π΅ ΠΈ Π΅Ρ‰Π΅ выступили Π½Π° Moscow Spark Meetup.

ΠŸΡ€Π΅Π΄Ρ‹ΡΡ‚ΠΎΡ€ΠΈΡ

Π’ этот Ρ€Π°Π· Ρ€Π΅Ρ‡ΡŒ ΠΏΠΎΠΉΠ΄Π΅Ρ‚ ΠΎ Π΄Ρ€ΡƒΠ³ΠΎΠΉ ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠΌΠ΅ – Data Engineer. На Π½Π΅ΠΉ наши участники строят Π΄Π²Π° Ρ‚ΠΈΠΏΠ° Π°Ρ€Ρ…ΠΈΡ‚Π΅ΠΊΡ‚ΡƒΡ€Ρ‹: lambda ΠΈ kappa. И Π² lamdba-Π°Ρ€Ρ…ΠΈΡ‚Π΅ΠΊΡ‚ΡƒΡ€Π΅ Π² Ρ€Π°ΠΌΠΊΠ°Ρ… Π±Π°Ρ‚Ρ‡-ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ Airflow для пСрСкладывания Π»ΠΎΠ³ΠΎΠ² ΠΈΠ· HDFS Π² ClickHouse.

ВсС Π² ΠΎΠ±Ρ‰Π΅ΠΌ-Ρ‚ΠΎ Ρ…ΠΎΡ€ΠΎΡˆΠΎ. ΠŸΡƒΡΡ‚ΡŒ строят свои ΠΏΠ°ΠΉΠΏΠ»Π°ΠΉΠ½Ρ‹. Однако, Π΅ΡΡ‚ΡŒ «Π½ΠΎ»: всС наши ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠΌΡ‹ Ρ‚Π΅Ρ…Π½ΠΎΠ»ΠΎΠ³ΠΈΡ‡Π½Ρ‹ с Ρ‚ΠΎΡ‡ΠΊΠΈ зрСния самого процСсса обучСния. Для ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΊΠΈ Π»Π°Π± ΠΌΡ‹ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ автоматичСскиС Ρ‡Π΅ΠΊΠ΅Ρ€Ρ‹: участнику Π½ΡƒΠΆΠ½ΠΎ Π·Π°ΠΉΡ‚ΠΈ Π² Π»ΠΈΡ‡Π½Ρ‹ΠΉ ΠΊΠ°Π±ΠΈΠ½Π΅Ρ‚, Π½Π°ΠΆΠ°Ρ‚ΡŒ ΠΊΠ½ΠΎΠΏΠΊΡƒ β€œΠŸΡ€ΠΎΠ²Π΅Ρ€ΠΈΡ‚ΡŒβ€, ΠΈ Ρ‡Π΅Ρ€Π΅Π· ΠΊΠ°ΠΊΠΎΠ΅-Ρ‚ΠΎ врСмя ΠΎΠ½ Π²ΠΈΠ΄ΠΈΡ‚ ΠΊΠ°ΠΊΡƒΡŽ-Ρ‚ΠΎ Ρ€Π°ΡΡˆΠΈΡ€Π΅Π½Π½ΡƒΡŽ ΠΎΠ±Ρ€Π°Ρ‚Π½ΡƒΡŽ связь Π½Π° Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ сдСлал. И ΠΈΠΌΠ΅Π½Π½ΠΎ Π² этот ΠΌΠΎΠΌΠ΅Π½Ρ‚ ΠΌΡ‹ Π½Π°Ρ‡ΠΈΠ½Π°Π΅ΠΌ ΠΏΠΎΠ΄Ρ…ΠΎΠ΄ΠΈΡ‚ΡŒ ΠΊ нашСй ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΠ΅.

ΠŸΡ€ΠΎΠ²Π΅Ρ€ΠΊΠ° этой Π»Π°Π±Ρ‹ устроСна Ρ‚Π°ΠΊ: ΠΌΡ‹ посылаСм ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»ΡŒΠ½Ρ‹ΠΉ ΠΏΠ°ΠΊΠ΅Ρ‚ Π΄Π°Π½Π½Ρ‹Ρ… Π² Kafka участника, Π΄Π°Π»Π΅Π΅ Gobblin ΠΏΠ΅Ρ€Π΅ΠΊΠ»Π°Π΄Ρ‹Π²Π°Π΅Ρ‚ этот ΠΏΠ°ΠΊΠ΅Ρ‚ Π΄Π°Π½Π½Ρ‹Ρ… Π½Π° HDFS, Π΄Π°Π»Π΅Π΅ Airflow Π±Π΅Ρ€Π΅Ρ‚ этот ΠΏΠ°ΠΊΠ΅Ρ‚ Π΄Π°Π½Π½Ρ‹Ρ… ΠΈ ΠΊΠ»Π°Π΄Π΅Ρ‚ Π² ClickHouse. Ѐишка Π² Ρ‚ΠΎΠΌ, Ρ‡Ρ‚ΠΎ Airflow Π½Π΅ Π΄ΠΎΠ»ΠΆΠ΅Π½ это Π΄Π΅Π»Π°Ρ‚ΡŒ Π² Ρ€Π΅Π°Π»-Ρ‚Π°ΠΉΠΌΠ΅, ΠΎΠ½ это Π΄Π΅Π»Π°Π΅Ρ‚ ΠΏΠΎ Ρ€Π°ΡΠΏΠΈΡΠ°Π½ΠΈΡŽ: Ρ€Π°Π· Π² 15 ΠΌΠΈΠ½ΡƒΡ‚ Π±Π΅Ρ€Π΅Ρ‚ ΠΏΠ°Ρ‡ΠΊΡƒ Ρ„Π°ΠΉΠ»ΠΎΠ² ΠΈ Π·Π°ΠΊΠΈΠ΄Ρ‹Π²Π°Π΅Ρ‚.

ΠŸΠΎΠ»ΡƒΡ‡Π°Π΅Ρ‚ΡΡ, Ρ‡Ρ‚ΠΎ Π½Π°ΠΌ Π½ΡƒΠΆΠ½ΠΎ ΠΊΠ°ΠΊ-Ρ‚ΠΎ Ρ‚Ρ€ΠΈΠ³Π³Π΅Ρ€ΠΈΡ‚ΡŒ ΠΈΡ… DAG ΡΠ°ΠΌΠΎΡΡ‚ΠΎΡΡ‚Π΅Π»ΡŒΠ½ΠΎ ΠΏΠΎ Π½Π°ΡˆΠ΅ΠΌΡƒ Ρ‚Ρ€Π΅Π±ΠΎΠ²Π°Π½ΠΈΡŽ Π²ΠΎ врСмя Ρ€Π°Π±ΠΎΡ‚Ρ‹ Ρ‡Π΅ΠΊΠ΅Ρ€Π° здСсь ΠΈ сСйчас. ΠŸΠΎΠ³ΡƒΠ³Π»ΠΈΠ², выяснили, Ρ‡Ρ‚ΠΎ для ΠΏΠΎΠ·Π΄Π½ΠΈΡ… вСрсий Airflow сущСствуСт Ρ‚Π°ΠΊ Π½Π°Π·Ρ‹Π²Π°Π΅ΠΌΡ‹ΠΉ Experimental API. Π‘Π»ΠΎΠ²ΠΎ experimental, ΠΊΠΎΠ½Π΅Ρ‡Π½ΠΎ, Π·Π²ΡƒΡ‡ΠΈΡ‚ ΠΏΡƒΠ³Π°ΡŽΡ‰Π΅, Π½ΠΎ Ρ‡Ρ‚ΠΎ Π΄Π΅Π»Π°Ρ‚ΡŒβ€¦ Π’Π΄Ρ€ΡƒΠ³ Π²Π·Π»Π΅Ρ‚ΠΈΡ‚.

Π”Π°Π»Π΅Π΅ опишСм вСсь ΠΏΡƒΡ‚ΡŒ: ΠΎΡ‚ установки Airflow Π΄ΠΎ формирования POST-запроса, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Ρ‚Ρ€ΠΈΠ³Π³Π΅Ρ€ΠΈΡ‚ DAG, ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Experimental API. Π Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π±ΡƒΠ΄Π΅ΠΌ с Ubuntu 16.04.

1. Установка Airflow

ΠŸΡ€ΠΎΠ²Π΅Ρ€ΠΈΠΌ, Ρ‡Ρ‚ΠΎ Ρƒ нас стоит Python 3 ΠΈ virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Если Ρ‡Π΅Π³ΠΎ-Ρ‚ΠΎ ΠΈΠ· этого Π½Π΅Ρ‚, Ρ‚ΠΎ установитС.

Π’Π΅ΠΏΠ΅Ρ€ΡŒ создадим ΠΊΠ°Ρ‚Π°Π»ΠΎΠ³, Π² ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ Π±ΡƒΠ΄Π΅ΠΌ дальшС Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ с Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Установим Airflow:

(venv) $ pip install airflow

ВСрсия, Π½Π° ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΉ Ρ€Π°Π±ΠΎΡ‚Π°Π»ΠΈ ΠΌΡ‹: 1.10.

Π’Π΅ΠΏΠ΅Ρ€ΡŒ Π½Π°ΠΌ Π½ΡƒΠΆΠ½ΠΎ ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ ΠΊΠ°Ρ‚Π°Π»ΠΎΠ³ airflow_home, Π³Π΄Π΅ Π±ΡƒΠ΄ΡƒΡ‚ Ρ€Π°ΡΠΏΠΎΠ»Π°Π³Π°Ρ‚ΡŒΡΡ DAG-Ρ„Π°ΠΉΠ»Ρ‹ ΠΈ ΠΏΠ»Π°Π³ΠΈΠ½Ρ‹ Airflow. ПослС создания ΠΊΠ°Ρ‚Π°Π»ΠΎΠ³Π° установим ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΡƒΡŽ срСды AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Π‘Π»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠΉ шаг β€” Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΠΌ ΠΊΠΎΠΌΠ°Π½Π΄Ρƒ, которая Π±ΡƒΠ΄Π΅Ρ‚ ΡΠΎΠ·Π΄Π°Π²Π°Ρ‚ΡŒ ΠΈ ΠΈΠ½ΠΈΡ†ΠΈΠ°Π»ΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ Π±Π°Π·Ρƒ Π΄Π°Π½Π½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠ° Π΄Π°Π½Π½Ρ‹Ρ… Π² SQLite:

(venv) $ airflow initdb

Π‘Π°Π·Π° Π΄Π°Π½Π½Ρ‹Ρ… Π±ΡƒΠ΄Π΅Ρ‚ создана Π² airflow.db ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ.

ΠŸΡ€ΠΎΠ²Π΅Ρ€ΠΈΠΌ установился Π»ΠΈ Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Если ΠΊΠΎΠΌΠ°Π½Π΄Π° ΠΎΡ‚Ρ€Π°Π±ΠΎΡ‚Π°Π»Π°, Ρ‚ΠΎ Airflow создал свой ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΎΠ½Π½Ρ‹ΠΉ Ρ„Π°ΠΉΠ» airflow.cfg Π² AIRFLOW_HOME:

$ tree
.
β”œβ”€β”€ airflow.cfg
└── unittests.cfg

Π£ Airflow Π΅ΡΡ‚ΡŒ Π²Π΅Π±-интСрфСйс. Π•Π³ΠΎ ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ, Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΠ² ΠΊΠΎΠΌΠ°Π½Π΄Ρƒ:

(venv) $ airflow webserver --port 8081

Π’Π΅ΠΏΠ΅Ρ€ΡŒ Π²Ρ‹ ΠΌΠΎΠΆΠ΅Ρ‚Π΅ ΠΏΠΎΠΏΠ°ΡΡ‚ΡŒ Π² Π²Π΅Π±-интСрфСйс Π² Π±Ρ€Π°ΡƒΠ·Π΅Ρ€Π΅ Π½Π° ΠΏΠΎΡ€Ρ‚Ρƒ 8081 Π½Π° хостС, Π³Π΄Π΅ Airflow Π±Ρ‹Π» Π·Π°ΠΏΡƒΡ‰Π΅Π½, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€: <hostname:8081>.

2. Π Π°Π±ΠΎΡ‚Π° с Experimental API

На этом Airflow настроСн ΠΈ Π³ΠΎΡ‚ΠΎΠ² ΠΊ Ρ€Π°Π±ΠΎΡ‚Π΅. Π’Π΅ΠΌ Π½Π΅ ΠΌΠ΅Π½Π΅Π΅, Π½Π°ΠΌ Π½ΡƒΠΆΠ½ΠΎ Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ Π΅Ρ‰Π΅ ΠΈ Experimental API. Наши Ρ‡Π΅ΠΊΠ΅Ρ€Ρ‹ написаны Π½Π° Python, поэтому Π΄Π°Π»Π΅Π΅ всС запросы Π±ΡƒΠ΄ΡƒΡ‚ Π½Π° Π½Π΅ΠΌ с использованиСм Π±ΠΈΠ±Π»ΠΈΠΎΡ‚Π΅ΠΊΠΈ requests.

На самом Π΄Π΅Π»Π΅ API ΡƒΠΆΠ΅ Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚ для простых запросов. НапримСр, Ρ‚Π°ΠΊΠΎΠΉ запрос позволяСт ΠΏΠΎΡ‚Π΅ΡΡ‚ΠΈΡ‚ΡŒ Π΅Π³ΠΎ Ρ€Π°Π±ΠΎΡ‚Ρƒ:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #Π² нашСм случаС Ρ‚Π°ΠΊΠΎΠΉ, Π° ΠΏΠΎ Π΄Π΅Ρ„ΠΎΠ»Ρ‚Ρƒ 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Если ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ»ΠΈ Ρ‚Π°ΠΊΠΎΠ΅ сообщСниС Π² ΠΎΡ‚Π²Π΅Ρ‚, Ρ‚ΠΎ это Π·Π½Π°Ρ‡ΠΈΡ‚, Ρ‡Ρ‚ΠΎ всС Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚.

Однако, ΠΊΠΎΠ³Π΄Π° ΠΌΡ‹ Π·Π°Ρ…ΠΎΡ‚ΠΈΠΌ Π·Π°Ρ‚Ρ€ΠΈΠ³Π΅Ρ€ΠΈΡ‚ΡŒ DAG, Ρ‚ΠΎ столкнСмся с Ρ‚Π΅ΠΌ, Ρ‡Ρ‚ΠΎ этот Π²ΠΈΠ΄ запроса нСльзя ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ Π±Π΅Π· Π°ΡƒΡ‚Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ†ΠΈΠΈ.

Для этого Π½ΡƒΠΆΠ½ΠΎ Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΡ€ΠΎΠ΄Π΅Π»Π°Ρ‚ΡŒ Π΅Ρ‰Π΅ ряд дСйствий.

Π’ΠΎ-ΠΏΠ΅Ρ€Π²Ρ‹Ρ…, Π² ΠΊΠΎΠ½Ρ„ΠΈΠ³ Π½ΡƒΠΆΠ½ΠΎ Π΄ΠΎΠ±Π°Π²ΠΈΡ‚ΡŒ это:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Π—Π°Ρ‚Π΅ΠΌ, Π½ΡƒΠΆΠ½ΠΎ ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ своСго ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»Ρ с админскими ΠΏΡ€Π°Π²Π°ΠΌΠΈ:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Π—Π°Ρ‚Π΅ΠΌ, Π½ΡƒΠΆΠ½ΠΎ ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»Ρ с ΠΎΠ±Ρ‹Ρ‡Π½Ρ‹ΠΌΠΈ ΠΏΡ€Π°Π²Π°ΠΌΠΈ, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌΡƒ Π±ΡƒΠ΄Π΅Ρ‚ Ρ€Π°Π·Ρ€Π΅ΡˆΠ΅Π½ΠΎ Π΄Π΅Π»Π°Ρ‚ΡŒ Ρ‚Ρ€ΠΈΠ³Π³Π΅Ρ€ DAG’а.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Π’Π΅ΠΏΠ΅Ρ€ΡŒ всС Π³ΠΎΡ‚ΠΎΠ²ΠΎ.

3. Запуск POST-запроса

Π‘Π°ΠΌ POST-запрос Π±ΡƒΠ΄Π΅Ρ‚ выглядСт Ρ‚Π°ΠΊ:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Запрос ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Π°Π½ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ.

БоотвСтствСнно, Π΄Π°Π»Π΅Π΅ ΠΌΡ‹ Π΄Π°Π΅ΠΌ ΠΊΠ°ΠΊΠΎΠ΅-Ρ‚ΠΎ врСмя DAG’у Π½Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΡƒ ΠΈ Π΄Π΅Π»Π°Π΅ΠΌ запрос Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ ClickHouse, ΠΏΡ‹Ρ‚Π°ΡΡΡŒ ΠΏΠΎΠΉΠΌΠ°Ρ‚ΡŒ ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»ΡŒΠ½Ρ‹ΠΉ ΠΏΠ°ΠΊΠ΅Ρ‚ Π΄Π°Π½Π½Ρ‹Ρ….

ΠŸΡ€ΠΎΠ²Π΅Ρ€ΠΊΠ° Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½Π°.

Π˜ΡΡ‚ΠΎΡ‡Π½ΠΈΠΊ: habr.com