Facere DAG felis in Airflow utens Experimentalis API

Cum institutiones nostras parat, difficultatibus periodice occurrimus secundum instrumenta quaedam operandi. In momento, quo invenimus eos, non semper satis documenta sunt et articuli, qui adiuvant nos hanc quaestionem tolerare.

Hoc casu, exempli gratia, anno 2015, et programmate "Big Data Specialist" Hadoop botrum cum scintilla pro 35 simultaneis usoribus usi sumus. Non liquet quomodo eam ad talem usum causa utens FABULA apparandam. In fine, cum figuravimus et ambulavimus viam nostram, fecimus stipes in Habré ac etiam fieri Moscoviae scintillae Meetup.

erectus

Hoc tempore de alia programmata loquemur - Data ipsum. Participes nostri duo genera architecturae in ea aedificant: Labda et Cappa. Et in architectura lamdba, ut pars batch processus, Airflow est ligna ab HDFS ad ClickHouse transferre.

Omnia generaliter bona sunt. suos fistulas aedificent. Nihilominus est "sed": omnes programmata nostra technologice proferuntur ex ipso processu discendi. Ad lab reprimendum, automatariis latis utimur: particeps ire ad rationem personalem debet, preme "perscriptio" puga, et post aliquod tempus videt nonnullas opiniones extensas de eo quod fecit. Et hoc momento incepimus ad quaestionem nostram accedere.

Verificationis huius lab huius modi est: imperium data fasciculus in Kafka participem mittimus, tum Gobblin hanc fasciculum ad HDFS notitiae transfert, tum Airflow hanc contionem accipit et in ClickHouse ponit. Dolum est quod Airflow non habet hoc in tempore reali facere, secundum schedulam facit: singulis XV minutis fasciculum imaginum et fasciculorum illarum capit.

Evenit ut necessitatem nostram DAC impellere debeamus dum rogatu nostro hic et nunc currit. Post Google, invenimus in versionibus recentioribus Airfluviani sic dictum esse Experimentum API. sermo experimentalsane FORMIDULOSUS sonat, sed quid facere... Subito tollit.

Deinde totam viam describemus: ab institutione Airflow ad petitionem cursoris generandam, quae DAG experimentali API utens utitur. Cum Decuria operabimur 16.04.

Installation 1. Airflow

Reprehendamus quod Python 3 et virtualenv habemus.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Si quid hoc desit, tunc institue illud.

Nunc directorium creare in quo cum Airflow pergo laborare.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Airflow install;

(venv) $ pip install airflow

Versio elaborata est: 1.10.

Nunc opus est directorium creare airflow_home, ubi fasciculi DAG et plugins airflow locabuntur. Directorium post partum, ambitus variabilis, pone AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Proximus gradus est mandatum currere quod datorum datorum in SQLite creabit et initialize

(venv) $ airflow initdb

In database creabitur airflow.db defaltam.

Sit scriptor reprehendo si Airflow est installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Si mandatum operatum est, tunc Airflow suum configurationem file creavit airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow web interface habet. Currendo deduci potest mandatum;

(venv) $ airflow webserver --port 8081

Telam interfaciei ferire iam potes in navigatro in portu 8081 in hospitio ubi Airflow cursus erat, exempli gratia: <hostname:8081>.

2. Opus Experimentalis API

Hic, Airflow est felis ac paratus. Tamen etiam Experimentalis API currere necesse est. Nostrae pocula in Pythone scripta sunt, eo amplius omnes petitiones in ea bibliotheca utens requests.

Re vera, API iam pro simplicibus petitionibus operatur. Exempli gratia, petitio permittit ut eius operationem experiaris;

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Si talem nuntium accipis in responsione, significat omnia operari.

Tamen, cum DAG felis velimus, obvium habemus hoc genus petitionis sine authenticitate fieri non posse.

Ad hoc plures gradus facere debes.

Primum, hoc config addere debes:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Deinde, debes creare usorem tuum cum iure admin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Deinde, debes creare utentem cum iuribus normalibus qui DAG pulsare licebit.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nunc omnia parata sunt.

3. Duc post petitionem

Postulatio ipsa POST haec aspicietur:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Postulatum processit feliciter.

Proinde, tunc DAG aliquandiu damus ad processum et rogationem ad mensam ClickHouse, conantes capere fasciculum imperium datae.

Reprehendo completur.

Source: www.habr.com

Add a comment