የሙከራ ኤፒአይን በመጠቀም በአየር ፍሰት ውስጥ DAG ቀስቅሴ እንዴት እንደሚሰራ

የትምህርት ፕሮግራሞቻችንን በምንዘጋጅበት ጊዜ፣ ከተወሰኑ መሳሪያዎች ጋር በመስራት ረገድ በየጊዜው ችግሮች ያጋጥሙናል። እና ባጋጠመን ጊዜ, ይህንን ችግር ለመቋቋም የሚረዱን በቂ ሰነዶች እና ጽሑፎች ሁልጊዜ የሉም.

ጉዳዩ ይህ ነበር፣ ለምሳሌ፣ በ2015፣ እና በ"Big Data Specialist" ፕሮግራም ወቅት ለ35 በተመሳሳይ ጊዜ ተጠቃሚዎች የሃዱፕ ክላስተር ከስፓርክ ጋር ተጠቀምን። YARN ን በመጠቀም ለእንደዚህ አይነት መጠቀሚያ መያዣ እንዴት ማዘጋጀት እንደሚቻል ግልጽ አልነበረም. በመጨረሻ፣ አውጥተን በራሳችን መንገድ መንገዱን ከተጓዝን በኋላ አደረግን። Habré ላይ ልጥፍ እና እንዲሁም በ የሞስኮ ስፓርክ ስብሰባ.

prehistory

በዚህ ጊዜ ስለ ሌላ ፕሮግራም እንነጋገራለን - የመረጃ መሐንዲስ ፡፡. ተሳታፊዎቻችን በላዩ ላይ ሁለት ዓይነት አርክቴክቸር ይገነባሉ፡ ላምዳ እና ካፓ። እና በላምድባ አርክቴክቸር፣ እንደ ባች ማቀናበሪያ አካል፣ አየር ፍሰት ምዝግብ ማስታወሻዎችን ከHDFS ወደ ClickHouse ለማስተላለፍ ይጠቅማል።

ሁሉም ነገር በአጠቃላይ ጥሩ ነው. የራሳቸውን የቧንቧ መስመሮች እንዲገነቡ ያድርጉ. ሆኖም ግን, "ግን" አለ: ሁሉም ፕሮግራሞቻችን ከመማር ሂደቱ አንፃር በቴክኖሎጂ የላቁ ናቸው. ላቦራቶሪውን ለመፈተሽ አውቶማቲክ ማመሳከሪያዎችን እንጠቀማለን-ተሳታፊው ወደ የግል መለያው መሄድ ያስፈልገዋል, "Check" የሚለውን ቁልፍ ጠቅ ያድርጉ እና ከተወሰነ ጊዜ በኋላ ባደረገው ነገር ላይ አንድ ዓይነት የተራዘመ ግብረመልስ ያየዋል. እናም በዚህ ቅጽበት ነው ወደ ችግራችን መቅረብ የጀመርነው።

የዚህ ላቦራቶሪ ማረጋገጫ እንደዚህ የተዋቀረ ነው-የቁጥጥር መረጃ ፓኬት ወደ ተሳታፊው ካፍካ እንልካለን, ከዚያም ጎብሊን ይህን የውሂብ ፓኬት ወደ ኤችዲኤፍኤስ ያስተላልፋል, ከዚያም የአየር ፍሰት ይህንን የውሂብ ፓኬት ወስዶ በ ClickHouse ውስጥ ያስቀምጣል. ዘዴው የአየር ፍሰት ይህንን በእውነተኛ ጊዜ ማድረግ የለበትም, በጊዜ መርሐግብር መሰረት ያደርጋል: በየ 15 ደቂቃው ብዙ ፋይሎችን ወስዶ ይሰቅላል.

ፈታኙ እዚህ እና አሁን እየሮጠ እያለ በእኛ ጥያቄ እንደምንም የእነሱን DAG ራሳችንን ማስነሳት አለብን። ከጉግል በኋላ፣ ለኋለኞቹ የአየር ፍሰት ስሪቶች የሚባል ነገር እንዳለ ደርሰንበታል። የሙከራ ኤፒአይ. ቃሉ። experimental, በእርግጥ, አስፈሪ ይመስላል, ግን ምን ማድረግ እንዳለበት ... በድንገት ይነሳል.

በመቀጠል፣ አጠቃላይ መንገዱን እንገልፃለን፡ የአየር ፍሰትን ከመትከል አንስቶ የሙከራ ኤፒአይን በመጠቀም DAGን የሚያስነሳ የPOST ጥያቄ እስከ ማመንጨት ድረስ። ከኡቡንቱ 16.04 ጋር እንሰራለን።

1. የአየር ፍሰት መትከል

Python 3 እና virtualenv እንዳለን እንፈትሽ።

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

ከእነዚህ ውስጥ አንዱ ከጠፋ, ከዚያ ይጫኑት.

አሁን ከአየር ፍሰት ጋር መስራታችንን የምንቀጥልበት ማውጫ እንፍጠር።

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

የአየር ፍሰት መጫን;

(venv) $ pip install airflow

የሰራነው ስሪት፡ 1.10.

አሁን ማውጫ መፍጠር አለብን airflow_homeDAG ፋይሎች እና የአየር ፍሰት ተሰኪዎች የሚገኙበት። ማውጫውን ከፈጠሩ በኋላ የአካባቢን ተለዋዋጭ ያዘጋጁ AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

ቀጣዩ ደረጃ በSQLite ውስጥ የውሂብ ፍሰት ዳታቤዝ የሚፈጥር እና የሚያስጀምር ትእዛዝ ማስኬድ ነው።

(venv) $ airflow initdb

የመረጃ ቋቱ የሚፈጠረው በ ውስጥ ነው። airflow.db ነባሪ።

የአየር ፍሰት መጫኑን እንፈትሽ፡-

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

ትዕዛዙ ከሰራ ፣ ከዚያ አየር ፍሰት የራሱን የውቅር ፋይል ፈጠረ airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

የአየር ፍሰት የድር በይነገጽ አለው። ትዕዛዙን በማስኬድ ማስጀመር ይቻላል-

(venv) $ airflow webserver --port 8081

አሁን የዌብ በይነገጽን በፖርት 8081 ላይ የአየር ፍሰት በሚሰራበት አስተናጋጅ ላይ ባለው አሳሽ ውስጥ መምታት ይችላሉ ፣ ለምሳሌ፡- <hostname:8081>.

2. ከሙከራ ኤፒአይ ጋር መስራት

በዚህ ጊዜ የአየር ፍሰት ተዋቅሮ ለመሄድ ዝግጁ ነው። ሆኖም፣ የሙከራ ኤፒአይን ማስኬድ አለብን። የእኛ ቼኮች የተፃፉት በፓይዘን ነው፣ ስለዚህ ሁሉም ጥያቄዎች ቤተ-መጽሐፍቱን በመጠቀም በውስጡ ይሆናሉ requests.

በእርግጥ፣ ኤፒአይ ለቀላል ጥያቄዎች አስቀድሞ ይሰራል። ለምሳሌ፣ ይህ ጥያቄ አሰራሩን እንድትፈትሽ ይፈቅድልሃል፡-

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

በምላሹ እንዲህ አይነት መልእክት ከተቀበሉ, ሁሉም ነገር እየሰራ ነው ማለት ነው.

ነገር ግን፣ DAG ን ለመቀስቀስ ስንፈልግ፣ የዚህ አይነት ጥያቄ ያለማረጋገጫ ሊቀርብ የማይችል የመሆኑ እውነታ ይገጥመናል።

ይህንን ለማድረግ ብዙ ተጨማሪ እርምጃዎችን ማድረግ ያስፈልግዎታል.

በመጀመሪያ ይህንን ወደ ውቅሩ ማከል ያስፈልግዎታል-

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

ከዚያ ተጠቃሚዎን ከአስተዳዳሪ መብቶች ጋር መፍጠር ያስፈልግዎታል፡-

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

በመቀጠል, DAG ን እንዲያንቀሳቅስ የሚፈቀድለት መደበኛ መብቶች ያለው ተጠቃሚ መፍጠር አለብዎት.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

አሁን ሁሉም ነገር ዝግጁ ነው.

3. የPOST ጥያቄ አስጀምር

የPOST ጥያቄው ራሱ ይህን ይመስላል።

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

ጥያቄው በተሳካ ሁኔታ ተካሂዷል።

በዚህ መሠረት የቁጥጥር መረጃ ፓኬጁን ለመያዝ እየሞከርን ለ DAG ሂደቱን ለማስኬድ እና ለ ClickHouse ጠረጴዛ ጥያቄ ለማቅረብ የተወሰነ ጊዜ እንሰጠዋለን።

ማጣራት ተጠናቅቋል።

ምንጭ: hab.com

አስተያየት ያክሉ