የአየር ፍሰት ባች ዳታ ማቀነባበሪያ ሂደቶችን በአመቺ እና በፍጥነት ለማዳበር እና ለማቆየት መሳሪያ ነው።

የአየር ፍሰት ባች ዳታ ማቀነባበሪያ ሂደቶችን በአመቺ እና በፍጥነት ለማዳበር እና ለማቆየት መሳሪያ ነው።

ሰላም ሀብር! በዚህ ጽሑፍ ውስጥ ስለ ባች ዳታ ማቀናበሪያ ሂደቶችን ለማዳበር ስለ አንድ ጥሩ መሣሪያ መነጋገር እፈልጋለሁ ፣ ለምሳሌ ፣ በድርጅት DWH ወይም በእርስዎ DataLake መሠረተ ልማት ውስጥ። ስለ Apache Airflow (ከዚህ በኋላ የአየር ፍሰት ይባላል) እንነጋገራለን. በሐበሬ ላይ ፍትሃዊ ባልሆነ መልኩ ትኩረት ተሰጥቶታል፣ እና በዋናው ክፍል ቢያንስ የአየር ፍሰት ለኢቲኤል/ኤልቲ ሂደቶች መርሐግብር መርሐግብር ሲመርጡ ማየት ተገቢ መሆኑን ለማሳመን እሞክራለሁ።

ከዚህ ቀደም በቲንኮፍ ባንክ ውስጥ ስሰራ ስለ DWH ርዕስ ተከታታይ ጽሁፎችን ጻፍኩ. አሁን የ Mail.Ru ቡድን ቡድን አባል ሆኛለሁ እና በጨዋታው አካባቢ የውሂብ ትንተና መድረክን እያዘጋጀሁ ነው። በእውነቱ ፣ ዜና እና አስደሳች መፍትሄዎች እየታዩ ፣ እኔ እና ቡድኔ እዚህ ስለ እኛ የመረጃ ትንተና መድረክ እንነጋገራለን ።

መቅድም

ስለዚህ, እንጀምር. የአየር ፍሰት ምንድን ነው? ይህ ቤተ-መጽሐፍት ነው (ወይም የቤተ-መጻህፍት ስብስብ) የሥራ ሂደቶችን ለማዘጋጀት, ለማቀድ እና ለመቆጣጠር. የአየር ፍሰት ዋና ባህሪ፡ Python ኮድ ሂደቶችን ለመግለፅ (ለማዳበር) ጥቅም ላይ ይውላል። ይህ ፕሮጄክትዎን እና ልማትዎን ለማደራጀት ብዙ ጥቅሞች አሉት-በመሰረቱ ፣ የእርስዎ (ለምሳሌ) የኢቲኤል ፕሮጀክት የፓይዘን ፕሮጄክት ብቻ ነው ፣ እናም የመሠረተ ልማት ፣ የቡድን ብዛት እና ልዩ ሁኔታዎችን ከግምት ውስጥ በማስገባት እንደፈለጉ ማደራጀት ይችላሉ ። ሌሎች መስፈርቶች. በመሳሪያነት ሁሉም ነገር ቀላል ነው. ለምሳሌ PyCharm + Git ተጠቀም። በጣም ጥሩ እና በጣም ምቹ ነው!

አሁን የአየር ፍሰት ዋና ዋና አካላትን እንመልከት ። የእነሱን ማንነት እና አላማ በመረዳት የሂደትዎን ስነ-ህንፃ በጥሩ ሁኔታ ማደራጀት ይችላሉ። ምናልባት ዋናው አካል ዳይሬክትድ አሲክሊክ ግራፍ ነው (ከዚህ በኋላ DAG ተብሎ ይጠራል)።

DAG

DAG በተወሰነ የጊዜ ሰሌዳ መሰረት በጥብቅ በተገለፀው ቅደም ተከተል ማጠናቀቅ የሚፈልጓቸው ተግባሮችዎ አንዳንድ ትርጉም ያለው ማህበር ነው። የአየር ፍሰት ከDAGs እና ከሌሎች አካላት ጋር ለመስራት ምቹ የድር በይነገጽን ይሰጣል፡-

የአየር ፍሰት ባች ዳታ ማቀነባበሪያ ሂደቶችን በአመቺ እና በፍጥነት ለማዳበር እና ለማቆየት መሳሪያ ነው።

DAG ይህን ሊመስል ይችላል፡-

የአየር ፍሰት ባች ዳታ ማቀነባበሪያ ሂደቶችን በአመቺ እና በፍጥነት ለማዳበር እና ለማቆየት መሳሪያ ነው።

ገንቢው DAG ሲነድፍ በDAG ውስጥ የትኞቹ ተግባራት እንደሚገነቡ የኦፕሬተሮችን ስብስብ ያስቀምጣል። እዚህ ወደ ሌላ አስፈላጊ አካል ደርሰናል: የአየር ፍሰት ኦፕሬተር.

ከዋኞች

ኦፕሬተር በየትኛዎቹ የሥራ ሁኔታዎች ላይ የተመሰረተ አካል ነው, ይህም የሥራ ሁኔታ በሚፈፀምበት ጊዜ ምን እንደሚሆን ይገልጻል. የአየር ፍሰት ከ GitHub ይለቀቃል ቀድሞውኑ ለመጠቀም ዝግጁ የሆኑ የኦፕሬተሮች ስብስብ ይዟል። ምሳሌዎች፡-

  • BashOperator - የ bash ትዕዛዝ ለማስፈጸም ኦፕሬተር.
  • PythonOperator - የ Python ኮድ ለመደወል ኦፕሬተር።
  • ኢሜል ኦፕሬተር - ኢሜል ለመላክ ኦፕሬተር ።
  • HTTPOperator - ከ http ጥያቄዎች ጋር ለመስራት ኦፕሬተር።
  • SqlOperator - የ SQL ኮድን ለማስፈጸም ከዋኝ.
  • ዳሳሽ አንድን ክስተት ለመጠበቅ ኦፕሬተር ነው (የሚፈለገውን ጊዜ መምጣት ፣ የሚፈለገው ፋይል ገጽታ ፣ በመረጃ ቋቱ ውስጥ ያለ መሾመር ፣ ከኤፒአይ ምላሽ ፣ ወዘተ ፣ ወዘተ.)።

ተጨማሪ የተወሰኑ ኦፕሬተሮች አሉ፡ DockerOperator፣ HiveOperator፣ S3FileTransferOperator፣ PrestoToMysqlOperator፣ SlackOperator።

እንዲሁም በራስዎ ባህሪያት ላይ ተመስርተው ኦፕሬተሮችን ማዳበር እና በፕሮጀክትዎ ውስጥ ሊጠቀሙባቸው ይችላሉ. ለምሳሌ፣ ሰነዶችን ከMongoDB ወደ ቀፎ የሚላክ ኦፕሬተር እና ብዙ ኦፕሬተሮችን ከሞንጎዲቢ ወደ ቀፎ ለመላክ MongoDBToHiveViaHdfsTransfer ፈጥረናል። ጠቅታ ቤት: CHLoadFromHiveOperator እና CHTableLoaderOperator. በመሠረቱ፣ አንድ ፕሮጀክት በመሠረታዊ መግለጫዎች ላይ የተገነባውን ኮድ በተደጋጋሚ እንደተጠቀመ፣ ወደ አዲስ መግለጫ ስለመገንባት ማሰብ ይችላሉ። ይህ ተጨማሪ እድገትን ቀላል ያደርገዋል, እና በፕሮጀክቱ ውስጥ የኦፕሬተሮች ቤተ-መጽሐፍትዎን ያሰፋሉ.

በመቀጠል, እነዚህ ሁሉ የተግባሮች ምሳሌዎች መፈጸም አለባቸው, እና አሁን ስለ መርሐግብር አውጪው እንነጋገራለን.

መርሐግብር አዘጋጅ

የአየር ፍሰት ተግባር መርሐግብር የተገነባው በ ላይ ነው። ቂጣ. ሴሌሪ ወረፋ እና ያልተመሳሰሉ እና የተከፋፈለ የተግባር አፈፃፀም እንዲያደራጁ የሚያስችልዎ የፓይዘን ቤተ-መጽሐፍት ነው። በአየር ፍሰት በኩል ሁሉም ተግባራት ወደ ገንዳዎች ይከፈላሉ. ገንዳዎች የሚፈጠሩት በእጅ ነው። በተለምዶ፣ አላማቸው ከምንጩ ጋር አብሮ የመስራትን ስራ መገደብ ወይም በDWH ውስጥ ስራዎችን መተየብ ነው። ገንዳዎችን በድር በይነገጽ ማስተዳደር ይቻላል፡-

የአየር ፍሰት ባች ዳታ ማቀነባበሪያ ሂደቶችን በአመቺ እና በፍጥነት ለማዳበር እና ለማቆየት መሳሪያ ነው።

እያንዳንዱ ገንዳ በቦታዎች ብዛት ላይ ገደብ አለው. DAG ሲፈጥሩ ገንዳ ይሰጠዋል፡-

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

በDAG ደረጃ የተገለፀው ገንዳ በተግባር ደረጃ ሊገለበጥ ይችላል።
የተለየ ሂደት፣ መርሐግብር፣ ሁሉንም ተግባራት በአየር ፍሰት ውስጥ የማዘጋጀት ኃላፊነት አለበት። በእውነቱ፣ መርሐግብር አስፈፃሚ ሥራዎችን የማዘጋጀት ሁሉንም መካኒኮች ይመለከታል። ሥራው ከመፈጸሙ በፊት በበርካታ ደረጃዎች ውስጥ ያልፋል.

  1. የቀደሙት ተግባራት በDAG ውስጥ ተጠናቀዋል፤ አዲስ ሊሰለፍ ይችላል።
  2. ወረፋው እንደ ተግባራቶች ቅድሚያ ይደረደራል (ቅድሚያዎችም እንዲሁ ቁጥጥር ሊደረግባቸው ይችላል) እና በገንዳው ውስጥ ነፃ ማስገቢያ ካለ ተግባሩ ወደ ሥራ ሊገባ ይችላል።
  3. ነፃ ሠራተኛ ሴሊሪ ካለ, ተግባሩ ወደ እሱ ይላካል; በችግሩ ውስጥ ፕሮግራም ያወጡት ሥራ አንድ ወይም ሌላ ኦፕሬተር በመጠቀም ይጀምራል።

ቀላል በቂ።

መርሐግብር ሰጪው በሁሉም የDAGs ስብስብ እና በDAGs ውስጥ ባሉ ሁሉም ተግባራት ላይ ይሰራል።

መርሐግብር ከDAG ጋር መሥራት እንዲጀምር DAG መርሐግብር ማዘጋጀት ይኖርበታል፡-

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

ዝግጁ-የተዘጋጁ ቅድመ-ቅምጦች ስብስብ አለ- @once, @hourly, @daily, @weekly, @monthly, @yearly.

ክሮን አገላለጾችንም መጠቀም ይችላሉ፡-

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

የማስፈጸሚያ ቀን

የአየር ፍሰት እንዴት እንደሚሰራ ለመረዳት፣ የማስፈጸሚያ ቀን ለDAG ምን እንደሆነ መረዳት አስፈላጊ ነው። በአየር ፍሰት ውስጥ, DAG የማስፈጸሚያ ቀን ልኬት አለው, ማለትም, በ DAG የስራ መርሃ ግብር ላይ በመመስረት, ለእያንዳንዱ የአፈፃፀም ቀን የተግባር ምሳሌዎች ይፈጠራሉ. እና ለእያንዳንዱ የማስፈጸሚያ ቀን ተግባራት እንደገና ሊከናወኑ ይችላሉ - ወይም ለምሳሌ ፣ DAG በበርካታ የማስፈጸሚያ ቀናት ውስጥ በተመሳሳይ ጊዜ ሊሰራ ይችላል። ይህ በግልጽ እዚህ ይታያል፡-

የአየር ፍሰት ባች ዳታ ማቀነባበሪያ ሂደቶችን በአመቺ እና በፍጥነት ለማዳበር እና ለማቆየት መሳሪያ ነው።

እንደ አለመታደል ሆኖ (ወይም እንደ እድል ሆኖ: እንደ ሁኔታው ​​ይወሰናል), በ DAG ውስጥ ያለው ተግባር አፈፃፀም ከተስተካከለ, በቀድሞው የማስፈጸሚያ ቀን ውስጥ ማስፈጸሚያው ማስተካከያዎችን ግምት ውስጥ በማስገባት ይቀጥላል. አዲስ አልጎሪዝምን በመጠቀም ያለፉትን ጊዜያት ውሂብ እንደገና ማስላት ከፈለጉ ይህ ጥሩ ነው ፣ ግን ውጤቱ መጥፎ ነው ምክንያቱም የውጤቱ ተደጋጋሚነት ጠፍቷል (በእርግጥ ማንም የሚፈልገውን የምንጭ ኮድ ስሪት ከ Git እንዲመልሱ እና ምን ለማስላት አያስቸግርዎትም። አንድ ጊዜ ያስፈልግዎታል, በሚፈልጉበት መንገድ).

ተግባራትን መፍጠር

የDAG አተገባበር በፓይዘን ውስጥ ኮድ ነው, ስለዚህ በሚሰሩበት ጊዜ የኮዱን መጠን ለመቀነስ በጣም ምቹ መንገድ አለን, ለምሳሌ, በተቆራረጡ ምንጮች. እንደ ምንጭ ሶስት MySQL ሻርዶች አሉህ እንበል፣ ወደ እያንዳንዳቸው መውጣት እና የተወሰነ ውሂብ መውሰድ አለብህ። ከዚህም በላይ, በተናጥል እና በትይዩ. በDAG ውስጥ ያለው የፓይዘን ኮድ ይህን ሊመስል ይችላል፡-

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG ይህንን ይመስላል

የአየር ፍሰት ባች ዳታ ማቀነባበሪያ ሂደቶችን በአመቺ እና በፍጥነት ለማዳበር እና ለማቆየት መሳሪያ ነው።

በዚህ አጋጣሚ ቅንብሮቹን በቀላሉ በማስተካከል እና DAG ን በማዘመን ሸርተቴ ማከል ወይም ማስወገድ ይችላሉ። ምቹ!

እንዲሁም የበለጠ ውስብስብ የኮድ ማመንጨትን መጠቀም ይችላሉ ፣ ለምሳሌ ፣ ከምንጮች ጋር በመረጃ ቋት መልክ መስራት ወይም የጠረጴዛ መዋቅርን ፣ ከጠረጴዛ ጋር ለመስራት ስልተ ቀመር እና የ DWH መሠረተ ልማትን ባህሪዎች ከግምት ውስጥ በማስገባት ሂደትን መፍጠር ይችላሉ ። N ሠንጠረዦችን ወደ ማከማቻዎ ለመጫን። ወይም ለምሳሌ ከመለኪያ ጋር በዝርዝር መልክ መስራትን ከማይደግፍ ኤፒአይ ጋር በመስራት ከዚህ ዝርዝር ውስጥ N ተግባራትን በDAG ማመንጨት፣ በኤፒአይ ውስጥ ያሉ የጥያቄዎች ትይዩነት ወደ ገንዳ መገደብ እና መቧጠጥ ይችላሉ። አስፈላጊ ውሂብ ከኤፒአይ. ተለዋዋጭ!

ማከማቻ

የአየር ፍሰት የራሱ የኋላ ማከማቻ ፣ የውሂብ ጎታ ( MySQL ወይም Postgres ሊሆን ይችላል ፣ እኛ ፖስትግሬስ አለን) ፣ የተግባር ሁኔታዎችን ፣ DAGs ፣ የግንኙነት መቼቶች ፣ ዓለም አቀፍ ተለዋዋጮችን ፣ ወዘተ ... ወዘተ ያከማቻል ። እዚህ እኔ ማለት እፈልጋለሁ በአየር ፍሰት ውስጥ ያለው ማከማቻ በጣም ቀላል (ወደ 20 ጠረጴዛዎች) እና ማንኛውንም የራስዎን ሂደቶች በላዩ ላይ ለመገንባት ከፈለጉ ምቹ ነው። ጥያቄን እንዴት እንደሚገነቡ ከመረዳትዎ በፊት ለረጅም ጊዜ ማጥናት የነበረባቸው በ Informatica ማከማቻ ውስጥ ያሉትን 100500 ጠረጴዛዎች አስታውሳለሁ።

ክትትል

የማጠራቀሚያውን ቀላልነት ከግምት ውስጥ በማስገባት ለእርስዎ ምቹ የሆነ የተግባር ክትትል ሂደት መገንባት ይችላሉ። በዜፔሊን ውስጥ የማስታወሻ ደብተር እንጠቀማለን፣ የተግባራትን ሁኔታ የምንመለከትበት፡-

የአየር ፍሰት ባች ዳታ ማቀነባበሪያ ሂደቶችን በአመቺ እና በፍጥነት ለማዳበር እና ለማቆየት መሳሪያ ነው።

ይህ የአየር ፍሰት ራሱ የድር በይነገጽ ሊሆንም ይችላል፡-

የአየር ፍሰት ባች ዳታ ማቀነባበሪያ ሂደቶችን በአመቺ እና በፍጥነት ለማዳበር እና ለማቆየት መሳሪያ ነው።

የአየር ፍሰት ኮድ ክፍት ምንጭ ነው, ስለዚህ ለቴሌግራም ማንቂያ ጨምረናል. እያንዳንዱ የሥራ ማስኬጃ ምሳሌ፣ ስህተት ከተፈጠረ፣ መላው የልማት እና የድጋፍ ቡድን ባካተተበት በቴሌግራም ውስጥ ቡድኑን አይፈለጌ መልዕክት ያደርጋል።

ፈጣን ምላሽ በቴሌግራም እንቀበላለን (ከተፈለገ) እና በዜፔሊን በኩል በአየር ፍሰት ውስጥ የተግባር አጠቃላይ ምስል እንቀበላለን።

ԸՆԴՀԱՆՈՒՐ ԳԻՆ

የአየር ፍሰት በዋናነት ክፍት ምንጭ ነው፣ እና ከእሱ ተአምራትን መጠበቅ የለብዎትም። የሚሠራውን መፍትሔ ለመገንባት ጊዜ እና ጥረት ለማድረግ ዝግጁ ይሁኑ። ግቡ ሊደረስበት የሚችል ነው, እመኑኝ, ዋጋ ያለው ነው. የእድገት ፍጥነት ፣ ተለዋዋጭነት ፣ አዳዲስ ሂደቶችን የመጨመር ቀላልነት - ይወዳሉ። እርግጥ ነው, ለፕሮጀክቱ አደረጃጀት ብዙ ትኩረት መስጠት አለብዎት, የአየር ፍሰት እራሱ መረጋጋት: ተአምራት አይከሰቱም.

አሁን የአየር ፍሰት በየቀኑ እየሰራን ነው። ወደ 6,5 ሺህ የሚጠጉ ተግባራት. በባህሪያቸው በጣም የተለያዩ ናቸው። ከተለያዩ እና በጣም የተወሰኑ ምንጮች ወደ ዋናው DWH የመጫን ተግባራት አሉ፣ በዋናው DWH ውስጥ የሱቅ ፊት የማስላት ስራዎች አሉ፣ መረጃን ወደ ፈጣን DWH የማተም ስራዎች፣ ብዙ እና ብዙ የተለያዩ ስራዎች አሉ - እና የአየር ፍሰት ከቀን ወደ ቀን ሁሉንም ያኝኳቸዋል። በቁጥር መናገር ይህ ነው። 2,3 ሺህ በDWH (Hadoop) ውስጥ የተለያየ ውስብስብነት ያላቸው ELT ተግባራት፣ በግምት። 2,5 መቶ የውሂብ ጎታዎች ምንጮች, ይህ ቡድን ከ ነው 4 የኢቲኤል ገንቢዎችበDWH እና በDWH ውስጥ ELT ዳታ ማቀናበር እና ሌሎችም በ ETL መረጃ ሂደት የተከፋፈሉ ናቸው። አንድ አስተዳዳሪ, የአገልግሎቱን መሠረተ ልማት የሚመለከት.

ለወደፊቱ እቅድ

የሂደቱ ብዛት ማደጉ የማይቀር ሲሆን ከአየር ፍሰት መሠረተ ልማት ጋር በተያያዘ የምናደርገው ዋናው ነገር መስፋፋት ነው። የአየር ፍሰት ክላስተር መገንባት እንፈልጋለን፣ ለሴለሪ ሰራተኞች ጥንድ እግሮችን እንመድባለን እና በራስ የሚባዛ ጭንቅላት ከስራ መርሃ ግብር ሂደቶች እና ማከማቻ ጋር።

Epilogue

ይህ በእርግጥ ስለ አየር ፍሰት መናገር የምፈልገው ሁሉም ነገር አይደለም ነገር ግን ዋና ዋና ነጥቦቹን ለማጉላት ሞከርኩ. የምግብ ፍላጎት ከመብላት ጋር ይመጣል ፣ ይሞክሩት እና ይወዳሉ :)

ምንጭ: hab.com

አስተያየት ያክሉ