ในการเตรียมโปรแกรมการศึกษาของเรา เราประสบปัญหาเป็นระยะๆ ในแง่ของการทำงานกับเครื่องมือบางอย่าง และในขณะที่เราพบพวกเขาไม่มีเอกสารและบทความเพียงพอที่จะช่วยรับมือกับปัญหานี้
ตัวอย่างเช่น ในปี 2015 เราใช้คลัสเตอร์ Hadoop กับ Spark สำหรับผู้ใช้พร้อมกัน 35 คนในโปรแกรม Big Data Specialist ยังไม่ชัดเจนว่าจะเตรียมอย่างไรสำหรับกรณีผู้ใช้ดังกล่าวโดยใช้ YARN ด้วยเหตุนี้ เมื่อคิดออกและเดินไปตามทางด้วยตนเองแล้ว
ประวัติศาสตร์
คราวนี้เราจะพูดถึงโปรแกรมอื่น -
ทุกอย่างโดยทั่วไปดี ให้พวกเขาสร้างท่อส่งน้ำ อย่างไรก็ตาม มี "แต่": โปรแกรมทั้งหมดของเรามีความก้าวหน้าทางเทคโนโลยีในแง่ของกระบวนการเรียนรู้ ในการตรวจสอบแล็บ เราใช้ตัวตรวจสอบอัตโนมัติ: ผู้เข้าร่วมต้องไปที่บัญชีส่วนตัวของเขา คลิกปุ่ม "ตรวจสอบ" และหลังจากนั้นไม่นาน เขาจะเห็นข้อเสนอแนะเพิ่มเติมเกี่ยวกับสิ่งที่เขาทำ และ ณ จุดนี้เราเริ่มเข้าใกล้ปัญหาของเรา
การตรวจสอบแล็บนี้มีการจัดเรียงดังนี้: เราส่งแพ็กเก็ตข้อมูลควบคุมไปยัง Kafka ของผู้เข้าร่วม จากนั้น Gobblin จะถ่ายโอนแพ็กเก็ตข้อมูลนี้ไปยัง HDFS จากนั้น Airflow จะนำแพ็กเก็ตข้อมูลนี้ไปใส่ใน ClickHouse เคล็ดลับคือ Airflow ไม่จำเป็นต้องทำสิ่งนี้ตามเวลาจริง แต่ทำตามกำหนดเวลา: ทุกๆ 15 นาทีจะใช้ไฟล์จำนวนมากและอัปโหลด
ปรากฎว่าเราจำเป็นต้องเรียกใช้ DAG ของพวกเขาเองตามคำขอของเราในขณะที่ตัวตรวจสอบกำลังทำงานที่นี่และตอนนี้ Google เราพบว่าสำหรับ Airflow รุ่นที่ใหม่กว่ามีสิ่งที่เรียกว่า experimental
แน่นอนว่ามันฟังดูน่ากลัว แต่จะทำอย่างไร ... ทันใดนั้นมันก็ดับลง
ต่อไป เราจะอธิบายเส้นทางทั้งหมด ตั้งแต่การติดตั้ง Airflow ไปจนถึงการสร้างคำขอ POST ที่ทริกเกอร์ DAG โดยใช้ Experimental API เราจะทำงานกับ Ubuntu 16.04
1. การติดตั้งระบบระบายอากาศ
ตรวจสอบว่าเรามี Python 3 และ virtualenv
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
หากไม่มีข้อใดข้อหนึ่ง ให้ติดตั้ง
ตอนนี้มาสร้างไดเร็กทอรีที่เราจะทำงานกับ Airflow ต่อไป
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
ติดตั้งการไหลของอากาศ:
(venv) $ pip install airflow
เวอร์ชันที่เราพัฒนา: 1.10
ตอนนี้เราต้องสร้างไดเร็กทอรี airflow_home
ที่ซึ่งไฟล์ DAG และปลั๊กอิน Airflow จะตั้งอยู่ หลังจากสร้างไดเร็กทอรีแล้ว ให้ตั้งค่าตัวแปรสภาพแวดล้อม AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
ขั้นตอนต่อไปคือการเรียกใช้คำสั่งที่จะสร้างและเริ่มต้นฐานข้อมูล dataflow ใน SQLite:
(venv) $ airflow initdb
ฐานข้อมูลจะถูกสร้างขึ้นใน airflow.db
ค่าเริ่มต้น.
ตรวจสอบว่ามีการติดตั้ง Airflow หรือไม่:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
หากคำสั่งใช้งานได้ Airflow จะสร้างไฟล์กำหนดค่าของตัวเอง airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow มีเว็บอินเตอร์เฟส สามารถเรียกใช้ได้โดยใช้คำสั่ง:
(venv) $ airflow webserver --port 8081
ขณะนี้คุณสามารถเข้าถึงเว็บอินเทอร์เฟซในเบราว์เซอร์บนพอร์ต 8081 บนโฮสต์ที่ Airflow ทำงานอยู่ ดังนี้: <hostname:8081>
.
2. การทำงานกับ Experimental API
บน Airflow นี้ได้รับการกำหนดค่าและพร้อมใช้งาน อย่างไรก็ตาม เราจำเป็นต้องเรียกใช้ Experimental API ด้วย ตัวตรวจสอบของเราเขียนด้วยภาษาไพธอน ดังนั้นคำขอทั้งหมดจะอยู่บนตัวตรวจสอบโดยใช้ไลบรารี requests
.
จริงๆ แล้ว API ทำงานอยู่แล้วสำหรับคำของ่ายๆ ตัวอย่างเช่น คำขอดังกล่าวอนุญาตให้คุณทดสอบการทำงาน:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
หากคุณได้รับข้อความตอบกลับแสดงว่าทุกอย่างกำลังทำงานอยู่
อย่างไรก็ตาม เมื่อเราต้องการเรียกใช้ DAG เราพบว่าคำขอประเภทนี้ไม่สามารถทำได้หากไม่มีการรับรองความถูกต้อง
ในการทำเช่นนี้คุณจะต้องดำเนินการหลายอย่าง
ก่อนอื่น คุณต้องเพิ่มสิ่งนี้ในการกำหนดค่า:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
จากนั้น คุณต้องสร้างผู้ใช้ของคุณด้วยสิทธิ์ของผู้ดูแลระบบ:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
ถัดไป คุณต้องสร้างผู้ใช้ที่มีสิทธิ์ปกติที่จะได้รับอนุญาตให้สร้างทริกเกอร์ DAG
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
ตอนนี้ทุกอย่างพร้อมแล้ว
3. เรียกใช้คำขอ POST
คำขอ POST จะมีลักษณะดังนี้:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
คำขอได้รับการดำเนินการเรียบร้อยแล้ว
ดังนั้น เราจึงให้เวลา DAG ในการประมวลผลและส่งคำขอไปยังตาราง ClickHouse โดยพยายามจับแพ็กเก็ตข้อมูลควบคุม
การยืนยันเสร็จสมบูรณ์
ที่มา: will.com