Conas truicear DAG a dhéanamh in Airflow ag baint úsáide as an API Turgnamhach

Agus ár gcláir oideachais á n-ullmhú againn, bíonn deacrachtaí againn go tréimhsiúil maidir le bheith ag obair le roinnt uirlisí. Agus i láthair na huaire nuair a bhuailimid leo, ní i gcónaí a bhíonn go leor doiciméadú agus earraí a chuideodh chun dul i ngleic leis an bhfadhb seo.

Mar sin bhí sé, mar shampla, in 2015, agus d’úsáideamar braisle Hadoop le Spark do 35 úsáideoir comhuaineach ar an gclár Big Data Specialist. Ní raibh sé soiléir conas é a ullmhú le haghaidh cás úsáideora den sórt sin ag baint úsáide as YARN. Mar thoradh air sin, tar éis dóibh figured amach agus siúl an cosán ar a gcuid féin, rinne siad post ar Habré agus rinne freisin Cruinniú Spark Moscó.

réamhstair

An uair seo beidh muid ag caint faoi chlár difriúil - Innealtóir Sonraí. Ar sé, tógann ár rannpháirtithe dhá chineál ailtireachta: lambda agus kappa. Agus san ailtireacht lamdba, úsáidtear Airflow mar chuid de phróiseáil bhaisc chun logaí a aistriú ó HDFS go ClickHouse.

Tá gach rud go maith go ginearálta. Lig dóibh a gcuid píblínte a thógáil. Mar sin féin, tá “ach” ann: tá ár gcláir go léir chun cinn ó thaobh na teicneolaíochta de i dtéarmaí an phróisis foghlama féin. Chun an saotharlann a sheiceáil, bainimid úsáid as seiceálaithe uathoibríocha: ní mór don rannpháirtí dul chuig a chuntas pearsanta, cliceáil ar an gcnaipe “Seiceáil”, agus tar éis tamaill feiceann sé aiseolas leathnaithe de chineál éigin ar an méid a rinne sé. Agus is ag an bpointe seo a thosaímid ag dul i ngleic lenár bhfadhb.

Socraítear an tsaotharlann seo a sheiceáil mar seo a leanas: seolann muid paicéad sonraí rialaithe chuig Kafka an rannpháirtí, ansin aistríonn Gobblin an paicéad sonraí seo go HDFS, ansin glacann Airflow an paicéad sonraí seo agus cuireann sé i ClickHouse é. Is é an cleas nach gcaithfidh Airflow é seo a dhéanamh i bhfíor-am, déanann sé é de réir an sceidil: uair amháin gach 15 nóiméad a thógann sé a bunch de chomhaid agus uaslódáil iad.

Tarlaíonn sé go gcaithfimid a DAG a spreagadh ar bhealach éigin uainn féin arna iarraidh sin dúinn agus an seiceálaí ar siúl anseo agus anois. Googling, fuair muid amach go bhfuil mar a thugtar air le haghaidh leaganacha níos déanaí de Airflow API Turgnamhach. An focal experimental, ar ndóigh, fuaimeanna sé scary, ach cad atá le déanamh ... Bíonn sé go tobann as.

Ansin, déanfaimid cur síos ar an gcosán iomlán: ó Shreabhadh Aeir a shuiteáil go dtí iarratas POST a ghiniúint a spreagann DAG ag baint úsáide as an API Turgnamhach. Oibreoimid le Ubuntu 16.04.

1. Suiteáil sreabhadh aer

Déanaimis seiceáil go bhfuil Python 3 agus virtualenv againn.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Má tá ceann díobh seo ar iarraidh, ansin é a shuiteáil.

Anois, déanaimis eolaire a chruthú ina leanfaimid orainn ag obair le Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Suiteáil Airflow:

(venv) $ pip install airflow

Leagan ar oibrigh muid air: 1.10.

Anois ní mór dúinn eolaire a chruthú airflow_home, áit a mbeidh na comhaid DAG agus na forlíontáin Airflow suite. Tar éis duit an t-eolaire a chruthú, socraigh an t-athróg timpeallachta AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Is é an chéad chéim eile an t-ordú a rith a chruthóidh agus a thosóidh an bunachar sonraí sreafa i SQLite:

(venv) $ airflow initdb

Cruthófar an bunachar sonraí i airflow.db réamhshocraithe.

Seiceáil an bhfuil Airflow suiteáilte:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Má d'oibrigh an t-ordú, chruthaigh Airflow a chomhad cumraíochta féin airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Tá comhéadan gréasáin ag Airflow. Is féidir é a sheoladh tríd an ordú a rith:

(venv) $ airflow webserver --port 8081

Is féidir leat an comhéadan gréasáin a rochtain anois i mbrabhsálaí ar phort 8081 ar an ósta ina raibh Airflow ag rith, mar seo: <hostname:8081>.

2. Ag obair leis an API Turgnamhach

Ar an Sreabhadh Aeir seo tá sé cumraithe agus réidh le dul. Mar sin féin, ní mór dúinn an API Turgnamhach a rith freisin. Tá ár seiceálaithe scríofa i Python, mar sin beidh gach iarratas air ag baint úsáide as an leabharlann requests.

I ndáiríre tá an API ag obair cheana féin le haghaidh iarratais simplí. Mar shampla, ceadaíonn iarratas den sórt sin duit a chuid oibre a thástáil:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Má fuair tú teachtaireacht den sórt sin mar fhreagra, ciallaíonn sé go bhfuil gach rud ag obair.

Nuair a theastaíonn uainn DAG a thionscnamh, áfach, tuigimid nach féidir iarratas den chineál seo a dhéanamh gan fíordheimhniú.

Chun seo a dhéanamh, beidh ort roinnt gníomhartha a dhéanamh.

Ar dtús, ní mór duit é seo a chur leis an config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Ansin, ní mór duit d'úsáideoir a chruthú le cearta riaracháin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Ansin, ní mór duit úsáideoir a chruthú le gnáthchearta a cheadófar chun truicear DAG a dhéanamh.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Anois tá gach rud réidh.

3. Iarratas POST a sheoladh

Beidh cuma mar seo ar an iarratas POST féin:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

D'éirigh leis an iarratas a phróiseáil.

Dá réir sin, ansin tugann muid roinnt ama don DAG próiseáil agus iarratas a dhéanamh chuig an tábla ClickHouse, ag iarraidh an paicéad sonraí rialaithe a ghabháil.

Fíorú críochnaithe.

Foinse: will.com

Add a comment