Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

မင်္ဂလာပါ၊ ကျွန်ုပ်သည် Vezet ကုမ္ပဏီအုပ်စု၏ Analytics ဌာနမှ ဒေတာအင်ဂျင်နီယာ Dmitry Logvinenko ဖြစ်ပါသည်။

ETL လုပ်ငန်းစဉ်များ ဖော်ဆောင်ရန်အတွက် အံ့သြဖွယ်ကောင်းသော ကိရိယာတစ်ခုဖြစ်သည့် Apache Airflow ကို သင့်အား ပြောပြပါမည်။ သို့သော် Airflow သည် စွယ်စုံရနှင့် ဘက်စုံသုံးသောကြောင့် သင်သည် ဒေတာစီးဆင်းမှုတွင် မပါဝင်သော်လည်း ၎င်းကို အနီးကပ်ကြည့်ရှုသင့်သော်လည်း မည်သည့်လုပ်ငန်းစဉ်ကိုမဆို အချိန်အခါအလိုက် စတင်လုပ်ဆောင်ရန်နှင့် ၎င်းတို့၏လုပ်ဆောင်မှုကို စောင့်ကြည့်ရန် လိုအပ်ပါသည်။

ဟုတ်ပါတယ်၊ ငါပြောပြရုံတင်မကဘဲ၊ ပရိုဂရမ်မှာ ကုဒ်တွေ၊ ဖန်သားပြင်ဓာတ်ပုံတွေနဲ့ အကြံပြုချက်တွေ အများကြီးပါရှိပါတယ်။

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။
Airflow / Wikimedia Commons ဟူသော စကားလုံးကို Google တွင် တွေ့ရတတ်သည်။

မာတိကာ

နိဒါန်း

Apache Airflow သည် Django နှင့်တူသည်။

  • python နဲ့ရေးထားတယ်။
  • ကောင်းမွန်တဲ့ admin panel တစ်ခုရှိပါတယ်၊
  • အကန့်အသတ်မရှိ ချဲ့နိုင်သည်။

- သာ၍သာ၍ ခြားနားသော ရည်ရွယ်ချက်ဖြင့် ပြုလုပ်ထားခြင်းဖြစ်သည်၊ အတိအကျဆိုရသော် (ကတ္တီပါဒ၌ ရေးထားသကဲ့သို့)။

  • အကန့်အသတ်မရှိ စက်များပေါ်တွင် အလုပ်လုပ်ခြင်းနှင့် စောင့်ကြည့်ခြင်းလုပ်ငန်းတာဝန်များ (ဆလရီ / Kubernetes အများအပြားနှင့် သင့်အသိစိတ်က သင့်ကိုခွင့်ပြုသည်)
  • အလွန်လွယ်ကူသော Python ကုဒ်မှရေးသားရန်နှင့်နားလည်ရန် dynamic workflow မျိုးဆက်နှင့်အတူ
  • နှင့် အဆင်သင့်လုပ်ထားသော အစိတ်အပိုင်းများနှင့် အိမ်လုပ်ပလပ်အင်များ (အလွန်ရိုးရှင်းသော) ကို အသုံးပြု၍ မည်သည့်ဒေတာဘေ့စ်နှင့် API များကိုမဆို ချိတ်ဆက်နိုင်စေပါသည်။

ကျွန်ုပ်တို့သည် ဤကဲ့သို့သော Apache Airflow ကိုအသုံးပြုသည်-

  • ကျွန်ုပ်တို့သည် အမျိုးမျိုးသောရင်းမြစ်များမှ အချက်အလက်များကို စုဆောင်းပါသည် (များစွာသော SQL Server နှင့် PostgreSQL ဖြစ်ရပ်များ၊ အက်ပ်မက်ထရစ်များပါသည့် API အမျိုးမျိုး၊ DWH နှင့် ODS တို့တွင်ပင် 1C) (ကျွန်ုပ်တို့တွင် Vertica နှင့် Clickhouse ရှိသည်)။
  • ဘယ်လောက်အဆင့်မြင့်လဲ။ cronODS တွင် ဒေတာစုစည်းမှု လုပ်ငန်းစဉ်များကို စတင်ကာ ၎င်းတို့၏ ပြုပြင်ထိန်းသိမ်းမှုများကိုလည်း စောင့်ကြည့်သည်။

မကြာသေးမီအထိ၊ ကျွန်ုပ်တို့၏လိုအပ်ချက်များကို 32 cores နှင့် 50 GB RAM ပါရှိသော ဆာဗာငယ်လေးတစ်ခုက ဖြည့်ဆည်းပေးခဲ့သည်။ Airflow တွင်၊ ၎င်းသည် အလုပ်လုပ်သည်-

  • йОНоо 200 ရက် (တကယ်တော့ ကျွန်တော်တို့ အလုပ်တွေကို ထုပ်ပိုးထားတဲ့ workflows)၊
  • တစ်ခုချင်းစီတွင်ပျမ်းမျှ အလုပ် ၇၀,
  • ဤကောင်းမြတ်မှုသည် (ပျမ်းမျှအားဖြင့်လည်း) စတင်သည် တစ်နာရီတစ်ခါ.

ချဲ့ထွင်ပုံနှင့် ပတ်သက်၍ အောက်တွင် ကျွန်ုပ် ရေးပါမည်၊ သို့သော် ယခု ဖြေရှင်းမည့် über-problem ကို သတ်မှတ်ကြပါစို့။

မူလ SQL Server သုံးခုရှိပြီး တစ်ခုစီတွင် ဒေတာဘေ့စ် 50 ပါရှိသည် - ပရောဂျက်တစ်ခု၏ဥပမာအသီးသီးတွင် ၎င်းတို့တွင်တူညီသောဖွဲ့စည်းပုံ (mua-ha-ha)၊ ဆိုလိုသည်မှာ တစ်ခုစီတွင် အမှာစာဇယားတစ်ခုရှိသည် (ကံကောင်းသည်မှာ၊ ၎င်းတွင် ဇယားတစ်ခုရှိသည်။ အမည်ကို မည်သည့်လုပ်ငန်းတွင်မဆို ထည့်သွင်းနိုင်သည်။) ကျွန်ုပ်တို့သည် ဝန်ဆောင်မှုနယ်ပယ်များ (ရင်းမြစ်ဆာဗာ၊ အရင်းအမြစ်ဒေတာဘေ့စ်၊ ETL အလုပ် ID) ကိုပေါင်းထည့်ခြင်းဖြင့် ဒေတာကိုယူကာ ၎င်းတို့ကို Vertica ဟု လိမ်မာစွာပြောပါ။

သွားစို့!

အဓိက အပိုင်းကတော့ လက်တွေ့ (သီအိုရီ အနည်းငယ်)၊

ဘာကြောင့် ငါတို့ (မင်းနဲ့)

သစ်ပင်ကြီးတွေက ကြီးလာတော့ ရိုးရိုးရှင်းရှင်းပဲ။ SQL-schik သည် ရုရှားလက်လီအရောင်းဆိုင်တစ်ခုတွင်၊ ကျွန်ုပ်တို့အတွက်ရရှိနိုင်သည့်ကိရိယာနှစ်ခုကိုအသုံးပြု၍ ETL လုပ်ငန်းစဉ်များ (ခေါ်) ဒေတာစီးဆင်းမှုကို လှည့်စားခဲ့သည်-

  • Informatica ပါဝါစင်တာ - ၎င်း၏ကိုယ်ပိုင်ဟာ့ဒ်ဝဲ၊ ၎င်း၏ကိုယ်ပိုင်ဗားရှင်းဖြင့်၊ အလွန်အကျိုးဖြစ်ထွန်းသော အလွန်ပြန့်ပွားသောစနစ်။ သူ့ရဲ့ စွမ်းဆောင်နိုင်ရည် 1% ကို ဘုရားသခင် တားမြစ်ခဲ့တယ်။ အဘယ်ကြောင့်? ကောင်းပြီ၊ ပထမအချက်၊ ၂၀၀၀ ပြည့်လွန်နှစ်များဆီက တစ်နေရာရာမှာ ဒီအင်တာဖေ့စ်က ကျွန်ုပ်တို့ကို စိတ်ပိုင်းဆိုင်ရာ ဖိအားဖြစ်စေပါတယ်။ ဒုတိယအနေနှင့်၊ ဤ contraption သည် အလွန်ဆန်းကြယ်သော လုပ်ငန်းစဉ်များ၊ ဒေါသကြီးသော အစိတ်အပိုင်းကို ပြန်လည်အသုံးပြုခြင်းနှင့် အခြားသော အလွန်အရေးကြီးသော လုပ်ငန်း-လှည့်ကွက်များအတွက် ဒီဇိုင်းထုတ်ထားသည်။ Airbus A380 / year ၏ တောင်ပံကဲ့သို့ ကုန်ကျစရိတ်နှင့် ပတ်သက်၍ ကျွန်ုပ်တို့ ဘာမှ မပြောလိုပါ။

    သတိထားပါ၊ ဖန်သားပြင်ဓာတ်ပုံသည် အသက် 30 နှစ်အောက် လူများကို အနည်းငယ် ထိခိုက်နိုင်သည်။

    Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

  • SQL Server ပေါင်းစည်းခြင်း ဆာဗာ - ကျွန်ုပ်တို့သည် ကျွန်ုပ်တို့၏ စီမံကိန်းအတွင်း စီးဆင်းမှုတွင် ဤရဲဘော်ကို အသုံးပြုခဲ့သည်။ အမှန်တော့၊ ကျွန်ုပ်တို့သည် SQL Server ကို အသုံးပြုထားပြီးဖြစ်ပြီး ၎င်း၏ ETL ကိရိယာများကို အသုံးမပြုခြင်းသည် တစ်နည်းနည်းဖြင့် ယုတ္တိမရှိပေ။ ၎င်းတွင်အရာအားလုံးကောင်းသည်- အင်တာဖေ့စ်နှစ်ခုစလုံးသည်လှပသည်၊ တိုးတက်မှုအစီရင်ခံစာများဖြစ်သည် ... သို့သော်ကျွန်ုပ်တို့သည်ဆော့ဖ်ဝဲထုတ်ကုန်များကိုနှစ်သက်ကြသည်၊ ဤအတွက်ကြောင့်မဟုတ်ပါ။ ဗားရှင်း dtsx (သိမ်းဆည်းရန် node များပါသော XML ဟူသည်) ကျွန်ုပ်တို့ တတ်နိုင်သည်၊ သို့သော် အဘယ်ကြောင့်နည်း။ ဆာဗာတစ်ခုမှ တစ်ခုသို့ စားပွဲရာပေါင်းများစွာကို ဆွဲငင်နိုင်စေမည့် Task Package တစ်ခုကို မည်သို့ပြုလုပ်ရမည်နည်း။ ဟုတ်တယ်၊ မောက်စ်ခလုတ်ကိုနှိပ်လိုက်၊ မင်းလက်ညိုးနှစ်ဆယ်ကနေ ပြုတ်ကျသွားလိမ့်မယ်။ ဒါပေမယ့် သေချာတာကတော့ ပိုဖက်ရှင်ကျပုံပါပဲ။

    Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

ငါတို့သေချာပေါက်ထွက်လမ်းရှာတယ်။ ဖြစ်ရပ်မှန် နီးပါး ကိုယ်တိုင်ရေးထားတဲ့ SSIS package generator တစ်ခုရောက်လာတယ်..။

ပြီးတော့ အလုပ်သစ်တစ်ခုရှာတယ်။ ပြီးတော့ Apache Airflow က ကျွန်တော့်ကို ကျော်သွားတယ်။

ETL လုပ်ငန်းစဉ်ဖော်ပြချက်များသည် ရိုးရှင်းသော Python ကုဒ်ဖြစ်ကြောင်း ကျွန်ုပ်တွေ့ရှိသောအခါတွင် ကျွန်ုပ်သည် ပျော်ရွှင်စွာ မခုန်ခဲ့ပါ။ ဤသည်မှာ ဒေတာစီးကြောင်းများကို ဗားရှင်းပြောင်းလဲပြီး ကွဲပြားသွားကာ၊ ဒေတာဘေ့စ်ရာပေါင်းများစွာမှ ပစ်မှတ်တစ်ခုသို့ တစ်ခုတည်းဖွဲ့စည်းပုံပါရှိသော ဇယားများကို 13” ဖန်သားပြင်တစ်ခုနှင့်တစ်ခုခွဲ သို့မဟုတ် နှစ်ခုတွင် Python ကုဒ်၏ကိစ္စဖြစ်လာခဲ့သည်။

အစုအဝေးကို စုစည်းခြင်း။

လုံးဝ သူငယ်တန်းကို မစီစဉ်ရအောင်၊ Airflow ကို တပ်ဆင်ခြင်း၊ သင်ရွေးချယ်ထားသော ဒေတာဘေ့စ်၊ ဆလရီ နှင့် docks တွင်ဖော်ပြထားသော အခြားကိစ္စများကဲ့သို့ ဤနေရာတွင် လုံးဝသိသာထင်ရှားသောအရာများအကြောင်း မပြောပါနှင့်။

စမ်းသပ်မှုတွေကို ချက်ချင်းစတင်နိုင်စေဖို့ ကျွန်တော် ပုံကြမ်းဆွဲခဲ့တယ်။ docker-compose.yml ထိုအထဲတွင်-

  • အမှန်တကယ် မြှင့်တင်ကြပါစို့ လေစီးကြောင်း: အစီအစဉ်ဆွဲသူ၊ Webserver။ ပန်းပွင့်သည် ဆလရီအလုပ်များကို စောင့်ကြည့်ရန် ထိုနေရာတွင် လှည့်ပတ်နေလိမ့်မည် (၎င်းကို တွန်းပို့ထားပြီးဖြစ်သောကြောင့် ဖြစ်သည်။ apache/airflow:1.10.10-python3.7ဒါပေမယ့် ငါတို့ စိတ်မ၀င်စားဘူး)
  • PostgreSQLAirflow သည် ၎င်း၏ဝန်ဆောင်မှုအချက်အလက်များ (အစီအစဉ်ဆွဲသူဒေတာ၊ လုပ်ဆောင်မှုစာရင်းအင်းစသည်ဖြင့်) ကိုရေးသားမည်ဖြစ်ပြီး၊ Celery သည် ပြီးမြောက်သည့်အလုပ်များကို အမှတ်အသားပြုမည်ဖြစ်သည်။
  • Redisဆလရီအတွက် အလုပ်ပွဲစားအဖြစ် ဆောင်ရွက်မည့်၊
  • တရုတ်နံနံ အလုပ်သမားတိုက်ရိုက်လုပ်ဆောင်ရမည့် လုပ်ငန်းတာဝန်များ။
  • ဖိုဒါသို့ ./dags dags ၏ဖော်ပြချက်နှင့်အတူ ကျွန်ုပ်တို့၏ဖိုင်များကို ပေါင်းထည့်ပါမည်။ ၎င်းတို့ကို ပျံပေါ်တွင် ကောက်ယူသွားမည်ဖြစ်ပြီး၊ ထို့ကြောင့် နှာချေမှုတိုင်းပြီးနောက် အစုအဝေးတစ်ခုလုံးကို လှည့်ပတ်ရန် မလိုအပ်ပါ။

အချို့နေရာများတွင် ဥပမာများရှိ ကုဒ်ကို လုံး၀မပြနိုင်ဘဲ (စာသားရှုပ်ပွနေစေရန်)၊ သို့သော် အချို့နေရာများတွင် ၎င်းကို လုပ်ငန်းစဉ်တွင် မွမ်းမံထားသည်။ ပြီးပြည့်စုံသော အလုပ်ကုဒ်နမူနာများကို သိုလှောင်ရုံတွင် တွေ့နိုင်ပါသည်။ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

မှတ်ချက်တွေ:

  • တေးရေး ပရိသတ်ထဲမှာ လူသိများတဲ့ ပုံရိပ်ကို အားကိုးမိတယ်။ puckel/docker-airflow - အဲဒါကို သေချာစစ်ဆေးပါ။ သင့်ဘဝမှာ တခြားဘာမှ မလိုအပ်တော့တာလည်း ဖြစ်နိုင်ပါတယ်။
  • Airflow ဆက်တင်များအားလုံးမှတဆင့်သာ ရရှိနိုင်ပါသည်။ airflow.cfgဒါပေမယ့်လည်း ကျွန်တော် အခွင့်ကောင်းယူပြီး ဆိုးဆိုးရွားရွား အသုံးချခဲ့တဲ့ ပတ်ဝန်းကျင် ကိန်းရှင်များ ( developer များ ကျေးဇူးတင်ပါတယ်)။
  • သဘာဝအတိုင်း၊ ၎င်းသည် ထုတ်လုပ်မှုအဆင်သင့်မဟုတ်ပါ- ကျွန်ုပ်သည် ကွန်တိန်နာများပေါ်တွင် နှလုံးခုန်သံများကို တမင်တကာ မထည့်ထားဘဲ လုံခြုံရေးအတွက် စိတ်မ၀င်စားပါ။ ဒါပေမယ့် ကျွန်တော်တို့ရဲ့ စမ်းသပ်သူတွေအတွက် သင့်တော်တဲ့ အနိမ့်ဆုံးကို ကျွန်တော် လုပ်ခဲ့တယ်။
  • မှတ်ရန်:
    • Dag ဖိုင်တွဲကို အချိန်ဇယားရေးဆွဲသူနှင့် အလုပ်သမားများ နှစ်ဦးစလုံး ဝင်ရောက်ကြည့်ရှုနိုင်ရပါမည်။
    • Third-party စာကြည့်တိုက်များအားလုံးနှင့် အတူတူပင်ဖြစ်သည် - ၎င်းတို့အားလုံးကို အချိန်ဇယားဆွဲသူနှင့် အလုပ်သမားများဖြင့် စက်များတွင် ထည့်သွင်းရပါမည်။

ကဲ၊ အခုက ရိုးရှင်းပါတယ်။

$ docker-compose up --scale worker=3

အရာအားလုံးတက်လာပြီးနောက်၊ သင်သည် ဝဘ်အင်တာဖေ့စ်များကို ကြည့်ရှုနိုင်သည်-

အခြေခံသဘောတရား

ဤ "dags" များအားလုံးတွင် သင်ဘာမှနားမလည်ပါက၊ ဤသည်မှာ အဘိဓာန်အတိုဖြစ်သည်။

  • Scheduler ကို - စက်ရုပ်များ ခက်ခက်ခဲခဲ အလုပ်လုပ်၍ လူတစ်ဦးမဟုတ်သော Airflow တွင် အရေးအပါဆုံး ဦးလေးဖြစ်သူ- အချိန်ဇယားကို စောင့်ကြည့်ခြင်း၊ ဒိတ်ဒိတ်လုပ်ခြင်း၊ အလုပ်များကို လုပ်ဆောင်ခြင်းများ လုပ်ဆောင်သည်။

    ယေဘုယျအားဖြင့်၊ ဗားရှင်းအဟောင်းများတွင်၊ သူသည် မှတ်ဉာဏ်ဆိုင်ရာ ပြဿနာများ (မဟုတ်ပါ၊ သတိမေ့ခြင်းမဟုတ်သော်လည်း ပေါက်ကြားခြင်း) ရှိပြီး အမွေအနှစ်သတ်မှတ်ချက်သည် configuration တွင်ပင် ကျန်ရှိနေပါသည်။ run_duration - ၎င်း၏ပြန်လည်စတင်ချိန်ကာလ။ ဒါပေမယ့် အခုတော့ အားလုံးအဆင်ပြေသွားပါပြီ။

  • DAG (aka "dag") - "ညွှန်ကြားထားသည့် acyclic ဂရပ်"၊ သို့သော် ထိုသို့သော အဓိပ္ပါယ်ဖွင့်ဆိုချက်သည် လူအနည်းငယ်ကို ပြောပြလိမ့်မည်၊ သို့သော် အမှန်တကယ်တွင် ၎င်းသည် အချင်းချင်း အပြန်အလှန် အကျိုးပြုသည့် အလုပ်များအတွက် ကွန်တိန်နာတစ်ခု (အောက်တွင်ကြည့်ပါ) သို့မဟုတ် SSIS ရှိ Package နှင့် Informatica ရှိ Workflow တို့၏ analogue တစ်ခုဖြစ်သည်။ .

    ဓားများအပြင်၊ ဆိုင်းဘုတ်များပါရှိနိုင်သော်လည်း ၎င်းတို့ကို ကျွန်ုပ်တို့ မရောက်နိုင်ပါ။

  • DAG ပြေး - ၎င်း၏ကိုယ်ပိုင်သတ်မှတ်ထားသောရက်ကိုစတင်သည်။ execution_date. တူညီသော ဒရာဂရာန်များသည် ပြိုင်တူအလုပ်လုပ်နိုင်သည် (သင်၏တာဝန်များကို အားနည်းအောင်ပြုလုပ်ထားလျှင် ဟုတ်ပါတယ်)။
  • အော်ပရေတာ တိကျသောလုပ်ဆောင်ချက်တစ်ခုလုပ်ဆောင်ရန် တာဝန်ရှိသောကုဒ်အပိုင်းအစများဖြစ်သည်။ အော်ပရေတာသုံးမျိုးရှိသည်။
    • လှုပ်ရှားမှုငါတို့အကြိုက် PythonOperatorမည်သည့် (တရားဝင်) Python ကုဒ်ကိုမဆို လုပ်ဆောင်နိုင်သည်၊
    • လွှဲပြောင်းတစ်နေရာမှ တစ်နေရာသို့ ပို့ဆောင်ပေးသော ဒေတာ၊ MsSqlToHiveTransfer;
    • အာရုံခံကိရိယာ အခြားတစ်ဖက်တွင်၊ ၎င်းသည် ဖြစ်ရပ်တစ်ခုမဖြစ်ပွားမီအထိ ဒိုင်း၏နောက်ထပ်လုပ်ဆောင်မှုကို တုံ့ပြန်ရန် သို့မဟုတ် နှေးကွေးစေမည်ဖြစ်သည်။ HttpSensor သတ်မှတ်ထားသော အဆုံးအမှတ်ကို ဆွဲထုတ်နိုင်ပြီး အလိုရှိသော တုံ့ပြန်မှုကို စောင့်ဆိုင်းနေချိန်တွင် လွှဲပြောင်းမှုကို စတင်ပါ။ GoogleCloudStorageToS3Operator. စူးစမ်းလိုစိတ်က “ဘာကြောင့်လဲ။ နောက်ဆုံးအနေနဲ့၊ အော်ပရေတာမှာ ထပ်ခါထပ်ခါလုပ်နိုင်ပါတယ်။” ထို့နောက် ဆိုင်းငံ့ထားသော အော်ပရေတာများနှင့် အလုပ်များ ပိတ်ဆို့ခြင်း မရှိစေရေး။ နောက်တစ်ကြိမ်မကြိုးစားမီ အာရုံခံကိရိယာသည် စတင်သည်၊ စစ်ဆေးပြီး သေဆုံးသည်။
  • လုပ်ငန်း - အမျိုးအစားမခွဲခြားဘဲ ကြေငြာထားသော အော်ပရေတာများအား အလုပ်ရာထူးအဆင့်သို့ တိုးမြှင့်ပေးသည်။
  • အလုပ်ဥပမာ - ဖျော်ဖြေသူ-အလုပ်သမားများကို တိုက်ပွဲဝင်ရန် အထွေထွေစီစဉ်သူမှ တာဝန်များပေးပို့ရန် အချိန်တန်ပြီဟု ဆုံးဖြတ်သောအခါ (ကျွန်ုပ်တို့အသုံးပြုလျှင် နေရာမှန်၊ LocalExecutor သို့မဟုတ် ၏ကိစ္စရပ်တွင် အဝေးထိန်း Node တစ်ခုဆီသို့ CeleryExecutor) ၎င်းသည် ၎င်းတို့အား ဆက်စပ်အကြောင်းအရာတစ်ခု သတ်မှတ်ပေးသည် (ဆိုလိုသည်မှာ၊ ကိန်းရှင်အစုတစ်ခု - လုပ်ဆောင်မှုဘောင်များ)၊ အမိန့်ပေးချက် သို့မဟုတ် စုံစမ်းမှုပုံစံများကို ချဲ့ထွင်ပြီး ၎င်းတို့ကို ပေါင်းစည်းပေးသည်။

ကျွန်ုပ်တို့သည် အလုပ်များကို ထုတ်ပေးပါသည်။

ဦးစွာ၊ ကျွန်ုပ်တို့၏ doug ၏ ယေဘူယျအစီအစဥ်ကို အကြမ်းဖျဉ်းဖော်ပြကြပါစို့၊ ထို့နောက် ကျွန်ုပ်တို့သည် အသေးအဖွဲမဟုတ်သော ဖြေရှင်းနည်းအချို့ကို အသုံးပြုထားသောကြောင့် အသေးစိတ်အချက်အလက်များကို ပို၍ပို၍တိုး၍ အသေးစိတ်လေ့လာကြည့်ပါမည်။

ထို့ကြောင့်၊ ၎င်း၏အရိုးရှင်းဆုံးပုံစံဖြင့်၊ ထိုကဲ့သို့သော ဓားပုံသည် ဤကဲ့သို့ဖြစ်နေလိမ့်မည်-

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

အဖြေရှာကြည့်ရအောင်။

  • ပထမဦးစွာ ကျွန်ုပ်တို့သည် လိုအပ်သော libs များနှင့် တင်သွင်းသည်။ အခြားတစ်ခုခု;
  • sql_server_ds - က List[namedtuple[str, str]] Airflow Connections မှချိတ်ဆက်မှုများ၏အမည်များနှင့်ကျွန်ုပ်တို့၏ပန်းကန်ကိုယူမည့်ဒေတာဘေ့စ်များ၊
  • dag - သေချာပေါက်ဖြစ်ရမည်၊ ကျွန်ုပ်တို့၏ doug ၏ကြေငြာချက် globals()မဟုတ်ရင် Airflow က ရှာမတွေ့ဘူး။ Doug ကလည်း ပြောလိုပါသည်။
    • သူ့နာမည်က ဘာလဲ။ orders - ထို့နောက် ဤအမည်သည် ဝဘ်အင်တာဖေ့စ်တွင် ပေါ်လာလိမ့်မည်၊
    • ဇူလိုင်လ ရှစ်ရက်နေ့ ညသန်းခေါင်မှ အလုပ်ဆင်းရမည်၊
    • ခန့်မှန်းခြေအားဖြင့် 6 နာရီတိုင်း ပြေးသင့်ပါတယ်။ timedelta() ခွင့်မပြု cron-လိုင်း 0 0 0/6 ? * * *, for the less cool - ကြိုက်တဲ့ expression တစ်ခု @daily);
  • workflow() အဓိက အလုပ်ကို လုပ်လိမ့်မယ်၊ ဒါပေမယ့် အခု မဟုတ်ဘူး။ ယခုအချိန်တွင် ကျွန်ုပ်တို့၏အကြောင်းအရာကို မှတ်တမ်းထဲသို့ စွန့်ပစ်လိုက်ပါမည်။
  • ယခုတွင် လုပ်ဆောင်စရာများကို ဖန်တီးခြင်း၏ ရိုးရှင်းသော မှော်ပညာ။
    • ကျွန်ုပ်တို့သည် ကျွန်ုပ်တို့၏ရင်းမြစ်များမှတဆင့် လုပ်ဆောင်ပါသည်။
    • စတင်လိုက်ပါ။ PythonOperatorကျွန်ုပ်တို့၏ dummy ကို အကောင်အထည်ဖော်မည့်၊ workflow(). အလုပ်၏ထူးခြားသော (ဒဂ်အတွင်း) အမည်ကို သတ်မှတ်၍ ဓားကို ချည်နှောင်ရန် မမေ့ပါနှင့်။ အလံ provide_context တစ်ဖန်၊ ကျွန်ုပ်တို့ ဂရုတစိုက် စုဆောင်းအသုံးပြုထားသော function ထဲသို့ နောက်ထပ် အကြောင်းပြချက်များကို လောင်းထည့်ပါမည်။ **context.

လောလောဆယ်တော့ ဒီလောက်ပါပဲ။ ကျွန်ုပ်တို့ရရှိသည်-

  • web interface တွင် dag အသစ်၊
  • အပြိုင်လုပ်ဆောင်မည့် အလုပ်ပေါင်း တစ်ရာခွဲ (Airflow၊ Celery ဆက်တင်များနှင့် ဆာဗာစွမ်းရည်က ခွင့်ပြုပါက)။

ကောင်းပြီ၊ ရပြီ။

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။
မှီခိုမှုများကို မည်သူက ထည့်သွင်းမည်နည်း။

ဤအရာအားလုံးကို ရိုးရှင်းစေရန်အတွက် ကျွန်ုပ် လှည့်စားခဲ့ပါသည်။ docker-compose.yml လုပ်ဆောင်နေသည် requirements.txt node အားလုံးတွင်။

အခုတော့ သွားပြီ-

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

မီးခိုးရောင်စတုရန်းများသည် အချိန်ဇယားဆွဲသူမှ လုပ်ဆောင်သော အလုပ်ဖြစ်ရပ်များဖြစ်သည်။

ခဏစောင့်ပါ၊ အလုပ်သမားတွေက အလုပ်တွေ ရှုပ်ကုန်တယ်။

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

ဟုတ်ပါတယ် အစိမ်းတွေ ဟာ သူတို့ရဲ့ အလုပ်ကို အောင်မြင်စွာ ပြီးမြောက်ခဲ့ကြ ပါတယ်။ အနီရောင်တွေက သိပ်မအောင်မြင်ပါဘူး။

စကားမစပ်၊ ကျွန်ုပ်တို့၏ထုတ်ကုန်တွင် folder မရှိပါ။ ./dagsစက်များကြားတွင် ထပ်တူပြုမှု မရှိပါ - ဒိုင်းများ အားလုံး ပါ၀င်ပါသည်။ git ကျွန်ုပ်တို့၏ Gitlab တွင်၊ နှင့် Gitlab CI သည် ပေါင်းစည်းသည့်အခါ စက်များသို့ အပ်ဒိတ်များကို ဖြန့်ဝေပါသည်။ master.

ပန်းအကြောင်းအနည်းငယ်

အလုပ်သမားတွေက ကျွန်တော်တို့ရဲ့ ရင်ကျပ်စေတဲ့အရာတွေကို ပွတ်တိုက်နေချိန်မှာ ကျွန်တော်တို့ကို တစ်ခုခုပြပေးနိုင်တဲ့ နောက်ထပ်ကိရိယာတစ်ခု - Flower ကို သတိရလိုက်ကြရအောင်။

အလုပ်သမား ဆုံမှတ်များဆိုင်ရာ အကျဉ်းချုပ် အချက်အလက်ပါရှိသော ပထမစာမျက်နှာ

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

အလုပ်သွားခဲ့သည့် အလုပ်များပါ အပြင်းထန်ဆုံး စာမျက်နှာ

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

ကျွန်ုပ်တို့၏ ပွဲစား၏ အခြေအနေနှင့် အပျင်းဆုံး စာမျက်နှာ

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

အတောက်ပဆုံးစာမျက်နှာမှာ အလုပ်အခြေအနေဂရပ်များနှင့် ၎င်းတို့၏လုပ်ဆောင်ချိန်များပါရှိသည်-

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

ကျွန်ုပ်တို့သည် ဝန်ထုပ်ဝန်ပိုးကို ထမ်းရွက်သည်။

ဒါမှ အလုပ်တွေ ပြီးသွားပြီ၊ ဒဏ်ရာရသူတွေကို သယ်သွားနိုင်တယ်။

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

အကြောင်းရင်းတစ်ခုမဟုတ်တစ်ခုကြောင့် ဒဏ်ရာရသူအများအပြားရှိခဲ့သည်။ Airflow ၏မှန်ကန်သောအသုံးပြုမှုကိစ္စတွင်၊ ဤစတုရန်းပုံများသည် data များသေချာပေါက်ရောက်မလာကြောင်းဖော်ပြသည်။

မှတ်တမ်းကို ကြည့်ရှုပြီး ကျဆင်းသွားသော လုပ်ဆောင်စရာဖြစ်ရပ်များကို ပြန်လည်စတင်ရန် လိုအပ်သည်။

မည်သည့်စတုရန်းကို နှိပ်ခြင်းဖြင့်၊ ကျွန်ုပ်တို့အတွက် ရရှိနိုင်သော လုပ်ဆောင်ချက်များကို ကျွန်ုပ်တို့ တွေ့ရလိမ့်မည်-

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

ပြုတ်ကျတာကို ရှင်းအောင်လုပ်လို့ရတယ်။ ဆိုလိုသည်မှာ၊ ထိုနေရာတွင် တစ်စုံတစ်ခု ပျက်ကွက်သွားသည်ကို ကျွန်ုပ်တို့မေ့ထားပြီး တူညီသောလုပ်ဆောင်စရာမှာ အချိန်ဇယားဆွဲသူထံ ရောက်သွားမည်ဖြစ်သည်။

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

အနီရောင်စတုရန်းအားလုံးကို မောက်စ်ဖြင့်ပြုလုပ်ခြင်းသည် လူသားဆန်ခြင်းမဟုတ်ကြောင်း ရှင်းရှင်းလင်းလင်းသိရသည် - ၎င်းသည် Airflow မှ ကျွန်ုပ်တို့မျှော်လင့်ထားသည့်အရာမဟုတ်ပါ။ သဘာဝအားဖြင့်၊ ကျွန်ုပ်တို့တွင် အစုလိုက်အပြုံလိုက် ဖျက်ဆီးနိုင်သော လက်နက်များရှိသည်။ Browse/Task Instances

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

အရာအားလုံးကို တစ်ပြိုင်နက် ရွေးချယ်ပြီး သုညသို့ ပြန်လည်သတ်မှတ်ကြပါစို့၊ မှန်ကန်သည့်အရာကို နှိပ်ပါ-

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

သန့်ရှင်းရေးလုပ်ပြီးနောက်၊ ကျွန်ုပ်တို့၏တက္ကစီများသည် ဤပုံစံအတိုင်းဖြစ်သည် (သူတို့စီစဉ်ပေးမည့်အစီအစဉ်မှူးကို စောင့်နေပြီဖြစ်သည်)။

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

ချိတ်ဆက်မှုများ၊ ချိတ်များနှင့် အခြားပြောင်းလဲမှုများ

နောက် DAG ကိုကြည့်ဖို့အချိန်ရောက်ပြီ၊ update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

လူတိုင်း အစီရင်ခံစာ အပ်ဒိတ်လုပ်ဖူးပါသလား။ ဤသည်မှာ သူမ၏ နောက်တစ်ကြိမ်ဖြစ်သည်- ဒေတာရယူရမည့်နေရာမှ ရင်းမြစ်များစာရင်းတစ်ခု ရှိပါသည်။ ထားရမည့်စာရင်းတစ်ခုရှိသည်။ အရာအားလုံးဖြစ်​ပျက်​သွားတဲ့အခါ ဟွန်းတီးဖို့မ​မေ့ပါနဲ့ (ဒါက ငါတို့နဲ့မဆိုင်​ပါဘူး)။

ဖိုင်ကိုတဖန်ပြန်ပြီး မထင်မရှားအရာအသစ်များကို ကြည့်ကြပါစို့။

  • from commons.operators import TelegramBotSendMessage - Unblocked သို့ မက်ဆေ့ချ်ပို့ခြင်းအတွက် သေးငယ်သော wrapper တစ်ခုပြုလုပ်ခြင်းဖြင့် ကျွန်ုပ်တို့၏ကိုယ်ပိုင်အော်ပရေတာများပြုလုပ်ခြင်းမှ ကျွန်ုပ်တို့အား မည်သည့်အရာကမှ တားဆီးပိတ်ပင်မည်မဟုတ်ပါ။ (ဤအော်ပရေတာအကြောင်းကို အောက်တွင် ဆက်လက်ဆွေးနွေးပါမည်။
  • default_args={} - dag သည် ၎င်း၏ အော်ပရေတာအားလုံးကို တူညီသော အကြောင်းပြချက်များကို ဖြန့်ဝေနိုင်သည်။
  • to='{{ var.value.all_the_kings_men }}' - အကွက် to ကျွန်ုပ်တို့တွင် hardcoded လုပ်မည်မဟုတ်သော်လည်း၊ ကျွန်ုပ်ဂရုတစိုက်ထည့်သွင်းထားသည့် အီးမေးလ်စာရင်းပါသည့် Jinja နှင့် variable တို့ကို အသုံးပြု၍ ဒိုင်းနမစ်ထုတ်ပေးပါသည်။ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - အော်ပရေတာစတင်ရန်အခြေအနေ။ ကျွန်ုပ်တို့၏အခြေအနေတွင်၊ မှီခိုမှုအားလုံး ပြေလည်သွားမှသာ သူဌေးထံ စာပို့ပါမည်။ အောင်မြင်စွာ;
  • tg_bot_conn_id='tg_main' - အငြင်းပွားမှုများ conn_id ကျွန်ုပ်တို့ ဖန်တီးထားသော ချိတ်ဆက်မှု ID များကို လက်ခံပါ။ Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ပြုတ်ကျသောအလုပ်များရှိမှသာ Telegram မှမက်ဆေ့ခ်ျများသည်ဝေးကွာလိမ့်မည်။
  • task_concurrency=1 - လုပ်ငန်းတစ်ခု၏ လုပ်ငန်းဆောင်တာများစွာကို တစ်ပြိုင်နက်တည်း လုပ်ဆောင်ခြင်းကို ကျွန်ုပ်တို့ တားမြစ်ထားပါသည်။ မဟုတ်ပါက၊ ကျွန်ုပ်တို့သည် အများအပြားကို တပြိုင်နက်တည်း ပစ်လွှတ်နိုင်မည်ဖြစ်သည်။ VerticaOperator (စားပွဲတစ်လုံးကိုကြည့်ခြင်း);
  • report_update >> [email, tg] - အားလုံး VerticaOperator ဤကဲ့သို့သော စာများနှင့် မက်ဆေ့ချ်များ ပေးပို့ခြင်းတွင် ပေါင်းစပ်ပါ။
    Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

    သို့သော် အကြောင်းကြားသူ အော်ပရေတာများတွင် မတူညီသော လွှင့်တင်မှု အခြေအနေများ ရှိသောကြောင့်၊ တစ်ခုသာ အလုပ်လုပ်ပါမည်။ Tree View တွင်၊ အရာအားလုံးသည် အနည်းငယ်သာမြင်သာသည်-
    Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

စကားအနည်းငယ်ပြောပါမည်။ မက်ခရို သူတို့ရဲ့ သူငယ်ချင်းများ- ကိန်းရှင်များ.

Macros သည် အမျိုးမျိုးသော အသုံးဝင်သော အချက်အလက်များကို အော်ပရေတာ အငြင်းအခုံများအဖြစ် အစားထိုးနိုင်သော Jinja placeholder ဖြစ်သည်။ ဥပမာ၊ ဤကဲ့သို့သော၊

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} context variable ၏ အကြောင်းအရာများသို့ ချဲ့ထွင်ပါမည်။ execution_date ပုံစံအတွက် YYYY-MM-DD: 2020-07-14. အကောင်းဆုံးအပိုင်းမှာ ဆက်စပ်ကိန်းရှင်များကို သတ်သတ်မှတ်မှတ်လုပ်ဆောင်စရာ ဥပမာတစ်ခု (သစ်ပင်မြင်ကွင်းရှိ စတုရန်းတစ်ခု) တွင် တပ်ဆင်ထားပြီး ပြန်လည်စတင်သည့်အခါ နေရာကိုင်ဆောင်သူများသည် တူညီသောတန်ဖိုးများအထိ ချဲ့ထွင်လာမည်ဖြစ်သည်။

တာဝန်တစ်ခုစီရှိ လုပ်ဆောင်ချက်တစ်ခုစီရှိ Rendered ခလုတ်ကို အသုံးပြု၍ သတ်မှတ်ထားသော တန်ဖိုးများကို ကြည့်ရှုနိုင်ပါသည်။ ဤသည်မှာ စာတစ်စောင်ပေးပို့ခြင်း၏ တာဝန်ဖြစ်သည်။

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

မက်ဆေ့ချ်ပို့တဲ့အလုပ်မှာ၊

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

နောက်ဆုံးရနိုင်သောဗားရှင်းအတွက် built-in macro စာရင်းအပြည့်အစုံကို ဤနေရာတွင် ရနိုင်ပါသည်- မက်ခရိုအကိုးအကား

ထို့အပြင်၊ plugins များ၏အကူအညီဖြင့်၊ ကျွန်ုပ်တို့သည် ကျွန်ုပ်တို့၏ကိုယ်ပိုင် macro ကိုကြေငြာနိုင်သည်၊ သို့သော်၎င်းသည်အခြားဇာတ်လမ်းဖြစ်သည်။

ကြိုတင်သတ်မှတ်ထားသည့်အရာများအပြင်၊ ကျွန်ုပ်တို့သည် ကျွန်ုပ်တို့၏ variable များ၏တန်ဖိုးများကို အစားထိုးနိုင်သည် (အထက်ကုဒ်တွင် ၎င်းကို ကျွန်ုပ်အသုံးပြုထားပြီးဖြစ်သည်)။ ဖန်တီးကြရအောင် Admin/Variables အရာနှစ်ခု-

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

သင်အသုံးပြုနိုင်သမျှ-

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

တန်ဖိုးသည် စကေးတစ်ခု ဖြစ်နိုင်သည်၊ သို့မဟုတ် JSON လည်း ဖြစ်နိုင်သည်။ JSON တွင်၊

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

လိုချင်သောကီးဆီသို့ လမ်းကြောင်းကို အသုံးပြုပါ။ {{ var.json.bot_config.bot.token }}.

စကားလုံးတစ်လုံးတည်းနဲ့ ဖန်သားပြင်ပုံတစ်ပုံကို ပြပါမယ်။ ဆက်သွယ်မှုများ. အရာအားလုံးသည် ဤနေရာတွင် အခြေခံဖြစ်သည်- စာမျက်နှာပေါ်တွင် Admin/Connections ကျွန်ုပ်တို့သည် ချိတ်ဆက်မှုတစ်ခုကို ဖန်တီးပြီး၊ ကျွန်ုပ်တို့၏ လော့ဂ်အင်များ/စကားဝှက်များကို ပေါင်းထည့်ကာ ထိုနေရာတွင် ပိုမိုတိကျသော ကန့်သတ်ဘောင်များကို ထည့်ပါသည်။ ဒီလိုမျိုး:

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

စကားဝှက်များကို ကုဒ်ဝှက်ထားနိုင်သည် (မူရင်းထက် ပိုသေချာသည်) သို့မဟုတ် ချိတ်ဆက်မှုအမျိုးအစားကို ချန်ထားနိုင်သည် (ကျွန်တော်ပြုလုပ်ခဲ့သည့်အတိုင်း၊ tg_main) - အမှန်မှာ အမျိုးအစားများစာရင်းကို Airflow မော်ဒယ်များတွင် hardwired လုပ်ပြီး အရင်းအမြစ်ကုဒ်များထဲသို့ မဝင်ဘဲ ချဲ့ထွင်၍မရပါ (ရုတ်တရက် ကျွန်ုပ် google မှ တစ်စုံတစ်ခု မလုပ်ဆောင်ပါက၊ ကျေးဇူးပြု၍ ပြင်ပေးပါ)၊ သို့သော် ခရက်ဒစ်များရယူခြင်းမှ ကျွန်ုပ်တို့အား မည်သည့်အရာမှ တားဆီးနိုင်မည်မဟုတ်ပါ။ နာမည်။

တူညီသောအမည်ဖြင့် ချိတ်ဆက်မှုများစွာ ပြုလုပ်နိုင်သည်- ဤကိစ္စတွင်၊ နည်းလမ်း BaseHook.get_connection()အမည်ဖြင့်ဆက်သွယ်မှုများရရှိသွားသော၊ ကျပန်း နာမည်အမျိုးမျိုးမှ ( Round Robin ကိုလုပ်ခြင်းသည် ပို၍ယုတ္တိရှိလိမ့်မည်၊ သို့သော် Airflow developer များ၏စိတ်တွင်ထားခဲ့ကြပါစို့)။

Variables များနှင့် Connections များသည် အလွန်ကောင်းမွန်သော ကိရိယာများဖြစ်သည်၊ သို့သော် ဟန်ချက်မညီရန် အရေးကြီးသည်- သင့်စီးဆင်းမှု၏ အစိတ်အပိုင်းများကို ကုဒ်ကိုယ်တိုင် သိမ်းဆည်းထားပြီး မည်သည့်အပိုင်းများကို သိုလှောင်ရန်အတွက် Airflow သို့ ပေးမည်နည်း။ တစ်ဖက်တွင်၊ UI မှတစ်ဆင့်၊ ဥပမာ၊ စာပို့သေတ္တာတစ်ခု၊ တန်ဖိုးကို လျင်မြန်စွာပြောင်းလဲရန် အဆင်ပြေနိုင်သည်။ အခြားတစ်ဖက်တွင်၊ ၎င်းသည်ကျွန်ုပ်တို့ (ကျွန်ုပ်) မှဖယ်ရှားလိုသော mouse ကိုနှိပ်ခြင်းသို့ပြန်သွားဆဲဖြစ်သည်။

ချိတ်ဆက်မှုဖြင့် လုပ်ဆောင်ခြင်းသည် အလုပ်များထဲမှ တစ်ခုဖြစ်သည်။ ချိတ်. ယေဘုယျအားဖြင့်၊ Airflow ချိတ်များသည် ၎င်းကို ပြင်ပဝန်ဆောင်မှုများနှင့် စာကြည့်တိုက်များသို့ ချိတ်ဆက်ရန်အတွက် အမှတ်များဖြစ်သည်။ ဥပမာ- JiraHook Jira နှင့် အပြန်အလှန် တုံ့ပြန်ရန် ကျွန်ုပ်တို့အတွက် client တစ်ခုကို ဖွင့်ပေးမည် (သင်လုပ်ဆောင်စရာများကို နောက်ပြန်လှည့်နိုင်သည်) နှင့် ၏အကူအညီဖြင့် SambaHook local ဖိုင်တစ်ခုသို့ တွန်းနိုင်သည်။ smb-point။

စိတ်ကြိုက်အော်ပရေတာကို ခွဲခြမ်းစိတ်ဖြာခြင်း။

အဲဒါကို ဘယ်လိုဖန်တီးလဲဆိုတာကို ကျွန်တော်တို့ နီးစပ်လာပါပြီ။ TelegramBotSendMessage

ကုဒ် commons/operators.py အမှန်တကယ်အော်ပရေတာနှင့်အတူ

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

ဤတွင်၊ Airflow ရှိ အခြားအရာအားလုံးကဲ့သို့ပင်၊ အရာအားလုံးသည် အလွန်ရိုးရှင်းပါသည်။

  • မှ အမွေဆက်ခံသည်။ BaseOperatorအနည်းငယ်သော Airflow သီးသန့်အရာများကို အကောင်အထည်ဖော်ပေးသော (သင့်အားလပ်ရက်ကိုကြည့်ပါ)
  • အကွက်များ ကြေငြာသည်။ template_fieldsJinja သည် လုပ်ဆောင်ရန် မက်ခရိုများကို ရှာဖွေမည်ဖြစ်သည်။
  • မှန်ကန်သော ငြင်းခုံမှုများကို စီစဉ်ပေးခဲ့သည်။ __init__()လိုအပ်ပါက ပုံသေသတ်မှတ်ပါ။
  • ဘိုးဘေး၏ အစပြုခြင်းကိုလည်း ကျွန်ုပ်တို့ မမေ့ခဲ့ပါ။
  • သက်ဆိုင်ရာ ချိတ်ကို ဖွင့်လိုက်သည်။ TelegramBotHook၎င်းထံမှ client object တစ်ခုကို လက်ခံရရှိခဲ့သည်။
  • Overridden (ပြန်လည်သတ်မှတ်) နည်းလမ်း BaseOperator.execute()အော်ပရေတာစတင်ချိန်ကျလာသောအခါ Airfow သည် တုန်လှုပ်သွားလိမ့်မည် - ၎င်းတွင် ကျွန်ုပ်တို့သည် လော့ဂ်အင်လုပ်ရန် မေ့လျော့ကာ ပင်မလုပ်ဆောင်ချက်ကို အကောင်အထည်ဖော်ပါမည်။ (ကျွန်ုပ်တို့ log in ၊ စကားအားဖြင့်၊ ချက်ချင်းဝင်ပါ။ stdout и stderr - Airflow သည် အရာအားလုံးကို ကြားဖြတ်၊ လှလှပပ ထုပ်ပိုးပြီး လိုအပ်ပါက ပြိုကွဲသွားမည်ဖြစ်သည်။)

ငါတို့မှာ ဘာတွေရှိလဲ ကြည့်ရအောင် commons/hooks.py. ဖိုင်၏ပထမပိုင်း၊ ချိတ်ကိုယ်တိုင်-

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

ဒီနေရာမှာ ဘာရှင်းပြရမှန်းတောင် မသိတော့ဘူး၊ အရေးကြီးတဲ့ အချက်တွေကို မှတ်သားထားပါ့မယ်။

  • ကျွန်ုပ်တို့သည် အမွေဆက်ခံမှု၊ ငြင်းခုံမှုများကို စဉ်းစားကြည့်ပါ- ကိစ္စအများစုတွင် ၎င်းသည် တစ်ခုဖြစ်သည်- conn_id;
  • စံနည်းလမ်းများကို ပဓာနထားခြင်း- ကျွန်ုပ်သည် မိမိကိုယ်ကို ကန့်သတ်ထားသည်။ get_conn()ချိတ်ဆက်မှု parameters များကို နာမည်ဖြင့် ရယူပြီး အပိုင်းကို ရယူပါ။ extra (ဒါက JSON အကွက်တစ်ခုပါ)၊ ကျွန်ုပ် (ကျွန်ုပ်၏ကိုယ်ပိုင်ညွှန်ကြားချက်အရ) တွင် Telegram bot တိုကင်ကို ထည့်သွင်းခဲ့သည်- {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • ငါတို့ရဲ့ ဥပမာတစ်ခုကို ငါဖန်တီးတယ်။ TelegramBotတိကျသော တိုကင်တစ်ခုပေးသည်။

ဒါပါပဲ။ အသုံးပြုပြီး ချိတ်တစ်ခုမှ client တစ်ခုကို သင်ရနိုင်သည်။ TelegramBotHook().clent သို့မဟုတ် TelegramBotHook().get_conn().

ဖိုင်၏ဒုတိယအပိုင်းသည် Telegram REST API အတွက် microwrapper တစ်ခုပြုလုပ်၍ တူညီသောဆွဲယူမှုမဖြစ်စေရန်၊ python-telegram-bot နည်းလမ်းတစ်ခုအတွက် sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

မှန်ကန်သောနည်းလမ်းမှာ အားလုံးပေါင်းထည့်ရန်ဖြစ်သည်- TelegramBotSendMessage, TelegramBotHook, TelegramBot - plugin တွင်၊ အများသူငှာသိုလှောင်မှုတွင်ထည့်ပါ၊ ၎င်းကို Open Source သို့ပေးပါ။

ကျွန်ုပ်တို့ ဤအရာအားလုံးကို လေ့လာနေစဉ်၊ ကျွန်ုပ်တို့၏ အစီရင်ခံစာ အပ်ဒိတ်များသည် အောင်မြင်စွာ မအောင်မြင်နိုင်ဘဲ ချန်နယ်တွင် အမှားအယွင်း မက်ဆေ့ချ်တစ်ခု ပေးပို့နိုင်ခဲ့သည်။ မှားနေလား စစ်ဆေးကြည့်မယ်...

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။
ကျွန်ုပ်တို့၏ခွေးကလေးတွင် တစ်စုံတစ်ခု ပျက်သွားသည် ။ အဲဒါ ငါတို့မျှော်လင့်ထားတာ မဟုတ်ဘူးလား? အတိအကျ

လိမ်းမှာလား။

တစ်ခုခုကို လွဲချော်နေတယ်လို့ ခံစားရပါသလား။ SQL Server မှ ဒေတာများကို Vertica သို့ လွှဲပြောင်းပေးမည်ဟု ကတိပြုထားပုံရပြီး၊ ထို့နောက် ၎င်းကို ယူကာ ခေါင်းစဉ်မှ ဖယ်လိုက်သည်ဟဲ့။

ဤရက်စက်ယုတ်မာမှုသည် ရည်ရွယ်ချက်ရှိရှိ၊ ကျွန်ုပ်သည် သင့်အတွက် ဝေါဟာရအသုံးအနှုန်းအချို့ကို ရိုးရိုးရှင်းရှင်းဖော်ပြရမည်ဖြစ်ပါသည်။ ယခုသင်ပိုမိုသွားနိုင်သည်။

ကျွန်ုပ်တို့၏အစီအစဉ်မှာ ဤအရာဖြစ်သည်-

  1. ဒိုင်းလုပ်ပါ။
  2. အလုပ်များကို ဖန်တီးပါ။
  3. အရာအားလုံးက ဘယ်လောက်လှလဲဆိုတာ ကြည့်လိုက်ပါ။
  4. ဖြည့်စွက်ရန် စက်ရှင်နံပါတ်များကို သတ်မှတ်ပါ။
  5. SQL Server မှဒေတာကိုရယူပါ။
  6. ဒေတာကို Vertica တွင်ထည့်ပါ။
  7. စာရင်းဇယားများစုဆောင်းပါ။

ဒါကြောင့် ဒီအရာအားလုံးကို အကောင်အထည်ဖော်နိုင်ဖို့၊ ကျွန်တော်တို့ရဲ့ အသေးအမွှားထပ်တိုးမှုတစ်ခု လုပ်ခဲ့တယ်။ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

အဲဒီမှာတင်

  • အိမ်ရှင်အဖြစ် Vertica dwh ပုံသေသတ်မှတ်ချက်အများစုနှင့်အတူ၊
  • SQL Server သုံးမျိုး၊
  • ကျွန်ုပ်တို့သည် နောက်ပိုင်းတွင် ဒေတာဘေ့စ်များကို ဒေတာအချို့ဖြင့် ဖြည့်သွင်းသည် (မည်သည့်အခြေအနေတွင်မှ စူးစမ်းမနေပါ။ mssql_init.py!)

ကျွန်ုပ်တို့သည် ယခင်အကြိမ်ထက် အနည်းငယ်ပိုရှုပ်ထွေးသော command ၏အကူအညီဖြင့် ကောင်းမွန်သောအရာအားလုံးကို စတင်လိုက်သည်-

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

ကျွန်ုပ်တို့၏ miracle randomizer မှထုတ်ပေးသောအရာကို သင်အသုံးပြုနိုင်ပါသည်။ Data Profiling/Ad Hoc Query:

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။
အဓိကကတော့ အဲဒါကို အကဲခတ်တွေကို မပြဖို့ပါ။

အသေးစိတ်ဖော်ပြပါ။ ETL အစည်းအဝေးများ မဟုတ်ဘူး၊ အရာအားလုံးက အသေးအဖွဲပဲ မဟုတ်ဘူး၊ ငါတို့က အခြေခံတစ်ခုလုပ်တယ်၊ အဲဒါမှာ ဆိုင်းဘုတ်တစ်ခုရှိတယ်၊ အရာအားလုံးကို ဆက်စပ်မန်နေဂျာနဲ့ ခြုံပြီး အခု ဒါကိုလုပ်တယ်-

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

အချိန်ကျပြီ။ ကျွန်ုပ်တို့၏ဒေတာကိုစုဆောင်းပါ။ ငါတို့စားပွဲတစ်ရာခွဲကနေ။ အလွန်တရာမှ ဟန်မဆောင်သော လိုင်းများအကူအညီဖြင့် ဤအရာကို လုပ်ကြပါစို့။

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. ချိတ်တစ်ခု၏အကူအညီဖြင့် Airflow မှရရှိသည်။ pymssql-ချိတ်ဆက်ပါ။
  2. တောင်းဆိုမှုတွင် ရက်စွဲပုံစံဖြင့် ကန့်သတ်ချက်တစ်ခုကို အစားထိုးကြပါစို့ - ၎င်းကို ပုံစံခွက်အင်ဂျင်ဖြင့် လုပ်ဆောင်မှုထဲသို့ ထည့်သွင်းမည်ဖြစ်သည်။
  3. ကျွန်ုပ်တို့၏တောင်းဆိုမှုကို ကျွေးမွေးခြင်း။ pandasငါတို့ကို ဘယ်သူယူမလဲ။ DataFrame - အနာဂတ်တွင် ကျွန်ုပ်တို့အတွက် အသုံးဝင်ပါလိမ့်မည်။

အစားထိုးသုံးနေတယ်။ {dt} တောင်းဆိုမှုကန့်သတ်ချက်အစား %s ငါက မကောင်းဆိုးဝါး Pinocchio ကြောင့်မဟုတ်ဘဲ၊ pandas မကိုင်တွယ်နိုင်ဘူး။ pymssql ပြီးတော့ နောက်ဆုံးတစ်ခုကို ဖြတ်ပစ်လိုက်တယ်။ params: Listသူတကယ်လိုချင်ပေမယ့် tuple.
ပြုစုသူကိုလည်း သတိပြုပါ။ pymssql သူ့ကို မထောက်ခံတော့ဘူးလို့ ဆုံးဖြတ်ပြီး ထွက်သွားဖို့ အချိန်တန်ပြီ။ pyodbc.

Airflow သည် ကျွန်ုပ်တို့၏လုပ်ဆောင်ချက်များ၏ ငြင်းခုံချက်များကို ဖြည့်ဆည်းပေးသည်ကို ကြည့်ကြပါစို့။

Apache Airflow- ETL ကိုပိုမိုလွယ်ကူအောင်လုပ်ခြင်း။

ဒေတာမရှိရင် ဆက်လုပ်စရာ အကြောင်းမရှိပါဘူး။ ဒါပေမယ့် ဖြည့်သွင်းမှု အောင်မြင်တယ်လို့ ယူဆတာက ထူးဆန်းပါတယ်။ ဒါပေမယ့် ဒါက အမှားမဟုတ်ပါဘူး။ သြော် ဘာလုပ်ရမလဲ! ပြီးတော့ ဒါကဘာလဲ။

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException အမှားအယွင်းများမရှိဟု Airflow ကိုပြောသော်လည်း ကျွန်ုပ်တို့သည် လုပ်ဆောင်မှုကို ကျော်သွားပါသည်။ အင်တာဖေ့စ်တွင် အစိမ်းရောင် သို့မဟုတ် အနီရောင်စတုရန်းရှိမည်မဟုတ်သော်လည်း ပန်းရောင်ရှိသည်။

ကျွန်တော်တို့ရဲ့ဒေတာကိုပစ်ကြပါစို့ ကော်လံမျိုးစုံ:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

အမည်:

  • အော်ဒါယူထားတဲ့ ဒေတာဘေ့စ်၊
  • ကျွန်ုပ်တို့၏ ရေကြီးခြင်းဆိုင်ရာ အိုင်ဒီ (ကွဲပြားပါမည်။ အလုပ်တိုင်းအတွက်),
  • အရင်းအမြစ်နှင့် အမှာစာ ID မှ hash တစ်ခု - ထို့ကြောင့် နောက်ဆုံးဒေတာဘေ့စ် (ဇယားတစ်ခုထဲသို့ အရာအားလုံးကို လောင်းထည့်သည့်) တွင် ထူးခြားသော အမှာစာ ID တစ်ခုရှိသည်။

အဆုံးစွန်သောအဆင့်ကျန်သည်- အရာအားလုံးကို Vertica ထဲသို့လောင်းထည့်ပါ။ ထူးဆန်းတာက၊ ဒါကိုလုပ်ဖို့ အံ့မခန်းနဲ့ အထိရောက်ဆုံးနည်းလမ်းတွေထဲကတစ်ခုကတော့ CSV မှတဆင့်ပါ။

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. ကျွန်ုပ်တို့သည် အထူးလက်ခံသူအား ပြုလုပ်နေပါသည်။ StringIO.
  2. pandas ကြင်ကြင်နာနာ ငါတို့ကိုထားမယ်။ DataFrame ၏ပုံစံ CSV-လိုင်းများ။
  3. ချိတ်တစ်ခုဖြင့် ကျွန်ုပ်တို့၏အကြိုက်ဆုံး Vertica သို့ ချိတ်ဆက်မှုတစ်ခုဖွင့်ကြပါစို့။
  4. ယခုအကူအညီဖြင့် copy() ကျွန်ုပ်တို့၏ဒေတာကို Vertika သို့ တိုက်ရိုက်ပေးပို့ပါ။

လိုင်းမည်မျှဖြည့်ထားသည်ကို ဒရိုင်ဘာထံမှ ယူမည်ဖြစ်ပြီး အရာအားလုံး အဆင်ပြေကြောင်း စက်ရှင်မန်နေဂျာအား ပြောပြပါမည်။

session.loaded_rows = cursor.rowcount
session.successful = True

ဒါပါပဲ။

ရောင်းချမှုတွင်၊ ကျွန်ုပ်တို့သည် ပစ်မှတ်ပန်းကန်ကို ကိုယ်တိုင်ဖန်တီးပါသည်။ ဤတွင် ကျွန်ုပ်သည် ကျွန်ုပ်အား စက်သေးသေးလေးတစ်လုံးကို ခွင့်ပြုလိုက်ပါသည်။

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ငါအသုံးပြုနေတယ် VerticaOperator() ဒေတာဘေ့စ်စခီမာတစ်ခုနှင့် ဇယားတစ်ခုကို ငါဖန်တီးသည် (၎င်းတို့မရှိသေးပါက၊ ဟုတ်ပါတယ်)။ အဓိကအချက်မှာ မှီခိုအားထားမှုများကို မှန်ကန်စွာစီစဉ်ရန်ဖြစ်သည်။

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

တက်ကဉျြးခြုပျ

ကြွက်ကလေးက ပြောတယ် မဟုတ်လား၊
ငါက တောထဲမှာ ကြောက်စရာအကောင်းဆုံး တိရိစ္ဆာန်ပဲဆိုတာ မင်းယုံလား။

Julia Donaldson, The Gruffalo

အကယ်၍ ကျွန်ုပ်၏လုပ်ဖော်ကိုင်ဖက်များနှင့် ကျွန်ုပ်တွင် ပြိုင်ဆိုင်မှုတစ်ခုရှိခဲ့ပါက၊ ETL လုပ်ငန်းစဉ်ကို အလျင်အမြန်ဖန်တီးပြီး မည်သူက စတင်နိုင်မည်နည်း။ ၎င်းတို့သည် ၎င်းတို့၏ SSIS နှင့် mouse နှင့် ကျွန်ုပ်ကို Airflow ဖြင့် နှိုင်းယှဉ်ကြည့်ပါက ပြုပြင်ထိန်းသိမ်းမှု လွယ်ကူမှုကိုလည်း နှိုင်းယှဉ်ကြည့်မည်ဖြစ်ပါသည်... ဝိုး၊ ငါသူတို့ကို အရပ်ရပ်မှာ အနိုင်ယူမယ်ဆိုတာ မင်းသဘောတူမယ်ထင်တယ်။

နည်းနည်းလေးလေးနက်နက်ပြောရရင် Apache Airflow - ပရိုဂရမ်ကုဒ်ပုံစံနဲ့ လုပ်ငန်းစဉ်တွေကို ဖော်ပြခြင်းအားဖြင့် - ငါ့အလုပ် ပိုပြီး ပိုအဆင်ပြေပြီး ပျော်စရာကောင်းတယ်။

၎င်း၏ အကန့်အသတ်မရှိ ချဲ့ထွင်နိုင်မှုသည် ပလပ်အင်များနှင့် ချဲ့ထွင်နိုင်မှုအပေါ် တွန်းအားပေးမှုတို့ကြောင့် သင့်အား နေရာတိုင်းနီးပါးတွင် Airflow ကို အသုံးပြုရန် အခွင့်အရေးကို ပေးသည်- ဒေတာစုဆောင်းခြင်း၊ ပြင်ဆင်ခြင်းနှင့် လုပ်ဆောင်ခြင်း သံသရာတစ်ခုလုံးတွင်ပင်၊ ဒုံးပျံလွှတ်တင်ခြင်းတွင်ပင် (အင်္ဂါဂြိုဟ်သို့၊ သင်တန်း)။

အပိုင်းနောက်ဆုံး၊ အကိုးအကားနှင့်အချက်အလက်

မင်းအတွက် ငါတို့စုဆောင်းထားတဲ့ ထွန်တုံး

  • start_date. ဟုတ်တယ်၊ ဒါက ဒေသခံ meme ဖြစ်နေပါပြီ။ Doug ၏အဓိကအငြင်းအခုံမှတဆင့် start_date အားလုံး pass. အတိုချုပ်အားဖြင့် သတ်မှတ်ရလျှင် start_date လက်ရှိရက်စွဲနှင့် schedule_interval - တစ်နေ့၊ ထို့နောက် DAG သည် မနက်ဖြန်တွင် စတင်မည်ဖြစ်သည်။
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    ပြီးတော့ ပြဿနာမရှိတော့ဘူး။

    ၎င်းနှင့်ဆက်စပ်နေသော အခြား runtime error တစ်ခုရှိပါသည်- Task is missing the start_date parameterDag အော်ပရေတာနှင့် ချိတ်ရန် မေ့သွားကြောင်း မကြာခဏ ညွှန်ပြသော၊

  • အားလုံးစက်တစ်ခုတည်းမှာ။ ဟုတ်ကဲ့၊ နှင့် အခြေခံများ (Airflow ကိုယ်တိုင်နှင့် ကျွန်ုပ်တို့၏ အပေါ်ယံပိုင်း) နှင့် ဝဘ်ဆာဗာတစ်ခု၊ အချိန်ဇယားဆွဲသူ၊ အလုပ်သမားများ။ ပြီးတော့ အလုပ်ဖြစ်သေးတယ်။ သို့သော် အချိန်ကြာလာသည်နှင့်အမျှ၊ ဝန်ဆောင်မှုများအတွက် လုပ်ဆောင်စရာများ များပြားလာပြီး PostgreSQL သည် အညွှန်းကိန်းကို 20 ms အစား 5 s တွင် စတင်တုံ့ပြန်သောအခါ၊ ကျွန်ုပ်တို့ ၎င်းကိုယူ၍ သယ်ဆောင်သွားခဲ့သည်။
  • LocalExecutor။ ဟုတ်တယ်၊ ငါတို့က အဲဒါကို ထိုင်နေတုန်း၊ ငါတို့ ချောက်ထဲရောက်နေပြီ။ LocalExecutor သည် ယခုအချိန်အထိ ကျွန်ုပ်တို့အတွက် လုံလောက်နေပြီဖြစ်သော်လည်း ယခုအချိန်တွင် အနည်းဆုံး အလုပ်သမားတစ်ဦးနှင့် တိုးချဲ့ရန် အချိန်ကျရောက်ပြီဖြစ်ပြီး CeleryExecutor သို့ ပြောင်းရွှေ့ရန် ကျွန်ုပ်တို့ ကြိုးစားရမည်ဖြစ်ပါသည်။ စက်တစ်ခုတည်းတွင် သင် ၎င်းနှင့်အလုပ်လုပ်နိုင်သောကြောင့်၊ ဆာဗာတစ်ခုပေါ်တွင်ပင် Celery ကိုအသုံးပြုခြင်းကို မည်သည့်အရာကမှ ရပ်တန့်စေမည်မဟုတ်ပေ။
  • အသုံးမပြုပါ။ built-in tools များ:
    • connections ဝန်ဆောင်မှုအထောက်အထားများကို သိမ်းဆည်းရန်၊
    • SLA လွမ်းဆွတ်ခြင်း။ အချိန်မီ မပြီးပြတ်တဲ့ အလုပ်တွေကို တုံ့ပြန်ဖို့၊
    • xcom metadata ဖလှယ်ခြင်းအတွက် (ကျွန်တော်ပြောခဲ့သည်။ metadata!) dag အလုပ်များကြား။
  • မေးလ် အလွဲသုံးစားလုပ်ခြင်း။ ကောင်းပြီ၊ ငါဘာပြောနိုင်မလဲ။ ကျဆင်းသွားသော လုပ်ဆောင်စရာများ ထပ်ခါတလဲလဲ ပြုလုပ်ခြင်းအတွက် သတိပေးချက်များကို ထည့်သွင်းထားပါသည်။ ယခု ကျွန်ုပ်၏အလုပ် Gmail တွင် Airflow မှ အီးမေးလ်ပေါင်း 90k ရှိပြီး၊ web mail muzzle သည် တစ်ကြိမ်လျှင် 100 ကျော်ကို ကောက်ယူပြီး ဖျက်ရန် ငြင်းဆိုထားသည်။

နောက်ထပ် အန္တရာယ်များ- Apache Airflow Pitfails

နောက်ထပ် အလိုအလျောက် ကိရိယာများ

ကျွန်ုပ်တို့သည် ကျွန်ုပ်တို့၏ဦးခေါင်းနှင့်လက်ဖြင့်မဟုတ်ဘဲ ကျွန်ုပ်တို့၏ဦးခေါင်းနှင့်ပို၍အလုပ်လုပ်နိုင်စေရန်အတွက် Airflow သည် ကျွန်ုပ်တို့အတွက် ဤအရာကိုပြင်ဆင်ထားသည်။

  • REST API ကို - သူ့မှာ သမ္ဘာရင့်တဲ့ အနေအထားရှိတုန်းပဲ၊ အလုပ်မလုပ်အောင် တားထားလို့။ ၎င်းနှင့်အတူ၊ သင်သည် dags နှင့် လုပ်ဆောင်စရာများအကြောင်း အချက်အလက်များကိုသာမက လျှပ်တစ်ပြက်တစ်ခုအား စတင်ရန်၊ DAG Run သို့မဟုတ် ရေကူးကန်တစ်ခုကို ဖန်တီးနိုင်သည်။
  • CLI - WebUI မှတစ်ဆင့် အသုံးပြုရန် အဆင်မပြေရုံသာမက ယေဘုယျအားဖြင့် မရှိတော့သည့် ကိရိယာအများအပြားကို အမိန့်ပေးစာကြောင်းမှတစ်ဆင့် ရရှိနိုင်သည်။ ဥပမာအားဖြင့်:
    • backfill လုပ်ငန်းဆောင်တာများကို ပြန်လည်စတင်ရန် လိုအပ်ပါသည်။
      ဥပမာအားဖြင့်၊ လေ့လာသုံးသပ်သူများသည် လာ၍ပြောသည်- “အကိုတို့၊ ဇန်နဝါရီ ၁ ရက်မှ ၁၃ ရက်အထိ ဒေတာများတွင် အဓိပ္ပာယ်မရှိပေ။ ပြင်ပါ၊ ပြင်ပါ၊ ပြင်ပါ၊ ပြင်ပါ။” ပြီးတော့ မင်းက ဒီလို ဝါသနာပါ ၊
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • အခြေခံဝန်ဆောင်မှု- initdb, resetdb, upgradedb, checkdb.
    • run၎င်းသည် သင့်အား instance လုပ်ဆောင်စရာတစ်ခုအား လုပ်ဆောင်နိုင်ပြီး မှီခိုမှုအားလုံးတွင် အမှတ်ရနိုင်သည်။ ထိုမှတပါး၊ သင်သည်၎င်းကိုမှတဆင့် run နိုင်သည်။ LocalExecutorဆလရီအစုအဝေးရှိလျှင်လည်း၊
    • တော်တော် တူတာကို လုပ်တယ်။ testခြေစွပ်မှာလည်း ဘာမှ မရေးဘူး။
    • connections shell မှ ချိတ်ဆက်မှုများကို အစုလိုက်အပြုံလိုက် ဖန်တီးခွင့်ပြုသည်။
  • python api - ပလပ်အင်များအတွက် ရည်ရွယ်ပြီး ၎င်းကို လက်အနည်းငယ်ဖြင့် မွှေနှောက်ခြင်းမပြုဘဲ အပြန်အလှန်ဆက်ဆံခြင်း၏ ခက်ခဲသောနည်းလမ်းဖြစ်သည်။ ဒါပေမယ့် ငါတို့ကို ဘယ်သူက တားမှာလဲ။ /home/airflow/dagsပြေး ipython ရှုပ်ပွနေတော့မလား။ ဥပမာအားဖြင့် သင်သည် အောက်ပါကုဒ်ဖြင့် ဆက်သွယ်မှုအားလုံးကို ထုတ်ယူနိုင်သည်-
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow မက်တာဒေတာဘေ့စ်သို့ ချိတ်ဆက်နေသည်။ ၎င်းကိုစာရေးရန် ကျွန်ုပ်မအကြံပြုထားသော်လည်း အမျိုးမျိုးသောတိကျသောမက်ထရစ်များအတွက် လုပ်ဆောင်စရာအခြေအနေများကိုရယူခြင်းသည် APIs များကိုအသုံးပြုခြင်းထက် များစွာပိုမိုမြန်ဆန်လွယ်ကူပါသည်။

    ကျွန်ုပ်တို့၏အလုပ်များအားလုံးသည် အစွမ်းအစမရှိသော်လည်း တစ်ခါတစ်ရံ ပြုတ်ကျနိုင်ပြီး၊ ၎င်းသည် ပုံမှန်ဖြစ်သည်ဟု ဆိုကြပါစို့။ သို့သော် အချို့သော ပိတ်ဆို့မှုများသည် သံသယဖြစ်ဖွယ်ရှိပြီး စစ်ဆေးရန် လိုအပ်မည်ဖြစ်သည်။

    SQL သတိထားပါ။

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

ကိုးကား

ဟုတ်ပါတယ်၊ Google ကထုတ်ပေးတဲ့ လင့်ခ်ဆယ်ခုဟာ ကျွန်တော်ရဲ့ bookmarks ထဲက Airflow folder ရဲ့ အကြောင်းအရာတွေပါ။

နှင့် ဆောင်းပါးတွင် အသုံးပြုသည့် လင့်ခ်များ

source: www.habr.com

DDoS ကာကွယ်ရေး၊ VPS VDS ဆာဗာများပါသည့် ဆိုက်များအတွက် ယုံကြည်စိတ်ချရသော hosting ကို ဝယ်ယူပါ။ 🔥 DDoS ကာကွယ်မှု၊ VPS VDS ဆာဗာများပါရှိသော ယုံကြည်စိတ်ချရသော ဝဘ်ဆိုက် hosting ကို ဝယ်ယူပါ | ProHoster