د تجربوي API په کارولو سره په هوا کې د DAG محرک کولو څرنګوالی

کله چې خپل تعلیمي پروګرامونه چمتو کوو، موږ وخت په وخت د ځینو وسیلو سره د کار کولو په برخه کې له ستونزو سره مخ کیږو. او په اوس وخت کې چې موږ ورسره مخ شو، تل کافي اسناد او مقالې شتون نلري چې موږ سره د دې ستونزې سره مقابله کې مرسته وکړي.

دا قضیه وه، د بیلګې په توګه، په 2015 کې، او د "لوی ډیټا متخصص" پروګرام په جریان کې موږ د 35 یوځل کاروونکو لپاره د سپارک سره د هډوپ کلستر کارول. دا روښانه نده چې دا څنګه د یارن په کارولو سره د ورته کارونې قضیې لپاره چمتو کړي. په نهایت کې ، دا په ګوته کولو او پخپله لاره پرمخ تللو ، موږ وکړل په Habré کې پوسټ او هم په کې ترسره شو په مسکو کې د سپارک غونډه.

له تاریخ څخه دمخه

دا ځل به د یو بل پروګرام په اړه خبرې وکړو - د ډیټا انجینر. زموږ ګډونوال په دې کې دوه ډوله معمارۍ جوړوي: لامبدا او کاپا. او د لامډبا جوړښت کې ، د بیچ پروسس کولو برخې په توګه ، ایر فلو د HDFS څخه کلیک هاوس ته د لاګونو لیږدولو لپاره کارول کیږي.

هرڅه عموما ښه دي. اجازه راکړئ چې خپل پایپ لاینونه جوړ کړي. په هرصورت، دلته یو "مګر" شتون لري: زموږ ټول پروګرامونه د زده کړې پروسې له نظره د تخنیکي پلوه پرمختللي دي. د لابراتوار چک کولو لپاره، موږ اتوماتیک چیکر کاروو: ګډون کوونکی باید خپل شخصي حساب ته لاړ شي، د "چیک" تڼۍ کلیک وکړئ، او یو څه وخت وروسته هغه د هغه څه په اړه یو ډول پراخ فیډبیک وګوري چې هغه یې کړي. او دا هغه وخت دی چې موږ خپلې ستونزې ته رسیدو پیل کوو.

د دې لابراتوار تایید په دې ډول جوړ شوی دی: موږ د ګډون کونکي کافکا ته د کنټرول ډیټا پاکټ لیږو، بیا ګوبلین دا ډیټا پاکټ HDFS ته لیږدوي، بیا ایر فلو دا ډیټا پاکټ اخلي او په کلیک هاوس کې یې اچوي. چال دا دی چې ایر فلو اړتیا نلري دا په ریښتیني وخت کې وکړي ، دا د مهالویش سره سم ترسره کوي: په هر 15 دقیقو کې دا د فایلونو یوه ډله اخلي او اپلوډ کوي.

دا معلومه شوه چې موږ اړتیا لرو په یو ډول د دوی DAG پخپله زموږ په غوښتنه متحرک کړو پداسې حال کې چې چیکر دلته او اوس روان دی. د ګوګل کولو وروسته، موږ وموندله چې د ایر فلو د وروستیو نسخو لپاره یو تش په نامه دی تجرباتي API. کلمه experimentalالبته، دا ډارونکی ښکاري، مګر څه وکړي ... ناڅاپه دا لیرې کیږي.

بل ، موږ به ټوله لاره تشریح کړو: د هوایی فلو نصبولو څخه د POST غوښتنې رامینځته کولو پورې چې د تجربې API په کارولو سره DAG هڅوي. موږ به د اوبنټو 16.04 سره کار وکړو.

1. د هوا جریان نصبول

راځئ وګورو چې موږ Python 3 او virtualenv لرو.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

که له دې څخه کوم یو ورک وي نو بیا یې نصب کړئ.

اوس راځئ چې یو لارښود جوړ کړو په کوم کې چې موږ به د ایر فلو سره کار ته دوام ورکړو.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

د هوا جریان نصب کړئ:

(venv) $ pip install airflow

هغه نسخه چې موږ یې کار کاوه: 1.10.

اوس موږ اړتیا لرو چې لارښود جوړ کړو airflow_home، چیرې چې د DAG فایلونه او د هوا فلو پلگ ان به موقعیت ولري. د لارښود رامینځته کولو وروسته ، د چاپیریال متغیر تنظیم کړئ AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

بل ګام د کمانډ چلول دي چې په SQLite کې به د ډیټا فلو ډیټابیس رامینځته او پیل کړي:

(venv) $ airflow initdb

ډیټابیس به په کې جوړ شي airflow.db ډیفالټ

راځئ وګورو چې ایا د هوا جریان نصب شوی دی:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

که کمانډ کار وکړ، نو ایر فلو خپل د ترتیب کولو فایل جوړ کړ airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

د هوا جریان یو ویب انٹرفیس لري. دا د کمانډ په چلولو سره پیل کیدی شي:

(venv) $ airflow webserver --port 8081

تاسو اوس کولی شئ په کوربه کې د 8081 پورټ په براوزر کې ویب انٹرفیس ووهئ چیرې چې ایر فلو روان و ، د مثال په توګه: <hostname:8081>.

2. د تجربوي API سره کار کول

پدې مرحله کې ، د هوا جریان تنظیم شوی او د تګ لپاره چمتو دی. په هرصورت، موږ د تجربې API چلولو ته هم اړتیا لرو. زموږ چیکرونه په Python کې لیکل شوي، نو نور ټولې غوښتنې به د کتابتون په کارولو سره په دې کې وي requests.

په حقیقت کې، API دمخه د ساده غوښتنو لپاره کار کوي. د مثال په توګه، دا غوښتنه تاسو ته اجازه درکوي د دې عملیات ازموینه وکړئ:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

که تاسو په ځواب کې داسې پیغام ترلاسه کړئ، دا پدې مانا ده چې هرڅه کار کوي.

په هرصورت، کله چې موږ د DAG پیل کول غواړو، موږ د دې حقیقت سره مخ یو چې دا ډول غوښتنه د تصدیق پرته نشي ترسره کیدی.

د دې کولو لپاره، تاسو اړتیا لرئ چې یو شمیر نور ګامونه ترسره کړئ.

لومړی، تاسو اړتیا لرئ چې دا په ترتیب کې اضافه کړئ:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

بیا، تاسو اړتیا لرئ خپل کارن د اداري حقونو سره جوړ کړئ:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

بیا، تاسو اړتیا لرئ د عادي حقونو سره یو کارن جوړ کړئ څوک چې د DAG محرک کولو اجازه ولري.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

اوس هر څه چمتو دي.

3. د POST غوښتنه پیل کړئ

د POST غوښتنه به پخپله داسې ښکاري:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

غوښتنه په بریالیتوب سره پروسس شوه.

په دې اساس، موږ بیا DAG ته د پروسس کولو لپاره یو څه وخت ورکوو او د ClickHouse میز ته غوښتنه کوو، هڅه کوي د کنټرول ډیټا کڅوړه ونیسي.

چک بشپړ شو.

سرچینه: www.habr.com

Add a comment