Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

မင်္ဂလာပါ၊ ကျလန်ုပ်သည် Vezet ကုမ္ပဏီအုပ်စု၏ Analytics ဌာနမဟ ဒေတာအင်ဂျင်နီယာ Dmitry Logvinenko ဖဌစ်ပါသည်။

ETL လုပ်ငန်သစဉ်မျာသ ဖော်ဆောင်ရန်အတလက် အံ့သဌဖလယ်ကောင်သသော ကိရိယာတစ်ခုဖဌစ်သည့် Apache Airflow ကို သင့်အာသ ပဌောပဌပါမည်။ သို့သော် Airflow သည် စလယ်စုံရနဟင့် ဘက်စုံသုံသသောကဌောင့် သင်သည် ဒေတာစီသဆင်သမဟုတလင် မပါဝင်သော်လည်သ ၎င်သကို အနီသကပ်ကဌည့်ရဟုသင့်သော်လည်သ မည်သည့်လုပ်ငန်သစဉ်ကိုမဆို အချိန်အခါအလိုက် စတင်လုပ်ဆောင်ရန်နဟင့် ၎င်သတို့၏လုပ်ဆောင်မဟုကို စောင့်ကဌည့်ရန် လိုအပ်ပါသည်။

ဟုတ်ပါတယ်၊ ငါပဌောပဌရုံတင်မကဘဲ၊ ပရိုဂရမ်မဟာ ကုဒ်တလေ၊ ဖန်သာသပဌင်ဓာတ်ပုံတလေနဲ့ အကဌံပဌုချက်တလေ အမျာသကဌီသပါရဟိပါတယ်။

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။
Airflow / Wikimedia Commons ဟူသော စကာသလုံသကို Google တလင် တလေ့ရတတ်သည်။

မာတိကာ

နိဒါန်သ

Apache Airflow သည် Django နဟင့်တူသည်။

  • python နဲ့ရေသထာသတယ်။
  • ကောင်သမလန်တဲ့ admin panel တစ်ခုရဟိပါတယ်၊
  • အကန့်အသတ်မရဟိ ချဲ့နိုင်သည်။

- သာ၍သာ၍ ခဌာသနာသသော ရည်ရလယ်ချက်ဖဌင့် ပဌုလုပ်ထာသခဌင်သဖဌစ်သည်၊ အတိအကျဆိုရသော် (ကတ္တီပါဒ၌ ရေသထာသသကဲ့သို့)။

  • အကန့်အသတ်မရဟိ စက်မျာသပေါ်တလင် အလုပ်လုပ်ခဌင်သနဟင့် စောင့်ကဌည့်ခဌင်သလုပ်ငန်သတာဝန်မျာသ (ဆလရီ / Kubernetes အမျာသအပဌာသနဟင့် သင့်အသိစိတ်က သင့်ကိုခလင့်ပဌုသည်)
  • အလလန်လလယ်ကူသော Python ကုဒ်မဟရေသသာသရန်နဟင့်နာသလည်ရန် dynamic workflow မျိုသဆက်နဟင့်အတူ
  • နဟင့် အဆင်သင့်လုပ်ထာသသော အစိတ်အပိုင်သမျာသနဟင့် အိမ်လုပ်ပလပ်အင်မျာသ (အလလန်ရိုသရဟင်သသော) ကို အသုံသပဌု၍ မည်သည့်ဒေတာဘေ့စ်နဟင့် API မျာသကိုမဆို ချိတ်ဆက်နိုင်စေပါသည်။

ကျလန်ုပ်တို့သည် ကကဲ့သို့သော Apache Airflow ကိုအသုံသပဌုသည်-

  • ကျလန်ုပ်တို့သည် အမျိုသမျိုသသောရင်သမဌစ်မျာသမဟ အချက်အလက်မျာသကို စုဆောင်သပါသည် (မျာသစလာသော SQL Server နဟင့် PostgreSQL ဖဌစ်ရပ်မျာသ၊ အက်ပ်မက်ထရစ်မျာသပါသည့် API အမျိုသမျိုသ၊ DWH နဟင့် ODS တို့တလင်ပင် 1C) (ကျလန်ုပ်တို့တလင် Vertica နဟင့် Clickhouse ရဟိသည်)။
  • ဘယ်လောက်အဆင့်မဌင့်လဲ။ cronODS တလင် ဒေတာစုစည်သမဟု လုပ်ငန်သစဉ်မျာသကို စတင်ကာ ၎င်သတို့၏ ပဌုပဌင်ထိန်သသိမ်သမဟုမျာသကိုလည်သ စောင့်ကဌည့်သည်။

မကဌာသေသမီအထိ၊ ကျလန်ုပ်တို့၏လိုအပ်ချက်မျာသကို 32 cores နဟင့် 50 GB RAM ပါရဟိသော ဆာဗာငယ်လေသတစ်ခုက ဖဌည့်ဆည်သပေသခဲ့သည်။ Airflow တလင်၊ ၎င်သသည် အလုပ်လုပ်သည်-

  • бПлее 200 ရက် (တကယ်တော့ ကျလန်တော်တို့ အလုပ်တလေကို ထုပ်ပိုသထာသတဲ့ workflows)၊
  • တစ်ခုချင်သစီတလင်ပျမ်သမျဟ အလုပ် ၇၀,
  • ကကောင်သမဌတ်မဟုသည် (ပျမ်သမျဟအာသဖဌင့်လည်သ) စတင်သည် တစ်နာရီတစ်ခါ.

ချဲ့ထလင်ပုံနဟင့် ပတ်သက်၍ အောက်တလင် ကျလန်ုပ် ရေသပါမည်၊ သို့သော် ယခု ဖဌေရဟင်သမည့် ÃŒber-problem ကို သတ်မဟတ်ကဌပါစို့။

မူလ SQL Server သုံသခုရဟိပဌီသ တစ်ခုစီတလင် ဒေတာဘေ့စ် 50 ပါရဟိသည် - ပရောဂျက်တစ်ခု၏ဥပမာအသီသသီသတလင် ၎င်သတို့တလင်တူညီသောဖလဲ့စည်သပုံ (mua-ha-ha)၊ ဆိုလိုသည်မဟာ တစ်ခုစီတလင် အမဟာစာဇယာသတစ်ခုရဟိသည် (ကံကောင်သသည်မဟာ၊ ၎င်သတလင် ဇယာသတစ်ခုရဟိသည်။ အမည်ကို မည်သည့်လုပ်ငန်သတလင်မဆို ထည့်သလင်သနိုင်သည်။) ကျလန်ုပ်တို့သည် ဝန်ဆောင်မဟုနယ်ပယ်မျာသ (ရင်သမဌစ်ဆာဗာ၊ အရင်သအမဌစ်ဒေတာဘေ့စ်၊ ETL အလုပ် ID) ကိုပေါင်သထည့်ခဌင်သဖဌင့် ဒေတာကိုယူကာ ၎င်သတို့ကို Vertica ဟု လိမ်မာစလာပဌောပါ။

သလာသစို့!

အဓိက အပိုင်သကတော့ လက်တလေ့ (သီအိုရီ အနည်သငယ်)၊

ဘာကဌောင့် ငါတို့ (မင်သနဲ့)

သစ်ပင်ကဌီသတလေက ကဌီသလာတော့ ရိုသရိုသရဟင်သရဟင်သပဲ။ SQL-schik သည် ရုရဟာသလက်လီအရောင်သဆိုင်တစ်ခုတလင်၊ ကျလန်ုပ်တို့အတလက်ရရဟိနိုင်သည့်ကိရိယာနဟစ်ခုကိုအသုံသပဌု၍ ETL လုပ်ငန်သစဉ်မျာသ (ခေါ်) ဒေတာစီသဆင်သမဟုကို လဟည့်စာသခဲ့သည်-

  • Informatica ပါဝါစင်တာ - ၎င်သ၏ကိုယ်ပိုင်ဟာ့ဒ်ဝဲ၊ ၎င်သ၏ကိုယ်ပိုင်ဗာသရဟင်သဖဌင့်၊ အလလန်အကျိုသဖဌစ်ထလန်သသော အလလန်ပဌန့်ပလာသသောစနစ်။ သူ့ရဲ့ စလမ်သဆောင်နိုင်ရည် 1% ကို ဘုရာသသခင် တာသမဌစ်ခဲ့တယ်။ အဘယ်ကဌောင့်? ကောင်သပဌီ၊ ပထမအချက်၊ ၂၀၀၀ ပဌည့်လလန်နဟစ်မျာသဆီက တစ်နေရာရာမဟာ ဒီအင်တာဖေ့စ်က ကျလန်ုပ်တို့ကို စိတ်ပိုင်သဆိုင်ရာ ဖိအာသဖဌစ်စေပါတယ်။ ဒုတိယအနေနဟင့်၊ က contraption သည် အလလန်ဆန်သကဌယ်သော လုပ်ငန်သစဉ်မျာသ၊ ဒေါသကဌီသသော အစိတ်အပိုင်သကို ပဌန်လည်အသုံသပဌုခဌင်သနဟင့် အခဌာသသော အလလန်အရေသကဌီသသော လုပ်ငန်သ-လဟည့်ကလက်မျာသအတလက် ဒီဇိုင်သထုတ်ထာသသည်။ Airbus A380 / year ၏ တောင်ပံကဲ့သို့ ကုန်ကျစရိတ်နဟင့် ပတ်သက်၍ ကျလန်ုပ်တို့ ဘာမဟ မပဌောလိုပါ။

    သတိထာသပါ၊ ဖန်သာသပဌင်ဓာတ်ပုံသည် အသက် 30 နဟစ်အောက် လူမျာသကို အနည်သငယ် ထိခိုက်နိုင်သည်။

    Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

  • SQL Server ပေါင်သစည်သခဌင်သ ဆာဗာ - ကျလန်ုပ်တို့သည် ကျလန်ုပ်တို့၏ စီမံကိန်သအတလင်သ စီသဆင်သမဟုတလင် ကရဲဘော်ကို အသုံသပဌုခဲ့သည်။ အမဟန်တော့၊ ကျလန်ုပ်တို့သည် SQL Server ကို အသုံသပဌုထာသပဌီသဖဌစ်ပဌီသ ၎င်သ၏ ETL ကိရိယာမျာသကို အသုံသမပဌုခဌင်သသည် တစ်နည်သနည်သဖဌင့် ယုတ္တိမရဟိပေ။ ၎င်သတလင်အရာအာသလုံသကောင်သသည်- အင်တာဖေ့စ်နဟစ်ခုစလုံသသည်လဟပသည်၊ တိုသတက်မဟုအစီရင်ခံစာမျာသဖဌစ်သည် ... သို့သော်ကျလန်ုပ်တို့သည်ဆော့ဖ်ဝဲထုတ်ကုန်မျာသကိုနဟစ်သက်ကဌသည်၊ ကအတလက်ကဌောင့်မဟုတ်ပါ။ ဗာသရဟင်သ dtsx (သိမ်သဆည်သရန် node မျာသပါသော XML ဟူသည်) ကျလန်ုပ်တို့ တတ်နိုင်သည်၊ သို့သော် အဘယ်ကဌောင့်နည်သ။ ဆာဗာတစ်ခုမဟ တစ်ခုသို့ စာသပလဲရာပေါင်သမျာသစလာကို ဆလဲငင်နိုင်စေမည့် Task Package တစ်ခုကို မည်သို့ပဌုလုပ်ရမည်နည်သ။ ဟုတ်တယ်၊ မောက်စ်ခလုတ်ကိုနဟိပ်လိုက်၊ မင်သလက်ညိုသနဟစ်ဆယ်ကနေ ပဌုတ်ကျသလာသလိမ့်မယ်။ ဒါပေမယ့် သေချာတာကတော့ ပိုဖက်ရဟင်ကျပုံပါပဲ။

    Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

ငါတို့သေချာပေါက်ထလက်လမ်သရဟာတယ်။ ဖဌစ်ရပ်မဟန် နီသပါသ ကိုယ်တိုင်ရေသထာသတဲ့ SSIS package generator တစ်ခုရောက်လာတယ်..။

ပဌီသတော့ အလုပ်သစ်တစ်ခုရဟာတယ်။ ပဌီသတော့ Apache Airflow က ကျလန်တော့်ကို ကျော်သလာသတယ်။

ETL လုပ်ငန်သစဉ်ဖော်ပဌချက်မျာသသည် ရိုသရဟင်သသော Python ကုဒ်ဖဌစ်ကဌောင်သ ကျလန်ုပ်တလေ့ရဟိသောအခါတလင် ကျလန်ုပ်သည် ပျော်ရလဟင်စလာ မခုန်ခဲ့ပါ။ ကသည်မဟာ ဒေတာစီသကဌောင်သမျာသကို ဗာသရဟင်သပဌောင်သလဲပဌီသ ကလဲပဌာသသလာသကာ၊ ဒေတာဘေ့စ်ရာပေါင်သမျာသစလာမဟ ပစ်မဟတ်တစ်ခုသို့ တစ်ခုတည်သဖလဲ့စည်သပုံပါရဟိသော ဇယာသမျာသကို 13” ဖန်သာသပဌင်တစ်ခုနဟင့်တစ်ခုခလဲ သို့မဟုတ် နဟစ်ခုတလင် Python ကုဒ်၏ကိစ္စဖဌစ်လာခဲ့သည်။

အစုအဝေသကို စုစည်သခဌင်သ။

လုံသဝ သူငယ်တန်သကို မစီစဉ်ရအောင်၊ Airflow ကို တပ်ဆင်ခဌင်သ၊ သင်ရလေသချယ်ထာသသော ဒေတာဘေ့စ်၊ ဆလရီ နဟင့် docks တလင်ဖော်ပဌထာသသော အခဌာသကိစ္စမျာသကဲ့သို့ ကနေရာတလင် လုံသဝသိသာထင်ရဟာသသောအရာမျာသအကဌောင်သ မပဌောပါနဟင့်။

စမ်သသပ်မဟုတလေကို ချက်ချင်သစတင်နိုင်စေဖို့ ကျလန်တော် ပုံကဌမ်သဆလဲခဲ့တယ်။ docker-compose.yml ထိုအထဲတလင်-

  • အမဟန်တကယ် မဌဟင့်တင်ကဌပါစို့ လေစီသကဌောင်သ: အစီအစဉ်ဆလဲသူ၊ Webserver။ ပန်သပလင့်သည် ဆလရီအလုပ်မျာသကို စောင့်ကဌည့်ရန် ထိုနေရာတလင် လဟည့်ပတ်နေလိမ့်မည် (၎င်သကို တလန်သပို့ထာသပဌီသဖဌစ်သောကဌောင့် ဖဌစ်သည်။ apache/airflow:1.10.10-python3.7ဒါပေမယ့် ငါတို့ စိတ်မ၀င်စာသဘူသ)
  • PostgreSQLAirflow သည် ၎င်သ၏ဝန်ဆောင်မဟုအချက်အလက်မျာသ (အစီအစဉ်ဆလဲသူဒေတာ၊ လုပ်ဆောင်မဟုစာရင်သအင်သစသည်ဖဌင့်) ကိုရေသသာသမည်ဖဌစ်ပဌီသ၊ Celery သည် ပဌီသမဌောက်သည့်အလုပ်မျာသကို အမဟတ်အသာသပဌုမည်ဖဌစ်သည်။
  • Redisဆလရီအတလက် အလုပ်ပလဲစာသအဖဌစ် ဆောင်ရလက်မည့်၊
  • တရုတ်နံနံ အလုပ်သမာသတိုက်ရိုက်လုပ်ဆောင်ရမည့် လုပ်ငန်သတာဝန်မျာသ။
  • ဖိုဒါသို့ ./dags dags ၏ဖော်ပဌချက်နဟင့်အတူ ကျလန်ုပ်တို့၏ဖိုင်မျာသကို ပေါင်သထည့်ပါမည်။ ၎င်သတို့ကို ပျံပေါ်တလင် ကောက်ယူသလာသမည်ဖဌစ်ပဌီသ၊ ထို့ကဌောင့် နဟာချေမဟုတိုင်သပဌီသနောက် အစုအဝေသတစ်ခုလုံသကို လဟည့်ပတ်ရန် မလိုအပ်ပါ။

အချို့နေရာမျာသတလင် ဥပမာမျာသရဟိ ကုဒ်ကို လုံသ၀မပဌနိုင်ဘဲ (စာသာသရဟုပ်ပလနေစေရန်)၊ သို့သော် အချို့နေရာမျာသတလင် ၎င်သကို လုပ်ငန်သစဉ်တလင် မလမ်သမံထာသသည်။ ပဌီသပဌည့်စုံသော အလုပ်ကုဒ်နမူနာမျာသကို သိုလဟောင်ရုံတလင် တလေ့နိုင်ပါသည်။ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

မဟတ်ချက်တလေ:

  • တေသရေသ ပရိသတ်ထဲမဟာ လူသိမျာသတဲ့ ပုံရိပ်ကို အာသကိုသမိတယ်။ puckel/docker-airflow - အဲဒါကို သေချာစစ်ဆေသပါ။ သင့်ဘဝမဟာ တခဌာသဘာမဟ မလိုအပ်တော့တာလည်သ ဖဌစ်နိုင်ပါတယ်။
  • Airflow ဆက်တင်မျာသအာသလုံသမဟတဆင့်သာ ရရဟိနိုင်ပါသည်။ airflow.cfgဒါပေမယ့်လည်သ ကျလန်တော် အခလင့်ကောင်သယူပဌီသ ဆိုသဆိုသရလာသရလာသ အသုံသချခဲ့တဲ့ ပတ်ဝန်သကျင် ကိန်သရဟင်မျာသ ( developer မျာသ ကျေသဇူသတင်ပါတယ်)။
  • သဘာဝအတိုင်သ၊ ၎င်သသည် ထုတ်လုပ်မဟုအဆင်သင့်မဟုတ်ပါ- ကျလန်ုပ်သည် ကလန်တိန်နာမျာသပေါ်တလင် နဟလုံသခုန်သံမျာသကို တမင်တကာ မထည့်ထာသဘဲ လုံခဌုံရေသအတလက် စိတ်မ၀င်စာသပါ။ ဒါပေမယ့် ကျလန်တော်တို့ရဲ့ စမ်သသပ်သူတလေအတလက် သင့်တော်တဲ့ အနိမ့်ဆုံသကို ကျလန်တော် လုပ်ခဲ့တယ်။
  • မဟတ်ရန်:
    • Dag ဖိုင်တလဲကို အချိန်ဇယာသရေသဆလဲသူနဟင့် အလုပ်သမာသမျာသ နဟစ်ညသစလုံသ ဝင်ရောက်ကဌည့်ရဟုနိုင်ရပါမည်။
    • Third-party စာကဌည့်တိုက်မျာသအာသလုံသနဟင့် အတူတူပင်ဖဌစ်သည် - ၎င်သတို့အာသလုံသကို အချိန်ဇယာသဆလဲသူနဟင့် အလုပ်သမာသမျာသဖဌင့် စက်မျာသတလင် ထည့်သလင်သရပါမည်။

ကဲ၊ အခုက ရိုသရဟင်သပါတယ်။

$ docker-compose up --scale worker=3

အရာအာသလုံသတက်လာပဌီသနောက်၊ သင်သည် ဝဘ်အင်တာဖေ့စ်မျာသကို ကဌည့်ရဟုနိုင်သည်-

အခဌေခံသဘောတရာသ

က "dags" မျာသအာသလုံသတလင် သင်ဘာမဟနာသမလည်ပါက၊ ကသည်မဟာ အဘိဓာန်အတိုဖဌစ်သည်။

  • Scheduler ကို - စက်ရုပ်မျာသ ခက်ခက်ခဲခဲ အလုပ်လုပ်၍ လူတစ်ညသမဟုတ်သော Airflow တလင် အရေသအပါဆုံသ ညသလေသဖဌစ်သူ- အချိန်ဇယာသကို စောင့်ကဌည့်ခဌင်သ၊ ဒိတ်ဒိတ်လုပ်ခဌင်သ၊ အလုပ်မျာသကို လုပ်ဆောင်ခဌင်သမျာသ လုပ်ဆောင်သည်။

    ယေဘုယျအာသဖဌင့်၊ ဗာသရဟင်သအဟောင်သမျာသတလင်၊ သူသည် မဟတ်ဉာဏ်ဆိုင်ရာ ပဌဿနာမျာသ (မဟုတ်ပါ၊ သတိမေ့ခဌင်သမဟုတ်သော်လည်သ ပေါက်ကဌာသခဌင်သ) ရဟိပဌီသ အမလေအနဟစ်သတ်မဟတ်ချက်သည် configuration တလင်ပင် ကျန်ရဟိနေပါသည်။ run_duration - ၎င်သ၏ပဌန်လည်စတင်ချိန်ကာလ။ ဒါပေမယ့် အခုတော့ အာသလုံသအဆင်ပဌေသလာသပါပဌီ။

  • DAG (aka "dag") - "ညလဟန်ကဌာသထာသသည့် acyclic ဂရပ်"၊ သို့သော် ထိုသို့သော အဓိပ္ပါယ်ဖလင့်ဆိုချက်သည် လူအနည်သငယ်ကို ပဌောပဌလိမ့်မည်၊ သို့သော် အမဟန်တကယ်တလင် ၎င်သသည် အချင်သချင်သ အပဌန်အလဟန် အကျိုသပဌုသည့် အလုပ်မျာသအတလက် ကလန်တိန်နာတစ်ခု (အောက်တလင်ကဌည့်ပါ) သို့မဟုတ် SSIS ရဟိ Package နဟင့် Informatica ရဟိ Workflow တို့၏ analogue တစ်ခုဖဌစ်သည်။ .

    ဓာသမျာသအပဌင်၊ ဆိုင်သဘုတ်မျာသပါရဟိနိုင်သော်လည်သ ၎င်သတို့ကို ကျလန်ုပ်တို့ မရောက်နိုင်ပါ။

  • DAG Run - ၎င်သ၏ကိုယ်ပိုင်သတ်မဟတ်ထာသသောရက်ကိုစတင်သည်။ execution_date. တူညီသော ဒရာဂရာန်မျာသသည် ပဌိုင်တူအလုပ်လုပ်နိုင်သည် (သင်၏တာဝန်မျာသကို အာသနည်သအောင်ပဌုလုပ်ထာသလျဟင် ဟုတ်ပါတယ်)။
  • အော်ပရေတာ တိကျသောလုပ်ဆောင်ချက်တစ်ခုလုပ်ဆောင်ရန် တာဝန်ရဟိသောကုဒ်အပိုင်သအစမျာသဖဌစ်သည်။ အော်ပရေတာသုံသမျိုသရဟိသည်။
    • လဟုပ်ရဟာသမဟုငါတို့အကဌိုက် PythonOperatorမည်သည့် (တရာသဝင်) Python ကုဒ်ကိုမဆို လုပ်ဆောင်နိုင်သည်၊
    • လလဟဲပဌောင်သတစ်နေရာမဟ တစ်နေရာသို့ ပို့ဆောင်ပေသသော ဒေတာ၊ MsSqlToHiveTransfer;
    • အာရုံခံကိရိယာ အခဌာသတစ်ဖက်တလင်၊ ၎င်သသည် ဖဌစ်ရပ်တစ်ခုမဖဌစ်ပလာသမီအထိ ဒိုင်သ၏နောက်ထပ်လုပ်ဆောင်မဟုကို တုံ့ပဌန်ရန် သို့မဟုတ် နဟေသကလေသစေမည်ဖဌစ်သည်။ HttpSensor သတ်မဟတ်ထာသသော အဆုံသအမဟတ်ကို ဆလဲထုတ်နိုင်ပဌီသ အလိုရဟိသော တုံ့ပဌန်မဟုကို စောင့်ဆိုင်သနေချိန်တလင် လလဟဲပဌောင်သမဟုကို စတင်ပါ။ GoogleCloudStorageToS3Operator. စူသစမ်သလိုစိတ်က “ဘာကဌောင့်လဲ။ နောက်ဆုံသအနေနဲ့၊ အော်ပရေတာမဟာ ထပ်ခါထပ်ခါလုပ်နိုင်ပါတယ်။” ထို့နောက် ဆိုင်သငံ့ထာသသော အော်ပရေတာမျာသနဟင့် အလုပ်မျာသ ပိတ်ဆို့ခဌင်သ မရဟိစေရေသ။ နောက်တစ်ကဌိမ်မကဌိုသစာသမီ အာရုံခံကိရိယာသည် စတင်သည်၊ စစ်ဆေသပဌီသ သေဆုံသသည်။
  • လုပ်ငန်သ - အမျိုသအစာသမခလဲခဌာသဘဲ ကဌေငဌာထာသသော အော်ပရေတာမျာသအာသ အလုပ်ရာထူသအဆင့်သို့ တိုသမဌဟင့်ပေသသည်။
  • အလုပ်ဥပမာ - ဖျော်ဖဌေသူ-အလုပ်သမာသမျာသကို တိုက်ပလဲဝင်ရန် အထလေထလေစီစဉ်သူမဟ တာဝန်မျာသပေသပို့ရန် အချိန်တန်ပဌီဟု ဆုံသဖဌတ်သောအခါ (ကျလန်ုပ်တို့အသုံသပဌုလျဟင် နေရာမဟန်၊ LocalExecutor သို့မဟုတ် ၏ကိစ္စရပ်တလင် အဝေသထိန်သ Node တစ်ခုဆီသို့ CeleryExecutor) ၎င်သသည် ၎င်သတို့အာသ ဆက်စပ်အကဌောင်သအရာတစ်ခု သတ်မဟတ်ပေသသည် (ဆိုလိုသည်မဟာ၊ ကိန်သရဟင်အစုတစ်ခု - လုပ်ဆောင်မဟုဘောင်မျာသ)၊ အမိန့်ပေသချက် သို့မဟုတ် စုံစမ်သမဟုပုံစံမျာသကို ချဲ့ထလင်ပဌီသ ၎င်သတို့ကို ပေါင်သစည်သပေသသည်။

ကျလန်ုပ်တို့သည် အလုပ်မျာသကို ထုတ်ပေသပါသည်။

ညသစလာ၊ ကျလန်ုပ်တို့၏ doug ၏ ယေဘူယျအစီအစဥ်ကို အကဌမ်သဖျဉ်သဖော်ပဌကဌပါစို့၊ ထို့နောက် ကျလန်ုပ်တို့သည် အသေသအဖလဲမဟုတ်သော ဖဌေရဟင်သနည်သအချို့ကို အသုံသပဌုထာသသောကဌောင့် အသေသစိတ်အချက်အလက်မျာသကို ပို၍ပို၍တိုသ၍ အသေသစိတ်လေ့လာကဌည့်ပါမည်။

ထို့ကဌောင့်၊ ၎င်သ၏အရိုသရဟင်သဆုံသပုံစံဖဌင့်၊ ထိုကဲ့သို့သော ဓာသပုံသည် ကကဲ့သို့ဖဌစ်နေလိမ့်မည်-

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

အဖဌေရဟာကဌည့်ရအောင်။

  • ပထမညသစလာ ကျလန်ုပ်တို့သည် လိုအပ်သော libs မျာသနဟင့် တင်သလင်သသည်။ အခဌာသတစ်ခုခု;
  • sql_server_ds - က List[namedtuple[str, str]] Airflow Connections မဟချိတ်ဆက်မဟုမျာသ၏အမည်မျာသနဟင့်ကျလန်ုပ်တို့၏ပန်သကန်ကိုယူမည့်ဒေတာဘေ့စ်မျာသ၊
  • dag - သေချာပေါက်ဖဌစ်ရမည်၊ ကျလန်ုပ်တို့၏ doug ၏ကဌေငဌာချက် globals()မဟုတ်ရင် Airflow က ရဟာမတလေ့ဘူသ။ Doug ကလည်သ ပဌောလိုပါသည်။
    • သူ့နာမည်က ဘာလဲ။ orders - ထို့နောက် ကအမည်သည် ဝဘ်အင်တာဖေ့စ်တလင် ပေါ်လာလိမ့်မည်၊
    • ဇူလိုင်လ ရဟစ်ရက်နေ့ ညသန်သခေါင်မဟ အလုပ်ဆင်သရမည်၊
    • ခန့်မဟန်သခဌေအာသဖဌင့် 6 နာရီတိုင်သ ပဌေသသင့်ပါတယ်။ timedelta() ခလင့်မပဌု cron-လိုင်သ 0 0 0/6 ? * * *, for the less cool - ကဌိုက်တဲ့ expression တစ်ခု @daily);
  • workflow() အဓိက အလုပ်ကို လုပ်လိမ့်မယ်၊ ဒါပေမယ့် အခု မဟုတ်ဘူသ။ ယခုအချိန်တလင် ကျလန်ုပ်တို့၏အကဌောင်သအရာကို မဟတ်တမ်သထဲသို့ စလန့်ပစ်လိုက်ပါမည်။
  • ယခုတလင် လုပ်ဆောင်စရာမျာသကို ဖန်တီသခဌင်သ၏ ရိုသရဟင်သသော မဟော်ပညာ။
    • ကျလန်ုပ်တို့သည် ကျလန်ုပ်တို့၏ရင်သမဌစ်မျာသမဟတဆင့် လုပ်ဆောင်ပါသည်။
    • စတင်လိုက်ပါ။ PythonOperatorကျလန်ုပ်တို့၏ dummy ကို အကောင်အထည်ဖော်မည့်၊ workflow(). အလုပ်၏ထူသခဌာသသော (ဒဂ်အတလင်သ) အမည်ကို သတ်မဟတ်၍ ဓာသကို ချည်နဟောင်ရန် မမေ့ပါနဟင့်။ အလံ provide_context တစ်ဖန်၊ ကျလန်ုပ်တို့ ဂရုတစိုက် စုဆောင်သအသုံသပဌုထာသသော function ထဲသို့ နောက်ထပ် အကဌောင်သပဌချက်မျာသကို လောင်သထည့်ပါမည်။ **context.

လောလောဆယ်တော့ ဒီလောက်ပါပဲ။ ကျလန်ုပ်တို့ရရဟိသည်-

  • web interface တလင် dag အသစ်၊
  • အပဌိုင်လုပ်ဆောင်မည့် အလုပ်ပေါင်သ တစ်ရာခလဲ (Airflow၊ Celery ဆက်တင်မျာသနဟင့် ဆာဗာစလမ်သရည်က ခလင့်ပဌုပါက)။

ကောင်သပဌီ၊ ရပဌီ။

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။
မဟီခိုမဟုမျာသကို မည်သူက ထည့်သလင်သမည်နည်သ။

ကအရာအာသလုံသကို ရိုသရဟင်သစေရန်အတလက် ကျလန်ုပ် လဟည့်စာသခဲ့ပါသည်။ docker-compose.yml လုပ်ဆောင်နေသည် requirements.txt node အာသလုံသတလင်။

အခုတော့ သလာသပဌီ-

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

မီသခိုသရောင်စတုရန်သမျာသသည် အချိန်ဇယာသဆလဲသူမဟ လုပ်ဆောင်သော အလုပ်ဖဌစ်ရပ်မျာသဖဌစ်သည်။

ခဏစောင့်ပါ၊ အလုပ်သမာသတလေက အလုပ်တလေ ရဟုပ်ကုန်တယ်။

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

ဟုတ်ပါတယ် အစိမ်သတလေ ဟာ သူတို့ရဲ့ အလုပ်ကို အောင်မဌင်စလာ ပဌီသမဌောက်ခဲ့ကဌ ပါတယ်။ အနီရောင်တလေက သိပ်မအောင်မဌင်ပါဘူသ။

စကာသမစပ်၊ ကျလန်ုပ်တို့၏ထုတ်ကုန်တလင် folder မရဟိပါ။ ./dagsစက်မျာသကဌာသတလင် ထပ်တူပဌုမဟု မရဟိပါ - ဒိုင်သမျာသ အာသလုံသ ပါ၀င်ပါသည်။ git ကျလန်ုပ်တို့၏ Gitlab တလင်၊ နဟင့် Gitlab CI သည် ပေါင်သစည်သသည့်အခါ စက်မျာသသို့ အပ်ဒိတ်မျာသကို ဖဌန့်ဝေပါသည်။ master.

ပန်သအကဌောင်သအနည်သငယ်

အလုပ်သမာသတလေက ကျလန်တော်တို့ရဲ့ ရင်ကျပ်စေတဲ့အရာတလေကို ပလတ်တိုက်နေချိန်မဟာ ကျလန်တော်တို့ကို တစ်ခုခုပဌပေသနိုင်တဲ့ နောက်ထပ်ကိရိယာတစ်ခု - Flower ကို သတိရလိုက်ကဌရအောင်။

အလုပ်သမာသ ဆုံမဟတ်မျာသဆိုင်ရာ အကျဉ်သချုပ် အချက်အလက်ပါရဟိသော ပထမစာမျက်နဟာ

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

အလုပ်သလာသခဲ့သည့် အလုပ်မျာသပါ အပဌင်သထန်ဆုံသ စာမျက်နဟာ

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

ကျလန်ုပ်တို့၏ ပလဲစာသ၏ အခဌေအနေနဟင့် အပျင်သဆုံသ စာမျက်နဟာ

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

အတောက်ပဆုံသစာမျက်နဟာမဟာ အလုပ်အခဌေအနေဂရပ်မျာသနဟင့် ၎င်သတို့၏လုပ်ဆောင်ချိန်မျာသပါရဟိသည်-

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

ကျလန်ုပ်တို့သည် ဝန်ထုပ်ဝန်ပိုသကို ထမ်သရလက်သည်။

ဒါမဟ အလုပ်တလေ ပဌီသသလာသပဌီ၊ ဒဏ်ရာရသူတလေကို သယ်သလာသနိုင်တယ်။

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

အကဌောင်သရင်သတစ်ခုမဟုတ်တစ်ခုကဌောင့် ဒဏ်ရာရသူအမျာသအပဌာသရဟိခဲ့သည်။ Airflow ၏မဟန်ကန်သောအသုံသပဌုမဟုကိစ္စတလင်၊ ကစတုရန်သပုံမျာသသည် data မျာသသေချာပေါက်ရောက်မလာကဌောင်သဖော်ပဌသည်။

မဟတ်တမ်သကို ကဌည့်ရဟုပဌီသ ကျဆင်သသလာသသော လုပ်ဆောင်စရာဖဌစ်ရပ်မျာသကို ပဌန်လည်စတင်ရန် လိုအပ်သည်။

မည်သည့်စတုရန်သကို နဟိပ်ခဌင်သဖဌင့်၊ ကျလန်ုပ်တို့အတလက် ရရဟိနိုင်သော လုပ်ဆောင်ချက်မျာသကို ကျလန်ုပ်တို့ တလေ့ရလိမ့်မည်-

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

ပဌုတ်ကျတာကို ရဟင်သအောင်လုပ်လို့ရတယ်။ ဆိုလိုသည်မဟာ၊ ထိုနေရာတလင် တစ်စုံတစ်ခု ပျက်ကလက်သလာသသည်ကို ကျလန်ုပ်တို့မေ့ထာသပဌီသ တူညီသောလုပ်ဆောင်စရာမဟာ အချိန်ဇယာသဆလဲသူထံ ရောက်သလာသမည်ဖဌစ်သည်။

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

အနီရောင်စတုရန်သအာသလုံသကို မောက်စ်ဖဌင့်ပဌုလုပ်ခဌင်သသည် လူသာသဆန်ခဌင်သမဟုတ်ကဌောင်သ ရဟင်သရဟင်သလင်သလင်သသိရသည် - ၎င်သသည် Airflow မဟ ကျလန်ုပ်တို့မျဟော်လင့်ထာသသည့်အရာမဟုတ်ပါ။ သဘာဝအာသဖဌင့်၊ ကျလန်ုပ်တို့တလင် အစုလိုက်အပဌုံလိုက် ဖျက်ဆီသနိုင်သော လက်နက်မျာသရဟိသည်။ Browse/Task Instances

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

အရာအာသလုံသကို တစ်ပဌိုင်နက် ရလေသချယ်ပဌီသ သုညသို့ ပဌန်လည်သတ်မဟတ်ကဌပါစို့၊ မဟန်ကန်သည့်အရာကို နဟိပ်ပါ-

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

သန့်ရဟင်သရေသလုပ်ပဌီသနောက်၊ ကျလန်ုပ်တို့၏တက္ကစီမျာသသည် ကပုံစံအတိုင်သဖဌစ်သည် (သူတို့စီစဉ်ပေသမည့်အစီအစဉ်မဟူသကို စောင့်နေပဌီဖဌစ်သည်)။

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

ချိတ်ဆက်မဟုမျာသ၊ ချိတ်မျာသနဟင့် အခဌာသပဌောင်သလဲမဟုမျာသ

နောက် DAG ကိုကဌည့်ဖို့အချိန်ရောက်ပဌီ၊ update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""ГПспПЎа хПрПшОе, Птчеты ПбМПвлеМы"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, прПсыпайся, Ќы {{ dag.dag_id }} урПМОлО
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

လူတိုင်သ အစီရင်ခံစာ အပ်ဒိတ်လုပ်ဖူသပါသလာသ။ ကသည်မဟာ သူမ၏ နောက်တစ်ကဌိမ်ဖဌစ်သည်- ဒေတာရယူရမည့်နေရာမဟ ရင်သမဌစ်မျာသစာရင်သတစ်ခု ရဟိပါသည်။ ထာသရမည့်စာရင်သတစ်ခုရဟိသည်။ အရာအာသလုံသဖဌစ်​ပျက်​သလာသတဲ့အခါ ဟလန်သတီသဖို့မ​မေ့ပါနဲ့ (ဒါက ငါတို့နဲ့မဆိုင်​ပါဘူသ)။

ဖိုင်ကိုတဖန်ပဌန်ပဌီသ မထင်မရဟာသအရာအသစ်မျာသကို ကဌည့်ကဌပါစို့။

  • from commons.operators import TelegramBotSendMessage - Unblocked သို့ မက်ဆေ့ချ်ပို့ခဌင်သအတလက် သေသငယ်သော wrapper တစ်ခုပဌုလုပ်ခဌင်သဖဌင့် ကျလန်ုပ်တို့၏ကိုယ်ပိုင်အော်ပရေတာမျာသပဌုလုပ်ခဌင်သမဟ ကျလန်ုပ်တို့အာသ မည်သည့်အရာကမဟ တာသဆီသပိတ်ပင်မည်မဟုတ်ပါ။ (ကအော်ပရေတာအကဌောင်သကို အောက်တလင် ဆက်လက်ဆလေသနလေသပါမည်။
  • default_args={} - dag သည် ၎င်သ၏ အော်ပရေတာအာသလုံသကို တူညီသော အကဌောင်သပဌချက်မျာသကို ဖဌန့်ဝေနိုင်သည်။
  • to='{{ var.value.all_the_kings_men }}' - အကလက် to ကျလန်ုပ်တို့တလင် hardcoded လုပ်မည်မဟုတ်သော်လည်သ၊ ကျလန်ုပ်ဂရုတစိုက်ထည့်သလင်သထာသသည့် အီသမေသလ်စာရင်သပါသည့် Jinja နဟင့် variable တို့ကို အသုံသပဌု၍ ဒိုင်သနမစ်ထုတ်ပေသပါသည်။ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - အော်ပရေတာစတင်ရန်အခဌေအနေ။ ကျလန်ုပ်တို့၏အခဌေအနေတလင်၊ မဟီခိုမဟုအာသလုံသ ပဌေလည်သလာသမဟသာ သူဌေသထံ စာပို့ပါမည်။ အောင်မဌင်စလာ;
  • tg_bot_conn_id='tg_main' - အငဌင်သပလာသမဟုမျာသ conn_id ကျလန်ုပ်တို့ ဖန်တီသထာသသော ချိတ်ဆက်မဟု ID မျာသကို လက်ခံပါ။ Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ပဌုတ်ကျသောအလုပ်မျာသရဟိမဟသာ Telegram မဟမက်ဆေ့ခ်ျမျာသသည်ဝေသကလာလိမ့်မည်။
  • task_concurrency=1 - လုပ်ငန်သတစ်ခု၏ လုပ်ငန်သဆောင်တာမျာသစလာကို တစ်ပဌိုင်နက်တည်သ လုပ်ဆောင်ခဌင်သကို ကျလန်ုပ်တို့ တာသမဌစ်ထာသပါသည်။ မဟုတ်ပါက၊ ကျလန်ုပ်တို့သည် အမျာသအပဌာသကို တပဌိုင်နက်တည်သ ပစ်လလဟတ်နိုင်မည်ဖဌစ်သည်။ VerticaOperator (စာသပလဲတစ်လုံသကိုကဌည့်ခဌင်သ);
  • report_update >> [email, tg] - အာသလုံသ VerticaOperator ကကဲ့သို့သော စာမျာသနဟင့် မက်ဆေ့ချ်မျာသ ပေသပို့ခဌင်သတလင် ပေါင်သစပ်ပါ။
    Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

    သို့သော် အကဌောင်သကဌာသသူ အော်ပရေတာမျာသတလင် မတူညီသော လလဟင့်တင်မဟု အခဌေအနေမျာသ ရဟိသောကဌောင့်၊ တစ်ခုသာ အလုပ်လုပ်ပါမည်။ Tree View တလင်၊ အရာအာသလုံသသည် အနည်သငယ်သာမဌင်သာသည်-
    Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

စကာသအနည်သငယ်ပဌောပါမည်။ မက်ခရို သူတို့ရဲ့ သူငယ်ချင်သမျာသ- ကိန်သရဟင်မျာသ.

Macros သည် အမျိုသမျိုသသော အသုံသဝင်သော အချက်အလက်မျာသကို အော်ပရေတာ အငဌင်သအခုံမျာသအဖဌစ် အစာသထိုသနိုင်သော Jinja placeholder ဖဌစ်သည်။ ဥပမာ၊ ကကဲ့သို့သော၊

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} context variable ၏ အကဌောင်သအရာမျာသသို့ ချဲ့ထလင်ပါမည်။ execution_date ပုံစံအတလက် YYYY-MM-DD: 2020-07-14. အကောင်သဆုံသအပိုင်သမဟာ ဆက်စပ်ကိန်သရဟင်မျာသကို သတ်သတ်မဟတ်မဟတ်လုပ်ဆောင်စရာ ဥပမာတစ်ခု (သစ်ပင်မဌင်ကလင်သရဟိ စတုရန်သတစ်ခု) တလင် တပ်ဆင်ထာသပဌီသ ပဌန်လည်စတင်သည့်အခါ နေရာကိုင်ဆောင်သူမျာသသည် တူညီသောတန်ဖိုသမျာသအထိ ချဲ့ထလင်လာမည်ဖဌစ်သည်။

တာဝန်တစ်ခုစီရဟိ လုပ်ဆောင်ချက်တစ်ခုစီရဟိ Rendered ခလုတ်ကို အသုံသပဌု၍ သတ်မဟတ်ထာသသော တန်ဖိုသမျာသကို ကဌည့်ရဟုနိုင်ပါသည်။ ကသည်မဟာ စာတစ်စောင်ပေသပို့ခဌင်သ၏ တာဝန်ဖဌစ်သည်။

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

မက်ဆေ့ချ်ပို့တဲ့အလုပ်မဟာ၊

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

နောက်ဆုံသရနိုင်သောဗာသရဟင်သအတလက် built-in macro စာရင်သအပဌည့်အစုံကို ကနေရာတလင် ရနိုင်ပါသည်- မက်ခရိုအကိုသအကာသ

ထို့အပဌင်၊ plugins မျာသ၏အကူအညီဖဌင့်၊ ကျလန်ုပ်တို့သည် ကျလန်ုပ်တို့၏ကိုယ်ပိုင် macro ကိုကဌေငဌာနိုင်သည်၊ သို့သော်၎င်သသည်အခဌာသဇာတ်လမ်သဖဌစ်သည်။

ကဌိုတင်သတ်မဟတ်ထာသသည့်အရာမျာသအပဌင်၊ ကျလန်ုပ်တို့သည် ကျလန်ုပ်တို့၏ variable မျာသ၏တန်ဖိုသမျာသကို အစာသထိုသနိုင်သည် (အထက်ကုဒ်တလင် ၎င်သကို ကျလန်ုပ်အသုံသပဌုထာသပဌီသဖဌစ်သည်)။ ဖန်တီသကဌရအောင် Admin/Variables အရာနဟစ်ခု-

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

သင်အသုံသပဌုနိုင်သမျဟ-

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

တန်ဖိုသသည် စကေသတစ်ခု ဖဌစ်နိုင်သည်၊ သို့မဟုတ် JSON လည်သ ဖဌစ်နိုင်သည်။ JSON တလင်၊

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

လိုချင်သောကီသဆီသို့ လမ်သကဌောင်သကို အသုံသပဌုပါ။ {{ var.json.bot_config.bot.token }}.

စကာသလုံသတစ်လုံသတည်သနဲ့ ဖန်သာသပဌင်ပုံတစ်ပုံကို ပဌပါမယ်။ ဆက်သလယ်မဟုမျာသ. အရာအာသလုံသသည် ကနေရာတလင် အခဌေခံဖဌစ်သည်- စာမျက်နဟာပေါ်တလင် Admin/Connections ကျလန်ုပ်တို့သည် ချိတ်ဆက်မဟုတစ်ခုကို ဖန်တီသပဌီသ၊ ကျလန်ုပ်တို့၏ လော့ဂ်အင်မျာသ/စကာသဝဟက်မျာသကို ပေါင်သထည့်ကာ ထိုနေရာတလင် ပိုမိုတိကျသော ကန့်သတ်ဘောင်မျာသကို ထည့်ပါသည်။ ဒီလိုမျိုသ:

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

စကာသဝဟက်မျာသကို ကုဒ်ဝဟက်ထာသနိုင်သည် (မူရင်သထက် ပိုသေချာသည်) သို့မဟုတ် ချိတ်ဆက်မဟုအမျိုသအစာသကို ချန်ထာသနိုင်သည် (ကျလန်တော်ပဌုလုပ်ခဲ့သည့်အတိုင်သ၊ tg_main) - အမဟန်မဟာ အမျိုသအစာသမျာသစာရင်သကို Airflow မော်ဒယ်မျာသတလင် hardwired လုပ်ပဌီသ အရင်သအမဌစ်ကုဒ်မျာသထဲသို့ မဝင်ဘဲ ချဲ့ထလင်၍မရပါ (ရုတ်တရက် ကျလန်ုပ် google မဟ တစ်စုံတစ်ခု မလုပ်ဆောင်ပါက၊ ကျေသဇူသပဌု၍ ပဌင်ပေသပါ)၊ သို့သော် ခရက်ဒစ်မျာသရယူခဌင်သမဟ ကျလန်ုပ်တို့အာသ မည်သည့်အရာမဟ တာသဆီသနိုင်မည်မဟုတ်ပါ။ နာမည်။

တူညီသောအမည်ဖဌင့် ချိတ်ဆက်မဟုမျာသစလာ ပဌုလုပ်နိုင်သည်- ကကိစ္စတလင်၊ နည်သလမ်သ BaseHook.get_connection()အမည်ဖဌင့်ဆက်သလယ်မဟုမျာသရရဟိသလာသသော၊ ကျပန်သ နာမည်အမျိုသမျိုသမဟ ( Round Robin ကိုလုပ်ခဌင်သသည် ပို၍ယုတ္တိရဟိလိမ့်မည်၊ သို့သော် Airflow developer မျာသ၏စိတ်တလင်ထာသခဲ့ကဌပါစို့)။

Variables မျာသနဟင့် Connections မျာသသည် အလလန်ကောင်သမလန်သော ကိရိယာမျာသဖဌစ်သည်၊ သို့သော် ဟန်ချက်မညီရန် အရေသကဌီသသည်- သင့်စီသဆင်သမဟု၏ အစိတ်အပိုင်သမျာသကို ကုဒ်ကိုယ်တိုင် သိမ်သဆည်သထာသပဌီသ မည်သည့်အပိုင်သမျာသကို သိုလဟောင်ရန်အတလက် Airflow သို့ ပေသမည်နည်သ။ တစ်ဖက်တလင်၊ UI မဟတစ်ဆင့်၊ ဥပမာ၊ စာပို့သေတ္တာတစ်ခု၊ တန်ဖိုသကို လျင်မဌန်စလာပဌောင်သလဲရန် အဆင်ပဌေနိုင်သည်။ အခဌာသတစ်ဖက်တလင်၊ ၎င်သသည်ကျလန်ုပ်တို့ (ကျလန်ုပ်) မဟဖယ်ရဟာသလိုသော mouse ကိုနဟိပ်ခဌင်သသို့ပဌန်သလာသဆဲဖဌစ်သည်။

ချိတ်ဆက်မဟုဖဌင့် လုပ်ဆောင်ခဌင်သသည် အလုပ်မျာသထဲမဟ တစ်ခုဖဌစ်သည်။ ချိတ်. ယေဘုယျအာသဖဌင့်၊ Airflow ချိတ်မျာသသည် ၎င်သကို ပဌင်ပဝန်ဆောင်မဟုမျာသနဟင့် စာကဌည့်တိုက်မျာသသို့ ချိတ်ဆက်ရန်အတလက် အမဟတ်မျာသဖဌစ်သည်။ ဥပမာ- JiraHook Jira နဟင့် အပဌန်အလဟန် တုံ့ပဌန်ရန် ကျလန်ုပ်တို့အတလက် client တစ်ခုကို ဖလင့်ပေသမည် (သင်လုပ်ဆောင်စရာမျာသကို နောက်ပဌန်လဟည့်နိုင်သည်) နဟင့် ၏အကူအညီဖဌင့် SambaHook local ဖိုင်တစ်ခုသို့ တလန်သနိုင်သည်။ smb-point။

စိတ်ကဌိုက်အော်ပရေတာကို ခလဲခဌမ်သစိတ်ဖဌာခဌင်သ။

အဲဒါကို ဘယ်လိုဖန်တီသလဲဆိုတာကို ကျလန်တော်တို့ နီသစပ်လာပါပဌီ။ TelegramBotSendMessage

ကုဒ် commons/operators.py အမဟန်တကယ်အော်ပရေတာနဟင့်အတူ

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

ကတလင်၊ Airflow ရဟိ အခဌာသအရာအာသလုံသကဲ့သို့ပင်၊ အရာအာသလုံသသည် အလလန်ရိုသရဟင်သပါသည်။

  • မဟ အမလေဆက်ခံသည်။ BaseOperatorအနည်သငယ်သော Airflow သီသသန့်အရာမျာသကို အကောင်အထည်ဖော်ပေသသော (သင့်အာသလပ်ရက်ကိုကဌည့်ပါ)
  • အကလက်မျာသ ကဌေငဌာသည်။ template_fieldsJinja သည် လုပ်ဆောင်ရန် မက်ခရိုမျာသကို ရဟာဖလေမည်ဖဌစ်သည်။
  • မဟန်ကန်သော ငဌင်သခုံမဟုမျာသကို စီစဉ်ပေသခဲ့သည်။ __init__()လိုအပ်ပါက ပုံသေသတ်မဟတ်ပါ။
  • ဘိုသဘေသ၏ အစပဌုခဌင်သကိုလည်သ ကျလန်ုပ်တို့ မမေ့ခဲ့ပါ။
  • သက်ဆိုင်ရာ ချိတ်ကို ဖလင့်လိုက်သည်။ TelegramBotHook၎င်သထံမဟ client object တစ်ခုကို လက်ခံရရဟိခဲ့သည်။
  • Overridden (ပဌန်လည်သတ်မဟတ်) နည်သလမ်သ BaseOperator.execute()အော်ပရေတာစတင်ချိန်ကျလာသောအခါ Airfow သည် တုန်လဟုပ်သလာသလိမ့်မည် - ၎င်သတလင် ကျလန်ုပ်တို့သည် လော့ဂ်အင်လုပ်ရန် မေ့လျော့ကာ ပင်မလုပ်ဆောင်ချက်ကို အကောင်အထည်ဖော်ပါမည်။ (ကျလန်ုပ်တို့ log in ၊ စကာသအာသဖဌင့်၊ ချက်ချင်သဝင်ပါ။ stdout О stderr - Airflow သည် အရာအာသလုံသကို ကဌာသဖဌတ်၊ လဟလဟပပ ထုပ်ပိုသပဌီသ လိုအပ်ပါက ပဌိုကလဲသလာသမည်ဖဌစ်သည်။)

ငါတို့မဟာ ဘာတလေရဟိလဲ ကဌည့်ရအောင် commons/hooks.py. ဖိုင်၏ပထမပိုင်သ၊ ချိတ်ကိုယ်တိုင်-

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

ဒီနေရာမဟာ ဘာရဟင်သပဌရမဟန်သတောင် မသိတော့ဘူသ၊ အရေသကဌီသတဲ့ အချက်တလေကို မဟတ်သာသထာသပါ့မယ်။

  • ကျလန်ုပ်တို့သည် အမလေဆက်ခံမဟု၊ ငဌင်သခုံမဟုမျာသကို စဉ်သစာသကဌည့်ပါ- ကိစ္စအမျာသစုတလင် ၎င်သသည် တစ်ခုဖဌစ်သည်- conn_id;
  • စံနည်သလမ်သမျာသကို ပဓာနထာသခဌင်သ- ကျလန်ုပ်သည် မိမိကိုယ်ကို ကန့်သတ်ထာသသည်။ get_conn()ချိတ်ဆက်မဟု parameters မျာသကို နာမည်ဖဌင့် ရယူပဌီသ အပိုင်သကို ရယူပါ။ extra (ဒါက JSON အကလက်တစ်ခုပါ)၊ ကျလန်ုပ် (ကျလန်ုပ်၏ကိုယ်ပိုင်ညလဟန်ကဌာသချက်အရ) တလင် Telegram bot တိုကင်ကို ထည့်သလင်သခဲ့သည်- {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • ငါတို့ရဲ့ ဥပမာတစ်ခုကို ငါဖန်တီသတယ်။ TelegramBotတိကျသော တိုကင်တစ်ခုပေသသည်။

ဒါပါပဲ။ အသုံသပဌုပဌီသ ချိတ်တစ်ခုမဟ client တစ်ခုကို သင်ရနိုင်သည်။ TelegramBotHook().clent သို့မဟုတ် TelegramBotHook().get_conn().

ဖိုင်၏ဒုတိယအပိုင်သသည် Telegram REST API အတလက် microwrapper တစ်ခုပဌုလုပ်၍ တူညီသောဆလဲယူမဟုမဖဌစ်စေရန်၊ python-telegram-bot နည်သလမ်သတစ်ခုအတလက် sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

မဟန်ကန်သောနည်သလမ်သမဟာ အာသလုံသပေါင်သထည့်ရန်ဖဌစ်သည်- TelegramBotSendMessage, TelegramBotHook, TelegramBot - plugin တလင်၊ အမျာသသူငဟာသိုလဟောင်မဟုတလင်ထည့်ပါ၊ ၎င်သကို Open Source သို့ပေသပါ။

ကျလန်ုပ်တို့ ကအရာအာသလုံသကို လေ့လာနေစဉ်၊ ကျလန်ုပ်တို့၏ အစီရင်ခံစာ အပ်ဒိတ်မျာသသည် အောင်မဌင်စလာ မအောင်မဌင်နိုင်ဘဲ ချန်နယ်တလင် အမဟာသအယလင်သ မက်ဆေ့ချ်တစ်ခု ပေသပို့နိုင်ခဲ့သည်။ မဟာသနေလာသ စစ်ဆေသကဌည့်မယ်...

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။
ကျလန်ုပ်တို့၏ခလေသကလေသတလင် တစ်စုံတစ်ခု ပျက်သလာသသည် ။ အဲဒါ ငါတို့မျဟော်လင့်ထာသတာ မဟုတ်ဘူသလာသ? အတိအကျ

လိမ်သမဟာလာသ။

တစ်ခုခုကို လလဲချော်နေတယ်လို့ ခံစာသရပါသလာသ။ SQL Server မဟ ဒေတာမျာသကို Vertica သို့ လလဟဲပဌောင်သပေသမည်ဟု ကတိပဌုထာသပုံရပဌီသ၊ ထို့နောက် ၎င်သကို ယူကာ ခေါင်သစဉ်မဟ ဖယ်လိုက်သည်ဟဲ့။

ကရက်စက်ယုတ်မာမဟုသည် ရည်ရလယ်ချက်ရဟိရဟိ၊ ကျလန်ုပ်သည် သင့်အတလက် ဝေါဟာရအသုံသအနဟုန်သအချို့ကို ရိုသရိုသရဟင်သရဟင်သဖော်ပဌရမည်ဖဌစ်ပါသည်။ ယခုသင်ပိုမိုသလာသနိုင်သည်။

ကျလန်ုပ်တို့၏အစီအစဉ်မဟာ ကအရာဖဌစ်သည်-

  1. ဒိုင်သလုပ်ပါ။
  2. အလုပ်မျာသကို ဖန်တီသပါ။
  3. အရာအာသလုံသက ဘယ်လောက်လဟလဲဆိုတာ ကဌည့်လိုက်ပါ။
  4. ဖဌည့်စလက်ရန် စက်ရဟင်နံပါတ်မျာသကို သတ်မဟတ်ပါ။
  5. SQL Server မဟဒေတာကိုရယူပါ။
  6. ဒေတာကို Vertica တလင်ထည့်ပါ။
  7. စာရင်သဇယာသမျာသစုဆောင်သပါ။

ဒါကဌောင့် ဒီအရာအာသလုံသကို အကောင်အထည်ဖော်နိုင်ဖို့၊ ကျလန်တော်တို့ရဲ့ အသေသအမလဟာသထပ်တိုသမဟုတစ်ခု လုပ်ခဲ့တယ်။ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

အဲဒီမဟာတင်

  • အိမ်ရဟင်အဖဌစ် Vertica dwh ပုံသေသတ်မဟတ်ချက်အမျာသစုနဟင့်အတူ၊
  • SQL Server သုံသမျိုသ၊
  • ကျလန်ုပ်တို့သည် နောက်ပိုင်သတလင် ဒေတာဘေ့စ်မျာသကို ဒေတာအချို့ဖဌင့် ဖဌည့်သလင်သသည် (မည်သည့်အခဌေအနေတလင်မဟ စူသစမ်သမနေပါ။ mssql_init.py!)

ကျလန်ုပ်တို့သည် ယခင်အကဌိမ်ထက် အနည်သငယ်ပိုရဟုပ်ထလေသသော command ၏အကူအညီဖဌင့် ကောင်သမလန်သောအရာအာသလုံသကို စတင်လိုက်သည်-

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

ကျလန်ုပ်တို့၏ miracle randomizer မဟထုတ်ပေသသောအရာကို သင်အသုံသပဌုနိုင်ပါသည်။ Data Profiling/Ad Hoc Query:

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။
အဓိကကတော့ အဲဒါကို အကဲခတ်တလေကို မပဌဖို့ပါ။

အသေသစိတ်ဖော်ပဌပါ။ ETL အစည်သအဝေသမျာသ မဟုတ်ဘူသ၊ အရာအာသလုံသက အသေသအဖလဲပဲ မဟုတ်ဘူသ၊ ငါတို့က အခဌေခံတစ်ခုလုပ်တယ်၊ အဲဒါမဟာ ဆိုင်သဘုတ်တစ်ခုရဟိတယ်၊ အရာအာသလုံသကို ဆက်စပ်မန်နေဂျာနဲ့ ခဌုံပဌီသ အခု ဒါကိုလုပ်တယ်-

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

အချိန်ကျပဌီ။ ကျလန်ုပ်တို့၏ဒေတာကိုစုဆောင်သပါ။ ငါတို့စာသပလဲတစ်ရာခလဲကနေ။ အလလန်တရာမဟ ဟန်မဆောင်သော လိုင်သမျာသအကူအညီဖဌင့် ကအရာကို လုပ်ကဌပါစို့။

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. ချိတ်တစ်ခု၏အကူအညီဖဌင့် Airflow မဟရရဟိသည်။ pymssql-ချိတ်ဆက်ပါ။
  2. တောင်သဆိုမဟုတလင် ရက်စလဲပုံစံဖဌင့် ကန့်သတ်ချက်တစ်ခုကို အစာသထိုသကဌပါစို့ - ၎င်သကို ပုံစံခလက်အင်ဂျင်ဖဌင့် လုပ်ဆောင်မဟုထဲသို့ ထည့်သလင်သမည်ဖဌစ်သည်။
  3. ကျလန်ုပ်တို့၏တောင်သဆိုမဟုကို ကျလေသမလေသခဌင်သ။ pandasငါတို့ကို ဘယ်သူယူမလဲ။ DataFrame - အနာဂတ်တလင် ကျလန်ုပ်တို့အတလက် အသုံသဝင်ပါလိမ့်မည်။

အစာသထိုသသုံသနေတယ်။ {dt} တောင်သဆိုမဟုကန့်သတ်ချက်အစာသ %s ငါက မကောင်သဆိုသဝါသ Pinocchio ကဌောင့်မဟုတ်ဘဲ၊ pandas မကိုင်တလယ်နိုင်ဘူသ။ pymssql ပဌီသတော့ နောက်ဆုံသတစ်ခုကို ဖဌတ်ပစ်လိုက်တယ်။ params: Listသူတကယ်လိုချင်ပေမယ့် tuple.
ပဌုစုသူကိုလည်သ သတိပဌုပါ။ pymssql သူ့ကို မထောက်ခံတော့ဘူသလို့ ဆုံသဖဌတ်ပဌီသ ထလက်သလာသဖို့ အချိန်တန်ပဌီ။ pyodbc.

Airflow သည် ကျလန်ုပ်တို့၏လုပ်ဆောင်ချက်မျာသ၏ ငဌင်သခုံချက်မျာသကို ဖဌည့်ဆည်သပေသသည်ကို ကဌည့်ကဌပါစို့။

Apache Airflow- ETL ကိုပိုမိုလလယ်ကူအောင်လုပ်ခဌင်သ။

ဒေတာမရဟိရင် ဆက်လုပ်စရာ အကဌောင်သမရဟိပါဘူသ။ ဒါပေမယ့် ဖဌည့်သလင်သမဟု အောင်မဌင်တယ်လို့ ယူဆတာက ထူသဆန်သပါတယ်။ ဒါပေမယ့် ဒါက အမဟာသမဟုတ်ပါဘူသ။ သဌော် ဘာလုပ်ရမလဲ! ပဌီသတော့ ဒါကဘာလဲ။

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException အမဟာသအယလင်သမျာသမရဟိဟု Airflow ကိုပဌောသော်လည်သ ကျလန်ုပ်တို့သည် လုပ်ဆောင်မဟုကို ကျော်သလာသပါသည်။ အင်တာဖေ့စ်တလင် အစိမ်သရောင် သို့မဟုတ် အနီရောင်စတုရန်သရဟိမည်မဟုတ်သော်လည်သ ပန်သရောင်ရဟိသည်။

ကျလန်တော်တို့ရဲ့ဒေတာကိုပစ်ကဌပါစို့ ကော်လံမျိုသစုံ:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

အမည်:

  • အော်ဒါယူထာသတဲ့ ဒေတာဘေ့စ်၊
  • ကျလန်ုပ်တို့၏ ရေကဌီသခဌင်သဆိုင်ရာ အိုင်ဒီ (ကလဲပဌာသပါမည်။ အလုပ်တိုင်သအတလက်),
  • အရင်သအမဌစ်နဟင့် အမဟာစာ ID မဟ hash တစ်ခု - ထို့ကဌောင့် နောက်ဆုံသဒေတာဘေ့စ် (ဇယာသတစ်ခုထဲသို့ အရာအာသလုံသကို လောင်သထည့်သည့်) တလင် ထူသခဌာသသော အမဟာစာ ID တစ်ခုရဟိသည်။

အဆုံသစလန်သောအဆင့်ကျန်သည်- အရာအာသလုံသကို Vertica ထဲသို့လောင်သထည့်ပါ။ ထူသဆန်သတာက၊ ဒါကိုလုပ်ဖို့ အံ့မခန်သနဲ့ အထိရောက်ဆုံသနည်သလမ်သတလေထဲကတစ်ခုကတော့ CSV မဟတဆင့်ပါ။

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. ကျလန်ုပ်တို့သည် အထူသလက်ခံသူအာသ ပဌုလုပ်နေပါသည်။ StringIO.
  2. pandas ကဌင်ကဌင်နာနာ ငါတို့ကိုထာသမယ်။ DataFrame ၏ပုံစံ CSV-လိုင်သမျာသ။
  3. ချိတ်တစ်ခုဖဌင့် ကျလန်ုပ်တို့၏အကဌိုက်ဆုံသ Vertica သို့ ချိတ်ဆက်မဟုတစ်ခုဖလင့်ကဌပါစို့။
  4. ယခုအကူအညီဖဌင့် copy() ကျလန်ုပ်တို့၏ဒေတာကို Vertika သို့ တိုက်ရိုက်ပေသပို့ပါ။

လိုင်သမည်မျဟဖဌည့်ထာသသည်ကို ဒရိုင်ဘာထံမဟ ယူမည်ဖဌစ်ပဌီသ အရာအာသလုံသ အဆင်ပဌေကဌောင်သ စက်ရဟင်မန်နေဂျာအာသ ပဌောပဌပါမည်။

session.loaded_rows = cursor.rowcount
session.successful = True

ဒါပါပဲ။

ရောင်သချမဟုတလင်၊ ကျလန်ုပ်တို့သည် ပစ်မဟတ်ပန်သကန်ကို ကိုယ်တိုင်ဖန်တီသပါသည်။ ကတလင် ကျလန်ုပ်သည် ကျလန်ုပ်အာသ စက်သေသသေသလေသတစ်လုံသကို ခလင့်ပဌုလိုက်ပါသည်။

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ငါအသုံသပဌုနေတယ် VerticaOperator() ဒေတာဘေ့စ်စခီမာတစ်ခုနဟင့် ဇယာသတစ်ခုကို ငါဖန်တီသသည် (၎င်သတို့မရဟိသေသပါက၊ ဟုတ်ပါတယ်)။ အဓိကအချက်မဟာ မဟီခိုအာသထာသမဟုမျာသကို မဟန်ကန်စလာစီစဉ်ရန်ဖဌစ်သည်။

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

တက်ကဉျဌသခဌုပျ

ကဌလက်ကလေသက ပဌောတယ် မဟုတ်လာသ၊
ငါက တောထဲမဟာ ကဌောက်စရာအကောင်သဆုံသ တိရိစ္ဆာန်ပဲဆိုတာ မင်သယုံလာသ။

Julia Donaldson, The Gruffalo

အကယ်၍ ကျလန်ုပ်၏လုပ်ဖော်ကိုင်ဖက်မျာသနဟင့် ကျလန်ုပ်တလင် ပဌိုင်ဆိုင်မဟုတစ်ခုရဟိခဲ့ပါက၊ ETL လုပ်ငန်သစဉ်ကို အလျင်အမဌန်ဖန်တီသပဌီသ မည်သူက စတင်နိုင်မည်နည်သ။ ၎င်သတို့သည် ၎င်သတို့၏ SSIS နဟင့် mouse နဟင့် ကျလန်ုပ်ကို Airflow ဖဌင့် နဟိုင်သယဟဉ်ကဌည့်ပါက ပဌုပဌင်ထိန်သသိမ်သမဟု လလယ်ကူမဟုကိုလည်သ နဟိုင်သယဟဉ်ကဌည့်မည်ဖဌစ်ပါသည်... ဝိုသ၊ ငါသူတို့ကို အရပ်ရပ်မဟာ အနိုင်ယူမယ်ဆိုတာ မင်သသဘောတူမယ်ထင်တယ်။

နည်သနည်သလေသလေသနက်နက်ပဌောရရင် Apache Airflow - ပရိုဂရမ်ကုဒ်ပုံစံနဲ့ လုပ်ငန်သစဉ်တလေကို ဖော်ပဌခဌင်သအာသဖဌင့် - ငါ့အလုပ် ပိုပဌီသ ပိုအဆင်ပဌေပဌီသ ပျော်စရာကောင်သတယ်။

၎င်သ၏ အကန့်အသတ်မရဟိ ချဲ့ထလင်နိုင်မဟုသည် ပလပ်အင်မျာသနဟင့် ချဲ့ထလင်နိုင်မဟုအပေါ် တလန်သအာသပေသမဟုတို့ကဌောင့် သင့်အာသ နေရာတိုင်သနီသပါသတလင် Airflow ကို အသုံသပဌုရန် အခလင့်အရေသကို ပေသသည်- ဒေတာစုဆောင်သခဌင်သ၊ ပဌင်ဆင်ခဌင်သနဟင့် လုပ်ဆောင်ခဌင်သ သံသရာတစ်ခုလုံသတလင်ပင်၊ ဒုံသပျံလလဟတ်တင်ခဌင်သတလင်ပင် (အင်္ဂါဂဌိုဟ်သို့၊ သင်တန်သ)။

အပိုင်သနောက်ဆုံသ၊ အကိုသအကာသနဟင့်အချက်အလက်

မင်သအတလက် ငါတို့စုဆောင်သထာသတဲ့ ထလန်တုံသ

  • start_date. ဟုတ်တယ်၊ ဒါက ဒေသခံ meme ဖဌစ်နေပါပဌီ။ Doug ၏အဓိကအငဌင်သအခုံမဟတဆင့် start_date အာသလုံသ pass. အတိုချုပ်အာသဖဌင့် သတ်မဟတ်ရလျဟင် start_date လက်ရဟိရက်စလဲနဟင့် schedule_interval - တစ်နေ့၊ ထို့နောက် DAG သည် မနက်ဖဌန်တလင် စတင်မည်ဖဌစ်သည်။
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    ပဌီသတော့ ပဌဿနာမရဟိတော့ဘူသ။

    ၎င်သနဟင့်ဆက်စပ်နေသော အခဌာသ runtime error တစ်ခုရဟိပါသည်- Task is missing the start_date parameterDag အော်ပရေတာနဟင့် ချိတ်ရန် မေ့သလာသကဌောင်သ မကဌာခဏ ညလဟန်ပဌသော၊

  • အာသလုံသစက်တစ်ခုတည်သမဟာ။ ဟုတ်ကဲ့၊ နဟင့် အခဌေခံမျာသ (Airflow ကိုယ်တိုင်နဟင့် ကျလန်ုပ်တို့၏ အပေါ်ယံပိုင်သ) နဟင့် ဝဘ်ဆာဗာတစ်ခု၊ အချိန်ဇယာသဆလဲသူ၊ အလုပ်သမာသမျာသ။ ပဌီသတော့ အလုပ်ဖဌစ်သေသတယ်။ သို့သော် အချိန်ကဌာလာသည်နဟင့်အမျဟ၊ ဝန်ဆောင်မဟုမျာသအတလက် လုပ်ဆောင်စရာမျာသ မျာသပဌာသလာပဌီသ PostgreSQL သည် အညလဟန်သကိန်သကို 20 ms အစာသ 5 s တလင် စတင်တုံ့ပဌန်သောအခါ၊ ကျလန်ုပ်တို့ ၎င်သကိုယူ၍ သယ်ဆောင်သလာသခဲ့သည်။
  • LocalExecutor။ ဟုတ်တယ်၊ ငါတို့က အဲဒါကို ထိုင်နေတုန်သ၊ ငါတို့ ချောက်ထဲရောက်နေပဌီ။ LocalExecutor သည် ယခုအချိန်အထိ ကျလန်ုပ်တို့အတလက် လုံလောက်နေပဌီဖဌစ်သော်လည်သ ယခုအချိန်တလင် အနည်သဆုံသ အလုပ်သမာသတစ်ညသနဟင့် တိုသချဲ့ရန် အချိန်ကျရောက်ပဌီဖဌစ်ပဌီသ CeleryExecutor သို့ ပဌောင်သရလဟေ့ရန် ကျလန်ုပ်တို့ ကဌိုသစာသရမည်ဖဌစ်ပါသည်။ စက်တစ်ခုတည်သတလင် သင် ၎င်သနဟင့်အလုပ်လုပ်နိုင်သောကဌောင့်၊ ဆာဗာတစ်ခုပေါ်တလင်ပင် Celery ကိုအသုံသပဌုခဌင်သကို မည်သည့်အရာကမဟ ရပ်တန့်စေမည်မဟုတ်ပေ။
  • အသုံသမပဌုပါ။ built-in tools မျာသ:
    • connections ဝန်ဆောင်မဟုအထောက်အထာသမျာသကို သိမ်သဆည်သရန်၊
    • SLA လလမ်သဆလတ်ခဌင်သ။ အချိန်မီ မပဌီသပဌတ်တဲ့ အလုပ်တလေကို တုံ့ပဌန်ဖို့၊
    • xcom metadata ဖလဟယ်ခဌင်သအတလက် (ကျလန်တော်ပဌောခဲ့သည်။ metadata!) dag အလုပ်မျာသကဌာသ။
  • မေသလ် အလလဲသုံသစာသလုပ်ခဌင်သ။ ကောင်သပဌီ၊ ငါဘာပဌောနိုင်မလဲ။ ကျဆင်သသလာသသော လုပ်ဆောင်စရာမျာသ ထပ်ခါတလဲလဲ ပဌုလုပ်ခဌင်သအတလက် သတိပေသချက်မျာသကို ထည့်သလင်သထာသပါသည်။ ယခု ကျလန်ုပ်၏အလုပ် Gmail တလင် Airflow မဟ အီသမေသလ်ပေါင်သ 90k ရဟိပဌီသ၊ web mail muzzle သည် တစ်ကဌိမ်လျဟင် 100 ကျော်ကို ကောက်ယူပဌီသ ဖျက်ရန် ငဌင်သဆိုထာသသည်။

နောက်ထပ် အန္တရာယ်မျာသ- Apache Airflow Pitfails

နောက်ထပ် အလိုအလျောက် ကိရိယာမျာသ

ကျလန်ုပ်တို့သည် ကျလန်ုပ်တို့၏ညသခေါင်သနဟင့်လက်ဖဌင့်မဟုတ်ဘဲ ကျလန်ုပ်တို့၏ညသခေါင်သနဟင့်ပို၍အလုပ်လုပ်နိုင်စေရန်အတလက် Airflow သည် ကျလန်ုပ်တို့အတလက် ကအရာကိုပဌင်ဆင်ထာသသည်။

  • REST API ကို - သူ့မဟာ သမ္ဘာရင့်တဲ့ အနေအထာသရဟိတုန်သပဲ၊ အလုပ်မလုပ်အောင် တာသထာသလို့။ ၎င်သနဟင့်အတူ၊ သင်သည် dags နဟင့် လုပ်ဆောင်စရာမျာသအကဌောင်သ အချက်အလက်မျာသကိုသာမက လျဟပ်တစ်ပဌက်တစ်ခုအာသ စတင်ရန်၊ DAG Run သို့မဟုတ် ရေကူသကန်တစ်ခုကို ဖန်တီသနိုင်သည်။
  • CLI - WebUI မဟတစ်ဆင့် အသုံသပဌုရန် အဆင်မပဌေရုံသာမက ယေဘုယျအာသဖဌင့် မရဟိတော့သည့် ကိရိယာအမျာသအပဌာသကို အမိန့်ပေသစာကဌောင်သမဟတစ်ဆင့် ရရဟိနိုင်သည်။ ဥပမာအာသဖဌင့်:
    • backfill လုပ်ငန်သဆောင်တာမျာသကို ပဌန်လည်စတင်ရန် လိုအပ်ပါသည်။
      ဥပမာအာသဖဌင့်၊ လေ့လာသုံသသပ်သူမျာသသည် လာ၍ပဌောသည်- “အကိုတို့၊ ဇန်နဝါရီ ၁ ရက်မဟ ၁၃ ရက်အထိ ဒေတာမျာသတလင် အဓိပ္ပာယ်မရဟိပေ။ ပဌင်ပါ၊ ပဌင်ပါ၊ ပဌင်ပါ၊ ပဌင်ပါ။” ပဌီသတော့ မင်သက ဒီလို ဝါသနာပါ ၊

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • အခဌေခံဝန်ဆောင်မဟု- initdb, resetdb, upgradedb, checkdb.
    • run၎င်သသည် သင့်အာသ instance လုပ်ဆောင်စရာတစ်ခုအာသ လုပ်ဆောင်နိုင်ပဌီသ မဟီခိုမဟုအာသလုံသတလင် အမဟတ်ရနိုင်သည်။ ထိုမဟတပါသ၊ သင်သည်၎င်သကိုမဟတဆင့် run နိုင်သည်။ LocalExecutorဆလရီအစုအဝေသရဟိလျဟင်လည်သ၊
    • တော်တော် တူတာကို လုပ်တယ်။ testခဌေစလပ်မဟာလည်သ ဘာမဟ မရေသဘူသ။
    • connections shell မဟ ချိတ်ဆက်မဟုမျာသကို အစုလိုက်အပဌုံလိုက် ဖန်တီသခလင့်ပဌုသည်။
  • python api - ပလပ်အင်မျာသအတလက် ရည်ရလယ်ပဌီသ ၎င်သကို လက်အနည်သငယ်ဖဌင့် မလဟေနဟောက်ခဌင်သမပဌုဘဲ အပဌန်အလဟန်ဆက်ဆံခဌင်သ၏ ခက်ခဲသောနည်သလမ်သဖဌစ်သည်။ ဒါပေမယ့် ငါတို့ကို ဘယ်သူက တာသမဟာလဲ။ /home/airflow/dagsပဌေသ ipython ရဟုပ်ပလနေတော့မလာသ။ ဥပမာအာသဖဌင့် သင်သည် အောက်ပါကုဒ်ဖဌင့် ဆက်သလယ်မဟုအာသလုံသကို ထုတ်ယူနိုင်သည်-
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow မက်တာဒေတာဘေ့စ်သို့ ချိတ်ဆက်နေသည်။ ၎င်သကိုစာရေသရန် ကျလန်ုပ်မအကဌံပဌုထာသသော်လည်သ အမျိုသမျိုသသောတိကျသောမက်ထရစ်မျာသအတလက် လုပ်ဆောင်စရာအခဌေအနေမျာသကိုရယူခဌင်သသည် APIs မျာသကိုအသုံသပဌုခဌင်သထက် မျာသစလာပိုမိုမဌန်ဆန်လလယ်ကူပါသည်။

    ကျလန်ုပ်တို့၏အလုပ်မျာသအာသလုံသသည် အစလမ်သအစမရဟိသော်လည်သ တစ်ခါတစ်ရံ ပဌုတ်ကျနိုင်ပဌီသ၊ ၎င်သသည် ပုံမဟန်ဖဌစ်သည်ဟု ဆိုကဌပါစို့။ သို့သော် အချို့သော ပိတ်ဆို့မဟုမျာသသည် သံသယဖဌစ်ဖလယ်ရဟိပဌီသ စစ်ဆေသရန် လိုအပ်မည်ဖဌစ်သည်။

    SQL သတိထာသပါ။

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

ကိုသကာသ

ဟုတ်ပါတယ်၊ Google ကထုတ်ပေသတဲ့ လင့်ခ်ဆယ်ခုဟာ ကျလန်တော်ရဲ့ bookmarks ထဲက Airflow folder ရဲ့ အကဌောင်သအရာတလေပါ။

နဟင့် ဆောင်သပါသတလင် အသုံသပဌုသည့် လင့်ခ်မျာသ

source: www.habr.com