วิธีสร้างทริกเกอร์ DAG ใน Airflow โดยใช้ Experimental API

ในการเตรียมโปรแกรมการศึกษาของเรา เราประสบปัญหาเป็นระยะๆ ในแง่ของการทำงานกับเครื่องมือบางอย่าง และในขณะที่เราพบพวกเขาไม่มีเอกสารและบทความเพียงพอที่จะช่วยรับมือกับปัญหานี้

ตัวอย่างเช่น ในปี 2015 เราใช้คลัสเตอร์ Hadoop กับ Spark สำหรับผู้ใช้พร้อมกัน 35 คนในโปรแกรม Big Data Specialist ยังไม่ชัดเจนว่าจะเตรียมอย่างไรสำหรับกรณีผู้ใช้ดังกล่าวโดยใช้ YARN ด้วยเหตุนี้ เมื่อคิดออกและเดินไปตามทางด้วยตนเองแล้ว โพสต์เกี่ยวกับ Habré และแสดงด้วย มอสโคว์ สปาร์ค มีทอัพ.

ประวัติศาสตร์

คราวนี้เราจะพูดถึงโปรแกรมอื่น - วิศวกรข้อมูล. ผู้เข้าร่วมของเราสร้างสถาปัตยกรรมสองประเภท: แลมบ์ดาและคัปปา และในสถาปัตยกรรมแลมบานั้น Airflow ถูกใช้เป็นส่วนหนึ่งของการประมวลผลแบบกลุ่มเพื่อถ่ายโอนบันทึกจาก HDFS ไปยัง ClickHouse

ทุกอย่างโดยทั่วไปดี ให้พวกเขาสร้างท่อส่งน้ำ อย่างไรก็ตาม มี "แต่": โปรแกรมทั้งหมดของเรามีความก้าวหน้าทางเทคโนโลยีในแง่ของกระบวนการเรียนรู้ ในการตรวจสอบแล็บ เราใช้ตัวตรวจสอบอัตโนมัติ: ผู้เข้าร่วมต้องไปที่บัญชีส่วนตัวของเขา คลิกปุ่ม "ตรวจสอบ" และหลังจากนั้นไม่นาน เขาจะเห็นข้อเสนอแนะเพิ่มเติมเกี่ยวกับสิ่งที่เขาทำ และ ณ จุดนี้เราเริ่มเข้าใกล้ปัญหาของเรา

การตรวจสอบแล็บนี้มีการจัดเรียงดังนี้: เราส่งแพ็กเก็ตข้อมูลควบคุมไปยัง Kafka ของผู้เข้าร่วม จากนั้น Gobblin จะถ่ายโอนแพ็กเก็ตข้อมูลนี้ไปยัง HDFS จากนั้น Airflow จะนำแพ็กเก็ตข้อมูลนี้ไปใส่ใน ClickHouse เคล็ดลับคือ Airflow ไม่จำเป็นต้องทำสิ่งนี้ตามเวลาจริง แต่ทำตามกำหนดเวลา: ทุกๆ 15 นาทีจะใช้ไฟล์จำนวนมากและอัปโหลด

ปรากฎว่าเราจำเป็นต้องเรียกใช้ DAG ของพวกเขาเองตามคำขอของเราในขณะที่ตัวตรวจสอบกำลังทำงานที่นี่และตอนนี้ Google เราพบว่าสำหรับ Airflow รุ่นที่ใหม่กว่ามีสิ่งที่เรียกว่า API ทดลอง. คำว่า experimentalแน่นอนว่ามันฟังดูน่ากลัว แต่จะทำอย่างไร ... ทันใดนั้นมันก็ดับลง

ต่อไป เราจะอธิบายเส้นทางทั้งหมด ตั้งแต่การติดตั้ง Airflow ไปจนถึงการสร้างคำขอ POST ที่ทริกเกอร์ DAG โดยใช้ Experimental API เราจะทำงานกับ Ubuntu 16.04

1. การติดตั้งระบบระบายอากาศ

ตรวจสอบว่าเรามี Python 3 และ virtualenv

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

หากไม่มีข้อใดข้อหนึ่ง ให้ติดตั้ง

ตอนนี้มาสร้างไดเร็กทอรีที่เราจะทำงานกับ Airflow ต่อไป

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

ติดตั้งการไหลของอากาศ:

(venv) $ pip install airflow

เวอร์ชันที่เราพัฒนา: 1.10

ตอนนี้เราต้องสร้างไดเร็กทอรี airflow_homeที่ซึ่งไฟล์ DAG และปลั๊กอิน Airflow จะตั้งอยู่ หลังจากสร้างไดเร็กทอรีแล้ว ให้ตั้งค่าตัวแปรสภาพแวดล้อม AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

ขั้นตอนต่อไปคือการเรียกใช้คำสั่งที่จะสร้างและเริ่มต้นฐานข้อมูล dataflow ใน SQLite:

(venv) $ airflow initdb

ฐานข้อมูลจะถูกสร้างขึ้นใน airflow.db ค่าเริ่มต้น.

ตรวจสอบว่ามีการติดตั้ง Airflow หรือไม่:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

หากคำสั่งใช้งานได้ Airflow จะสร้างไฟล์กำหนดค่าของตัวเอง airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow มีเว็บอินเตอร์เฟส สามารถเรียกใช้ได้โดยใช้คำสั่ง:

(venv) $ airflow webserver --port 8081

ขณะนี้คุณสามารถเข้าถึงเว็บอินเทอร์เฟซในเบราว์เซอร์บนพอร์ต 8081 บนโฮสต์ที่ Airflow ทำงานอยู่ ดังนี้: <hostname:8081>.

2. การทำงานกับ Experimental API

บน Airflow นี้ได้รับการกำหนดค่าและพร้อมใช้งาน อย่างไรก็ตาม เราจำเป็นต้องเรียกใช้ Experimental API ด้วย ตัวตรวจสอบของเราเขียนด้วยภาษาไพธอน ดังนั้นคำขอทั้งหมดจะอยู่บนตัวตรวจสอบโดยใช้ไลบรารี requests.

จริงๆ แล้ว API ทำงานอยู่แล้วสำหรับคำของ่ายๆ ตัวอย่างเช่น คำขอดังกล่าวอนุญาตให้คุณทดสอบการทำงาน:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

หากคุณได้รับข้อความตอบกลับแสดงว่าทุกอย่างกำลังทำงานอยู่

อย่างไรก็ตาม เมื่อเราต้องการเรียกใช้ DAG เราพบว่าคำขอประเภทนี้ไม่สามารถทำได้หากไม่มีการรับรองความถูกต้อง

ในการทำเช่นนี้คุณจะต้องดำเนินการหลายอย่าง

ก่อนอื่น คุณต้องเพิ่มสิ่งนี้ในการกำหนดค่า:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

จากนั้น คุณต้องสร้างผู้ใช้ของคุณด้วยสิทธิ์ของผู้ดูแลระบบ:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

ถัดไป คุณต้องสร้างผู้ใช้ที่มีสิทธิ์ปกติที่จะได้รับอนุญาตให้สร้างทริกเกอร์ DAG

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

ตอนนี้ทุกอย่างพร้อมแล้ว

3. เรียกใช้คำขอ POST

คำขอ POST จะมีลักษณะดังนี้:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

คำขอได้รับการดำเนินการเรียบร้อยแล้ว

ดังนั้น เราจึงให้เวลา DAG ในการประมวลผลและส่งคำขอไปยังตาราง ClickHouse โดยพยายามจับแพ็กเก็ตข้อมูลควบคุม

การยืนยันเสร็จสมบูรณ์

ที่มา: will.com

เพิ่มความคิดเห็น