Apache Airflow: ETL පහසු කිරීම

හායි, මම Dmitry Logvinenko - Vezet සමාගම් සමූහයේ විශ්ලේෂණ දෙපාර්තමේන්තුවේ දත්ත ඉංජිනේරු.

ETL ක්‍රියාවලි දියුණු කිරීම සඳහා අපූරු මෙවලමක් ගැන මම ඔබට කියමි - Apache Airflow. නමුත් ගුවන් ප්‍රවාහය කෙතරම් බහුකාර්ය සහ බහුවිධද යත්, ඔබ දත්ත ප්‍රවාහයන්ට සම්බන්ධ නොවූවත් ඔබ එය දෙස සමීපව බැලිය යුතුය, නමුත් වරින් වර ඕනෑම ක්‍රියාවලියක් දියත් කිරීමට සහ ඒවා ක්‍රියාත්මක කිරීම නිරීක්ෂණය කිරීමට අවශ්‍ය වේ.

ඔව්, මම කියන්න පමණක් නොව, පෙන්වන්නම්: වැඩසටහනට කේතයන්, තිරපිටපත් සහ නිර්දේශ රාශියක් ඇත.

Apache Airflow: ETL පහසු කිරීම
ඔබ Airflow / Wikimedia Commons යන වචනය ගූගල් කළ විට ඔබ සාමාන්‍යයෙන් දකින දේ

පටුන

හැඳින්වීම

Apache Airflow හරියට Django වගේ:

  • python වලින් ලියා ඇත
  • නියම පරිපාලක මණ්ඩලයක් ඇත,
  • දින නියමයක් නොමැතිව පුළුල් කළ හැකිය

- වඩා හොඳ පමණක් වන අතර එය සම්පූර්ණයෙන්ම වෙනස් අරමුණු සඳහා සාදන ලදී, එනම් (එය කටාට පෙර ලියා ඇති පරිදි):

  • අසීමිත යන්ත්‍ර සංඛ්‍යාවක් මත කාර්යයන් ධාවනය කිරීම සහ අධීක්ෂණය කිරීම (බොහෝ සැල්දිරි / කුබර්නෙට් සහ ඔබේ හෘද සාක්ෂිය ඔබට ඉඩ දෙනු ඇත)
  • පයිතන් කේතය ලිවීමට සහ තේරුම් ගැනීමට ඉතා පහසු සිට ගතික කාර්ය ප්‍රවාහ උත්පාදනය සමඟ
  • සහ සූදානම් කළ සංරචක සහ ගෙදර හැදූ ප්ලගීන (එය අතිශයින්ම සරල) භාවිතා කරමින් ඕනෑම දත්ත සමුදායන් සහ API එකිනෙක සම්බන්ධ කිරීමේ හැකියාව.

අපි මේ ආකාරයට Apache Airflow භාවිතා කරමු:

  • අපි DWH සහ ODS (අපට Vertica සහ Clickhouse ඇත) විවිධ මූලාශ්‍රවලින් (බොහෝ SQL Server සහ PostgreSQL අවස්ථා, යෙදුම් ප්‍රමිතික සහිත විවිධ API, 1C පවා) දත්ත රැස් කරන්නෙමු.
  • කොච්චර දියුණුද කියලා cron, ODS මත දත්ත ඒකාබද්ධ කිරීමේ ක්‍රියාවලීන් ආරම්භ කරන අතර, ඒවායේ නඩත්තුව ද අධීක්ෂණය කරයි.

මෑතක් වන තුරුම, අපගේ අවශ්‍යතා ආවරණය කළේ කෝර් 32 ක් සහ 50 GB RAM සහිත කුඩා සේවාදායකයක් මගිනි. වායු ප්රවාහයේ, මෙය ක්රියා කරයි:

  • более දින 200 ක් (ඇත්ත වශයෙන්ම අපි කාර්යයන් පිරවූ වැඩ ප්‍රවාහයන්),
  • එක් එක් සාමාන්යයෙන් කාර්යයන් 70 ක්,
  • මෙම යහපත්කම ආරම්භ වේ (සාමාන්‍යයෙන් ද) පැයකට වරක්.

අපි පුළුල් කළ ආකාරය ගැන, මම පහත ලියන්නෙමි, නමුත් දැන් අපි විසඳන über ගැටලුව නිර්වචනය කරමු:

මූලාශ්‍ර SQL සේවාදායකයන් තුනක් ඇත, සෑම එකක්ම දත්ත සමුදායන් 50 ක් ඇත - එක් ව්‍යාපෘතියක අවස්ථා, පිළිවෙලින්, ඒවාට එකම ව්‍යුහයක් ඇත (සෑම තැනකම පාහේ, mua-ha-ha), එයින් අදහස් වන්නේ එක් එක් ඇණවුම් වගුවක් ඇති බවයි (වාසනාවකට මෙන්, එය සහිත වගුවක්. නම ඕනෑම ව්යාපාරයකට තල්ලු කළ හැකිය). අපි සේවා ක්ෂේත්‍ර (මූලාශ්‍ර සේවාදායකය, මූලාශ්‍ර දත්ත ගබඩාව, ETL කාර්ය හැඳුනුම්පත) එකතු කිරීමෙන් දත්ත ලබාගෙන ඒවා බොළඳ ලෙස වර්ටිකා තුළට විසි කරමු.

අපි යමු!

ප්රධාන කොටස, ප්රායෝගික (සහ ටිකක් න්යායික)

ඇයි අපි (සහ ඔබ)

ගස් විශාල වූ විට සහ මම සරල විය SQLඑක් රුසියානු සිල්ලර වෙළඳාමක -schik, අපි අපට ලබා ගත හැකි මෙවලම් දෙකක් භාවිතා කරමින් ETL ක්‍රියාවලි හෙවත් දත්ත ප්‍රවාහයන් වංචා කළෙමු:

  • Informatica Power Center - අතිශයින් පැතිරෙන පද්ධතියක්, අතිශයින්ම ඵලදායී, ස්වකීය දෘඩාංග සමග, එහිම අනුවාදනය. මම එහි හැකියාවන්ගෙන් 1% ක් දෙවියන් තහනම් කළෙමි. ඇයි? හොඳයි, පළමුවෙන්ම, මෙම අතුරුමුහුණත, 380 ගණන්වල කොතැනක හෝ මානසිකව අප මත පීඩනයක් ඇති කළේය. දෙවනුව, මෙම contraption නිර්මාණය කර ඇත්තේ අතිශය විචිත්‍රවත් ක්‍රියාවලීන්, කෝපාවිෂ්ඨ සංරචක නැවත භාවිතය සහ අනෙකුත් ඉතා වැදගත්-ව්‍යවසාය-උපක්‍රම සඳහා ය. එයාර්බස් AXNUMX / වසරකට පියාපත් වැනි එහි පිරිවැය ගැන අපි කිසිවක් නොකියමු.

    ප්‍රවේශම් වන්න, තිර රුවක් අවුරුදු 30 ට අඩු අයට ටිකක් රිදවිය හැකිය

    Apache Airflow: ETL පහසු කිරීම

  • SQL Server Integration Server - අපි අපේ අභ්‍යන්තර ව්‍යාපෘති ප්‍රවාහයේදී මෙම සහෝදරයාව භාවිතා කළෙමු. හොඳයි, ඇත්ත වශයෙන්ම: අපි දැනටමත් SQL සේවාදායකය භාවිතා කරන අතර, එහි ETL මෙවලම් භාවිතා නොකිරීම කෙසේ හෝ අසාධාරණ වනු ඇත. එහි ඇති සෑම දෙයක්ම හොඳයි: අතුරු මුහුණත දෙකම ලස්සනයි, සහ ප්‍රගති වාර්තා ... නමුත් අපි මෘදුකාංග නිෂ්පාදන වලට ආදරය කරන්නේ මේ නිසා නොවේ, ඔහ්, මේ සඳහා නොවේ. එය අනුවාදය කරන්න dtsx (සුරකින විට නෝඩ් කලවම් කර ඇති XML යනු) අපට හැක, නමුත් ප්‍රයෝජනය කුමක්ද? එක් සේවාදායකයකින් තවත් මේස සිය ගණනක් ඇදගෙන යා හැකි කාර්ය පැකේජයක් සාදා ගන්නේ කෙසේද? ඔව්, මොන සීයක්ද, මූසික බොත්තම මත ක්ලික් කිරීමෙන් ඔබේ දබරැඟිල්ල කෑලි විස්සකින් වැටෙනු ඇත. නමුත් එය අනිවාර්යයෙන්ම වඩාත් විලාසිතාවෙන් පෙනේ:

    Apache Airflow: ETL පහසු කිරීම

අපි නිසැකවම මග සොයා බැලුවෙමු. නඩුව පවා වාගේ ස්වයං-ලිඛිත SSIS පැකේජ උත්පාදක යන්ත්රයක් වෙත පැමිණියේය ...

…ඊට පස්සේ මට අලුත් වැඩක් හම්බ වුණා. සහ Apache Airflow එය මත මා අභිබවා ගියේය.

ETL ක්‍රියාවලි විස්තර සරල පයිතන් කේතයක් බව දැනගත් විට, මම සතුටට නැටුවේ නැත. දත්ත ප්‍රවාහයන් අනුවාදනය කර වෙනස් කරන ලද අතර, දත්ත සමුදායන් සිය ගණනකින් තනි ව්‍යුහයක් සහිත වගු එක් ඉලක්කයකට වත් කිරීම තිර එකහමාරක හෝ 13 ”XNUMX ක පයිතන් කේතයක් බවට පත් විය.

පොකුර එකලස් කිරීම

අපි සම්පුර්ණයෙන්ම ළදරු පාසලක් සංවිධානය නොකර, ගුවන් ප්‍රවාහය ස්ථාපනය කිරීම, ඔබ තෝරාගත් දත්ත ගබඩාව, සැල්දිරි සහ තටාකවල විස්තර කර ඇති වෙනත් අවස්ථා වැනි සම්පූර්ණයෙන්ම පැහැදිලි දේවල් ගැන මෙහි කතා නොකරමු.

අපි වහාම අත්හදා බැලීම් ආරම්භ කළ හැකි වන පරිදි, මම සිතුවම් කළෙමි docker-compose.yml එහි:

  • අපි ඇත්තටම උස්සමු වායු දහරාව: Scheduler, Webserver. සැල්දිරි කාර්යයන් නිරීක්ෂණය කිරීම සඳහා මල් ද එහි කැරකෙනු ඇත (එය දැනටමත් තල්ලු කර ඇති නිසා apache/airflow:1.10.10-python3.7, නමුත් අපට කමක් නැහැ)
  • PostgreSQL, එහි Airflow එහි සේවා තොරතුරු (කාලසටහන් දත්ත, ක්රියාත්මක කිරීමේ සංඛ්යා ලේඛන, ආදිය) ලියන අතර, Celery විසින් සම්පූර්ණ කරන ලද කාර්යයන් සලකුණු කරනු ඇත;
  • Redis, සැල්දිරි සඳහා කාර්ය තැරැව්කරුවකු ලෙස කටයුතු කරනු ඇත;
  • සැල්දිරි සේවකයා, කර්තව්යයන් සෘජුව ක්රියාත්මක කිරීමෙහි නිරත වනු ඇත.
  • ෆෝල්ඩරයට ./dags අපි dags විස්තරය සමඟ අපගේ ගොනු එකතු කරන්නෙමු. ඔවුන් පියාසර කරන විට රැගෙන යනු ඇත, එබැවින් එක් එක් කිවිසුම් යාමෙන් පසු මුළු තොගයම හසුරුවන්න අවශ්ය නොවේ.

සමහර ස්ථානවල, උදාහරණවල කේතය සම්පූර්ණයෙන්ම නොපෙන්වයි (පෙළ අවුල් නොකිරීමට), නමුත් යම් තැනක එය ක්‍රියාවලියේදී වෙනස් කර ඇත. සම්පූර්ණ ක්‍රියාකාරී කේත උදාහරණ ගබඩාවෙන් සොයාගත හැකිය https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Примечания:

  • සංයුතියේ එකලස් කිරීමේදී, මම බොහෝ දුරට සුප්රසිද්ධ රූපය මත රඳා පැවතුනි puckel / docker-වායු ප්රවාහය - එය පරීක්ෂා කිරීමට වග බලා ගන්න. සමහර විට ඔබට ඔබේ ජීවිතයට වෙනත් කිසිවක් අවශ්‍ය නොවනු ඇත.
  • සියලුම වායු ප්රවාහ සැකසුම් ලබා ගත හැක්කේ හරහා පමණක් නොවේ airflow.cfg, නමුත් පරිසර විචල්‍යයන් හරහා (සංවර්ධකයින්ට ස්තූතියි), මම ද්වේෂ සහගත ලෙස ප්‍රයෝජන ගත්තා.
  • ස්වාභාවිකවම, එය නිෂ්පාදනයට සූදානම් නැත: මම හිතාමතාම කන්ටේනර් මත හෘද ස්පන්දන තැබුවේ නැත, මම ආරක්ෂාව ගැන කරදර වූයේ නැත. නමුත් මම අපේ අත්හදා බැලීම් කරන්නන්ට සුදුසු අවමය කළා.
  • එය සටහන් කර ගන්න:
    • ඩැග් ෆෝල්ඩරය කාලසටහන් කරන්නාට සහ කම්කරුවන්ට ප්‍රවේශ විය යුතුය.
    • සියලුම තෙවන පාර්ශවීය පුස්තකාල සඳහාද මෙය අදාළ වේ - ඒවා සියල්ලම උපලේඛකයක් සහ කම්කරුවන් සහිත යන්ත්‍රවල ස්ථාපනය කළ යුතුය.

හොඳයි, දැන් එය සරලයි:

$ docker-compose up --scale worker=3

සෑම දෙයක්ම ඉහළ ගිය පසු, ඔබට වෙබ් අතුරු මුහුණත් දෙස බැලිය හැකිය:

මූලික සංකල්ප

මෙම සියලු "dags" තුළ ඔබට කිසිවක් තේරුණේ නැත්නම්, මෙන්න කෙටි ශබ්දකෝෂයක්:

  • උපලේඛකයා - ගුවන් ප්‍රවාහයේ වැදගත්ම මාමා, රොබෝවරු වෙහෙස මහන්සි වී වැඩ කරන බව පාලනය කිරීම මිස පුද්ගලයෙකු නොවේ: කාලසටහන නිරීක්ෂණය කරයි, ඩැග්ස් යාවත්කාලීන කරයි, කාර්යයන් දියත් කරයි.

    පොදුවේ ගත් කල, පැරණි අනුවාද වලදී, ඔහුට මතකයේ ගැටළු ඇති විය (නැත, ඇම්නේෂියාව නොව කාන්දුවීම්) සහ උරුම පරාමිතිය වින්‍යාසය තුළ පවා පැවතුනි. run_duration - එහි නැවත ආරම්භ පරතරය. නමුත් දැන් සියල්ල හොඳින්.

  • ඩැග් (aka "dag") - "directed acyclic graph", නමුත් එවැනි නිර්වචනයක් කිහිප දෙනෙකුට කියනු ඇත, නමුත් ඇත්ත වශයෙන්ම එය එකිනෙකා සමඟ අන්තර් ක්‍රියා කරන කාර්යයන් සඳහා බහාලුමක් වේ (පහත බලන්න) හෝ SSIS හි පැකේජයේ ප්‍රතිසමයක් සහ Informatica හි කාර්ය ප්‍රවාහයකි .

    ඩැග් වලට අමතරව, තවමත් උප-ඩැග් තිබිය හැක, නමුත් අපි බොහෝ විට ඒවා වෙත නොයනු ඇත.

  • DAG ධාවනය - ආරම්භක දාගය, එය තමන්ගේම පවරා ඇත execution_date. එකම දාගයේ ඩැග්‍රන්ට සමාන්තරව ක්‍රියා කළ හැකිය (ඔබ ඔබේ කාර්යයන් දුර්වල කර ඇත්නම්, ඇත්ත වශයෙන්ම).
  • ක්රියාකරු නිශ්චිත ක්‍රියාවක් සිදු කිරීම සඳහා වගකිව යුතු කේත කොටස් වේ. ක්රියාකරුවන් වර්ග තුනක් ඇත:
    • කටයුතුඅපේ ප්රියතම වගේ PythonOperator, ඕනෑම (වලංගු) පයිතන් කේතයක් ක්‍රියාත්මක කළ හැකි;
    • මාරු කිරීම, තැනින් තැනට දත්ත ප්‍රවාහනය කරන, කියන්න, MsSqlToHiveTransfer;
    • සංවේදකය අනෙක් අතට, එය සිදුවීමක් සිදු වන තුරු ප්‍රතික්‍රියා කිරීමට හෝ ඩැග් තවදුරටත් ක්‍රියාත්මක කිරීම මන්දගාමී කිරීමට ඔබට ඉඩ සලසයි. HttpSensor නිශ්චිත අන්ත ලක්ෂ්‍යය ඇද ගත හැකි අතර, අපේක්ෂිත ප්‍රතිචාරය බලා සිටින විට, මාරු කිරීම ආරම්භ කරන්න GoogleCloudStorageToS3Operator. විමසිලිමත් මනසක් අසනු ඇත: "ඇයි? සියල්ලට පසු, ඔබට ක්‍රියාකරු තුළම පුනරාවර්තන කළ හැකිය! ඉන්පසුව, අත්හිටුවන ලද ක්රියාකරුවන් සමඟ කාර්යයන් සංචිතය අවහිර නොකිරීමට. ඊළඟ උත්සාහයට පෙර සංවේදකය ආරම්භ වේ, පරීක්ෂා කර මිය යයි.
  • කාර්ය - ප්‍රකාශිත ක්‍රියාකරුවන්, වර්ගය කුමක් වුවත්, සහ දාගයට අනුයුක්ත කර ඇති කාර්ය භාරයට උසස් කරනු ලැබේ.
  • කාර්ය අවස්ථාව - සාමාන්‍ය සැලසුම්කරු තීරණය කළ විට, කාර්ය සාධනය-සේවකයින් මත සටනට කාර්යයන් යැවීමට කාලය පැමිණ ඇති බව (අපි භාවිතා කරන්නේ නම් එම ස්ථානයේදීම) LocalExecutor හෝ නඩුවේ දුරස්ථ නෝඩයකට CeleryExecutor), එය ඔවුන්ට සන්දර්භයක් පවරයි (එනම්, විචල්‍ය සමූහයක් - ක්‍රියාත්මක කිරීමේ පරාමිති), විධාන හෝ විමසුම් සැකිලි පුළුල් කර ඒවා සංචිත කරයි.

අපි කාර්යයන් උත්පාදනය කරමු

පළමුව, අපි අපගේ ඩෝගයේ සාමාන්‍ය යෝජනා ක්‍රමය ගෙනහැර දක්වමු, ඉන්පසු අපි සුළු නොවන විසඳුම් කිහිපයක් යොදන නිසා අපි වැඩි වැඩියෙන් විස්තර වෙත කිමිදෙමු.

එබැවින්, එහි සරලම ස්වරූපයෙන්, එවැනි දාගයක් මේ ආකාරයෙන් පෙනෙනු ඇත:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

අපි එය තේරුම් ගනිමු:

  • පළමුව, අපි අවශ්ය ලිබ් සහ ආනයනය කරමු වෙන මොනවා හරි;
  • sql_server_ds - මෙය List[namedtuple[str, str]] වායු ප්‍රවාහ සම්බන්ධතා වලින් සම්බන්ධතා වල නම් සහ අපි අපගේ තහඩුව ලබා ගන්නා දත්ත සමුදායන් සමඟ;
  • dag - අනිවාර්යයෙන්ම තිබිය යුතු අපගේ ඩැග්ගේ නිවේදනය globals(), එසේ නොමැතිනම් Airflow එය සොයා නොගනු ඇත. ඩග් මෙසේද පැවසිය යුතුය.
    • ඔහුගේ නම කුමක්ද? orders - මෙම නම පසුව වෙබ් අතුරු මුහුණතෙහි දිස්වනු ඇත,
    • ඔහු ජූලි අටවැනිදා මධ්‍යම රාත්‍රියේ සිට වැඩ කරන බව,
    • සහ එය ආසන්න වශයෙන් සෑම පැය 6කට වරක් ක්‍රියාත්මක විය යුතුය (දැඩි මිනිසුන් සඳහා මෙහි වෙනුවට timedelta() පිළිගත හැකි cron- රේඛාව 0 0 0/6 ? * * *, අඩු සිසිල් සඳහා - වැනි ප්රකාශනයක් @daily);
  • workflow() ප්රධාන කාර්යය ඉටු කරනු ඇත, නමුත් දැන් නොවේ. දැනට, අපි අපගේ සන්දර්භය ලොගයට දමන්නෙමු.
  • දැන් කාර්යයන් නිර්මාණය කිරීමේ සරල මැජික්:
    • අපි අපගේ මූලාශ්‍ර හරහා දුවමු;
    • ආරම්භ කරන්න PythonOperator, අපගේ ව්යාජය ක්රියාත්මක කරනු ඇත workflow(). කාර්යයේ අද්විතීය (ඩැග් තුළ) නමක් සඳහන් කිරීමට අමතක නොකරන්න සහ දාරයම බැඳ තබන්න. ධජ provide_context අනෙක් අතට, ශ්‍රිතයට අමතර තර්ක වත් කරනු ඇත, එය අපි ප්‍රවේශමෙන් එකතු කරමු **context.

දැනට නම් එච්චරයි. අපට ලැබුණු දේ:

  • වෙබ් අතුරු මුහුණතේ නව dag,
  • සමාන්තරව ක්‍රියාත්මක වන කාර්යයන් එකහමාරක් (වායු ප්‍රවාහය, සැල්දිරි සැකසුම් සහ සේවාදායක ධාරිතාව එයට ඉඩ දෙන්නේ නම්).

හොඳයි, එය පාහේ ලැබුණා.

Apache Airflow: ETL පහසු කිරීම
පරායත්තතා ස්ථාපනය කරන්නේ කවුද?

මේ සියල්ල සරල කිරීම සඳහා, මම ඇණ ගැසුවෙමි docker-compose.yml සැකසීම requirements.txt සියලුම නෝඩ් මත.

දැන් එය නැති වී ඇත:

Apache Airflow: ETL පහසු කිරීම

අළු කොටු යනු උපලේඛකයා විසින් සකසන ලද කාර්ය අවස්ථා වේ.

අපි ටිකක් බලා සිටිමු, සේවකයින් විසින් කාර්යයන් කපා හරිනු ලැබේ:

Apache Airflow: ETL පහසු කිරීම

හරිතයන්, ඇත්ත වශයෙන්ම, ඔවුන්ගේ කාර්යය සාර්ථකව නිම කර ඇත. රතු ඉතා සාර්ථක නොවේ.

මාර්ගය වන විට, අපගේ නිෂ්පාදනයේ ෆෝල්ඩරයක් නොමැත ./dags, යන්ත්‍ර අතර සමමුහුර්ත වීමක් නොමැත - සියලුම දාගයන් ඇත git අපගේ Gitlab මත, සහ Gitlab CI ඒකාබද්ධ වන විට යන්ත්‍ර වෙත යාවත්කාලීන බෙදා දෙයි master.

මල් ගැන ටිකක්

කම්කරුවන් අපගේ පැසිෆියර්වලට පහර දෙන අතර, අපට යමක් පෙන්විය හැකි තවත් මෙවලමක් සිහිපත් කරමු - මල්.

සේවක නෝඩ් පිළිබඳ සාරාංශ තොරතුරු සහිත පළමු පිටුව:

Apache Airflow: ETL පහසු කිරීම

වැඩ කිරීමට ගිය කාර්යයන් සහිත වඩාත් තීව්‍ර පිටුව:

Apache Airflow: ETL පහසු කිරීම

අපගේ තැරැව්කරුගේ තත්ත්වය සහිත වඩාත්ම නීරස පිටුව:

Apache Airflow: ETL පහසු කිරීම

දීප්තිමත්ම පිටුව කාර්ය තත්ත්‍ව ප්‍රස්ථාර සහ ඒවා ක්‍රියාත්මක කිරීමේ කාලය සමඟ වේ:

Apache Airflow: ETL පහසු කිරීම

අපි යට පටවන ලද ඒවා පටවන්නෙමු

එබැවින්, සියලු කාර්යයන් සාර්ථක වී ඇත, ඔබට තුවාලකරුවන් රැගෙන යා හැකිය.

Apache Airflow: ETL පහසු කිරීම

තුවාල ලැබූ බොහෝ දෙනෙක් සිටියහ - එක් හේතුවක් හෝ වෙනත් හේතුවක් නිසා. වායු ප්‍රවාහය නිවැරදිව භාවිතා කිරීමේදී, මෙම වර්ග වලින් පෙන්නුම් කරන්නේ දත්ත නියත වශයෙන්ම නොපැමිණි බවයි.

ඔබ ලොගය නැරඹිය යුතු අතර වැටුණු කාර්ය අවස්ථා නැවත ආරම්භ කළ යුතුය.

ඕනෑම චතුරස්රයක් මත ක්ලික් කිරීමෙන්, අපට ලබා ගත හැකි ක්‍රියා අපි දකිමු:

Apache Airflow: ETL පහසු කිරීම

ඔබට ගෙන ගොස් වැටී ඇති දේ ඉවත් කළ හැකිය. එනම්, එහි යම් දෙයක් අසාර්ථක වී ඇති බව අපට අමතක වන අතර, එම අවස්ථාවෙහි කාර්යය උපලේඛකයා වෙත යයි.

Apache Airflow: ETL පහසු කිරීම

සියලුම රතු කොටු සහිත මූසිකය සමඟ මෙය කිරීම එතරම් මානුෂීය නොවන බව පැහැදිලිය - මෙය වාතය ප්‍රවාහයෙන් අප අපේක්ෂා කරන්නේ නැත. ස්වාභාවිකවම, අපට මහා විනාශකාරී ආයුධ තිබේ: Browse/Task Instances

Apache Airflow: ETL පහසු කිරීම

අපි සියල්ල එකවර තෝරා ශුන්‍යයට යළි පිහිටුවන්න, නිවැරදි අයිතමය ක්ලික් කරන්න:

Apache Airflow: ETL පහසු කිරීම

පිරිසිදු කිරීමෙන් පසු, අපගේ කුලී රථ මේ ආකාරයට පෙනේ (ඔවුන් දැනටමත් කාලසටහන් කරන්නා ඒවා උපලේඛනගත කරන තෙක් බලා සිටී):

Apache Airflow: ETL පහසු කිරීම

සම්බන්ධතා, කොකු සහ අනෙකුත් විචල්යයන්

ඊළඟ DAG දෙස බැලීමට කාලයයි, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

සෑම කෙනෙකුම වාර්තා යාවත්කාලීන කිරීමක් කර තිබේද? මෙය නැවතත් ඇයයි: දත්ත ලබා ගත හැකි මූලාශ්ර ලැයිස්තුවක් තිබේ; තැබිය යුතු ලැයිස්තුවක් තිබේ; සෑම දෙයක්ම සිදු වූ විට හෝ කැඩී ගිය විට හෝන් කිරීමට අමතක නොකරන්න (හොඳයි, මෙය අප ගැන නොවේ, නැත).

අපි නැවතත් ගොනුව හරහා ගොස් නව අපැහැදිලි දේවල් දෙස බලමු:

  • from commons.operators import TelegramBotSendMessage - අවහිර නොකළ වෙත පණිවිඩ යැවීම සඳහා කුඩා දවටනයක් සෑදීමෙන් අපි ප්‍රයෝජන ගත් අපගේම ක්‍රියාකරුවන් සෑදීමෙන් කිසිවක් අපට වළක්වන්නේ නැත. (අපි මෙම ක්රියාකරු ගැන වැඩි විස්තර පහතින් කතා කරමු);
  • default_args={} - dag හට එහි සියලුම ක්‍රියාකරුවන්ට එකම තර්ක බෙදා හැරිය හැක;
  • to='{{ var.value.all_the_kings_men }}' - ක්ෂේත්රය to අපි දෘඪ කේත නොකරනු ඇත, නමුත් ජින්ජා සහ ඊමේල් ලැයිස්තුවක් සහිත විචල්‍යයක් භාවිතයෙන් ගතිකව ජනනය කරනු ලැබේ, මම එය ප්‍රවේශමෙන් තැබුවෙමි Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - ක්රියාකරු ආරම්භ කිරීම සඳහා කොන්දේසිය. අපගේ නඩුවේදී, ලිපිය ලොක්කන් වෙත පියාසර කරනු ලබන්නේ සියලු යැපීම් ක්‍රියාත්මක වී ඇත්නම් පමණි සාර්ථකව;
  • tg_bot_conn_id='tg_main' - තර්ක conn_id අපි නිර්මාණය කරන සම්බන්ධතා හැඳුනුම්පත් පිළිගන්න Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ටෙලිග්‍රාම් හි පණිවිඩ ඉවතට පියාසර කරන්නේ වැටී ඇති කාර්යයන් තිබේ නම් පමණි;
  • task_concurrency=1 - එක් කාර්යයක කාර්ය අවස්ථා කිහිපයක් එකවර දියත් කිරීම අපි තහනම් කරමු. එසේ නොමැතිනම්, අපට එකවර කිහිපයක් දියත් කිරීම ලැබෙනු ඇත VerticaOperator (එක් මේසයක් දෙස බැලීම);
  • report_update >> [email, tg] - සෑම VerticaOperator මෙවැනි ලිපි සහ පණිවිඩ යැවීමේදී අභිසාරී වන්න:
    Apache Airflow: ETL පහසු කිරීම

    නමුත් දැනුම්දීමේ ක්‍රියාකරුවන්ට විවිධ දියත් කිරීමේ කොන්දේසි ඇති බැවින්, එකක් පමණක් ක්‍රියා කරයි. Tree View හි, සෑම දෙයක්ම ටිකක් අඩු දෘශ්‍ය ලෙස පෙනේ:
    Apache Airflow: ETL පහසු කිරීම

මම ගැන වචන කිහිපයක් කියන්නම් මැක්රෝස් සහ ඔවුන්ගේ මිතුරන් - විචල්යයන්.

මැක්‍රෝස් යනු විවිධ ප්‍රයෝජනවත් තොරතුරු ක්‍රියාකරු තර්කවලට ආදේශ කළ හැකි ජින්ජා ස්ථාන දරණ වේ. උදාහරණයක් ලෙස, මේ වගේ:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} සන්දර්භය විචල්‍යයේ අන්තර්ගතය දක්වා ව්‍යාප්ත වනු ඇත execution_date ආකෘතියෙන් YYYY-MM-DD: 2020-07-14. හොඳම කොටස නම් සන්දර්භය විචල්‍යයන් නිශ්චිත කාර්ය අවස්ථාවකට (ගස දසුනෙහි චතුරස්‍රයක්) ඇණ ගැසීම සහ නැවත ආරම්භ කළ විට, ස්ථාන දරන්නන් එකම අගයන් දක්වා විහිදේ.

එක් එක් කාර්ය අවස්ථාවෙහි Rendered බොත්තම භාවිතයෙන් පවරා ඇති අගයන් බැලිය හැක. ලිපියක් යැවීමේ කාර්යය පහත පරිදි වේ:

Apache Airflow: ETL පහසු කිරීම

පණිවිඩයක් යැවීමේ කාර්යයේදී:

Apache Airflow: ETL පහසු කිරීම

පවතින නවතම අනුවාදය සඳහා ගොඩනඟන ලද මැක්‍රෝ වල සම්පූර්ණ ලැයිස්තුවක් මෙහි ඇත: macros යොමුව

එපමණක් නොව, ප්ලගීන ආධාරයෙන්, අපට අපගේම මැක්රෝස් ප්රකාශ කළ හැකිය, නමුත් එය තවත් කතාවකි.

පූර්ව නිශ්චිත දේවල් වලට අමතරව, අපට අපගේ විචල්‍යවල අගයන් ආදේශ කළ හැකිය (මම මෙය දැනටමත් ඉහත කේතයේ භාවිතා කර ඇත). අපි නිර්මාණය කරමු Admin/Variables කරුණු කිහිපයක්:

Apache Airflow: ETL පහසු කිරීම

ඔබට භාවිතා කළ හැකි සියල්ල:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

අගය අදිශයක් විය හැක, නැතහොත් එය JSON ද විය හැක. JSON නම්:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

අපේක්ෂිත යතුර සඳහා මාර්ගය භාවිතා කරන්න: {{ var.json.bot_config.bot.token }}.

මම වචනාර්ථයෙන් එක වචනයක් කියන්නම් සහ එක තිර රුවක් පෙන්වන්නම් සම්බන්ධතා. මෙහි සෑම දෙයක්ම මූලික වේ: පිටුවේ Admin/Connections අපි සම්බන්ධතාවයක් සාදා, අපගේ පිවිසුම් / මුරපද සහ වඩාත් නිශ්චිත පරාමිති එහි එක් කරන්න. මෙවැනි:

Apache Airflow: ETL පහසු කිරීම

මුරපද සංකේතනය කළ හැක (පෙරනිමියට වඩා හොඳින්), නැතහොත් ඔබට සම්බන්ධතා වර්ගය අත්හැරිය හැක (මා කළ පරිදි tg_main) - සත්‍යය නම්, වර්ග ලැයිස්තුව වායු ප්‍රවාහ මාදිලිවල දෘඩ රැහැන්ගත කර ඇති අතර ප්‍රභව කේතවලට ඇතුළු නොවී පුළුල් කළ නොහැක (හදිසියේම මම යමක් ගූගල් නොකළේ නම්, කරුණාකර මාව නිවැරදි කරන්න), නමුත් කිසිවක් අපට ණය ලබා ගැනීමෙන් වළක්වන්නේ නැත. නම.

ඔබට එකම නම සමඟ සම්බන්ධතා කිහිපයක් ද කළ හැකිය: මෙම අවස්ථාවේදී, ක්රමය BaseHook.get_connection(), නමින් අපට සම්බන්ධතා ලබා දෙන, ලබා දෙනු ඇත අහඹු නාමික කිහිපයකින් (රවුන්ඩ් රොබින් සෑදීම වඩාත් තාර්කික වනු ඇත, නමුත් අපි එය එයාර්ෆ්ලෝ සංවර්ධකයින්ගේ හෘදසාක්ෂිය මත තබමු).

විචල්‍යයන් සහ සම්බන්ධතා නිසැකවම සිසිල් මෙවලම් වේ, නමුත් ශේෂය නැති කර නොගැනීම වැදගත් වේ: ඔබ කේතය තුළම ගබඩා කරන ඔබේ ප්‍රවාහවල කොටස් සහ ගබඩා කිරීම සඳහා ඔබ වායු ප්‍රවාහයට ලබා දෙන කොටස්. එක් අතකින්, UI හරහා අගය ඉක්මනින් වෙනස් කිරීම පහසු විය හැකිය, උදාහරණයක් ලෙස, තැපැල් පෙට්ටියක්. අනෙක් අතට, මෙය තවමත් මූසික ක්ලික් වෙත ආපසු යාමකි, එයින් අපට (මම) ඉවත් කිරීමට අවශ්‍ය විය.

සම්බන්ධතා සමඟ වැඩ කිරීම එක් කාර්යයකි කොකු. සාමාන්‍යයෙන්, වායු ප්‍රවාහ කොකු එය තෙවන පාර්ශවීය සේවා සහ පුස්තකාල වෙත සම්බන්ධ කිරීම සඳහා ලකුණු වේ. උදා, JiraHook අපට ජිරා සමඟ අන්තර් ක්‍රියා කිරීමට සේවාදායකයෙකු විවෘත කරනු ඇත (ඔබට කාර්යයන් එහාට මෙහාට ගෙන යා හැක), සහ සහාය ඇතිව SambaHook ඔබට දේශීය ගොනුවක් තල්ලු කළ හැකිය smb- ලක්ෂ්යය.

අභිරුචි ක්‍රියාකරු විග්‍රහ කිරීම

ඒවගේම අපි ඒක හදපු හැටි බලන්න ළං වුණා TelegramBotSendMessage

කේතය commons/operators.py සැබෑ ක්රියාකරු සමඟ:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

මෙන්න, වායු ප්‍රවාහයේ අනෙක් සියල්ල මෙන්, සියල්ල ඉතා සරල ය:

  • උරුම වී ඇත BaseOperator, එය ගුවන් ප්‍රවාහ-විශේෂිත දේවල් කිහිපයක් ක්‍රියාත්මක කරයි (ඔබේ විවේකය දෙස බලන්න)
  • ප්‍රකාශිත ක්ෂේත්‍ර template_fields, ජින්ජා ක්‍රියාවට නැංවීම සඳහා මැක්‍රෝ සොයනු ඇත.
  • සඳහා නිවැරදි තර්ක සකස් කර ඇත __init__(), අවශ්‍ය තැන්වල පෙරනිමි සකසන්න.
  • මුතුන් මිත්තන් ආරම්භ කිරීම ගැන අපි අමතක නොකළෙමු.
  • අනුරූප කොක්ක විවෘත කළා TelegramBotHookඑයින් සේවාදායක වස්තුවක් ලැබුණි.
  • අතිච්ඡාදනය වූ (නැවත අර්ථ දක්වා ඇති) ක්‍රමය BaseOperator.execute(), ක්‍රියාකරු දියත් කිරීමට කාලය පැමිණි විට කුමන Airfow ඇඹරෙනු ඇත - එහි අපි ප්‍රධාන ක්‍රියාව ක්‍රියාත්මක කරන්නෙමු, ලොග් වීමට අමතක කරමු. (අපි ලොග් වෙමු, මාර්ගය වන විට, කෙලින්ම ඇතුලට stdout и stderr - වායු ප්‍රවාහය සියල්ලට බාධා කරයි, අලංකාර ලෙස ඔතා, අවශ්‍ය තැන දිරාපත් කරයි.)

අපි බලමු මොනවද තියෙන්නේ කියලා commons/hooks.py. ගොනුවේ පළමු කොටස, කොක්කෙන්ම:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

මෙහි පැහැදිලි කළ යුත්තේ කුමක්දැයි මම නොදනිමි, මම වැදගත් කරුණු පමණක් සටහන් කරමි:

  • අපට උරුමයි, තර්ක ගැන සිතන්න - බොහෝ අවස්ථාවලදී එය එකක් වනු ඇත: conn_id;
  • සම්මත ක්‍රම අභිබවා යාම: මම මා සීමා කළෙමි get_conn(), මම නමෙන් සම්බන්ධතා පරාමිතීන් ලබා ගන්නා අතර කොටස ලබා ගන්න extra (මෙය JSON ක්ෂේත්‍රයකි), එහි මම (මගේම උපදෙස් අනුව!) Telegram bot ටෝකනය තැබුවෙමි: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • මම අපේ උදාහරණයක් නිර්මාණය කරමි TelegramBot, එය නිශ්චිත සංකේතයක් ලබා දීම.

එච්චරයි. භාවිතා කරන කොක්කකින් ඔබට සේවාදායකයෙකු ලබා ගත හැකිය TelegramBotHook().clent හෝ TelegramBotHook().get_conn().

ගොනුවේ දෙවන කොටස, මම ටෙලිග්‍රාම් REST API සඳහා මයික්‍රෝ දවර්පර් එකක් සාදන අතර, එයම ඇදගෙන නොයන්න python-telegram-bot එක් ක්රමයක් සඳහා sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

නිවැරදි ක්‍රමය නම් සියල්ල එකතු කිරීමයි. TelegramBotSendMessage, TelegramBotHook, TelegramBot - ප්ලගිනය තුළ, පොදු ගබඩාවක තබා, එය විවෘත මූලාශ්‍රය වෙත ලබා දෙන්න.

අපි මේ සියල්ල අධ්‍යයනය කරමින් සිටියදී, අපගේ වාර්තා යාවත්කාලීන කිරීම් සාර්ථක ලෙස අසාර්ථක වී මට නාලිකාවේ දෝෂ පණිවිඩයක් යැවීමට සමත් විය. මම බලන්නම් ඒක වැරදිද කියලා...

Apache Airflow: ETL පහසු කිරීම
අපේ බල්ලා තුළ යමක් කැඩී ගියේය! අපි බලාපොරොත්තු වූ දේ එය නොවේද? හරියටම!

ඔබ වත් කිරීමට යන්නේ?

මට යමක් මග හැරී ඇති බව ඔබට හැඟෙනවාද? SQL සේවාදායකයේ සිට Vertica වෙත දත්ත මාරු කිරීමට ඔහු පොරොන්දු වූ බව පෙනේ, පසුව ඔහු එය ගෙන මාතෘකාවෙන් ඉවත් විය, අපතයා!

මෙම සාහසික ක්‍රියාව හිතාමතා කළ දෙයක්, මට ඔබ වෙනුවෙන් පාරිභාෂික වචන කිහිපයක් තේරුම් ගැනීමට සිදු විය. දැන් ඔබට තවත් ඉදිරියට යා හැකිය.

අපගේ සැලැස්ම මෙසේ විය.

  1. ඩග් කරන්න
  2. කාර්යයන් උත්පාදනය කරන්න
  3. බලන්න හැම දෙයක්ම කොච්චර ලස්සනද කියලා
  4. පිරවීම සඳහා සැසි අංක පවරන්න
  5. SQL සේවාදායකයෙන් දත්ත ලබා ගන්න
  6. දත්ත Vertica වෙත දමන්න
  7. සංඛ්යා ලේඛන එකතු කරන්න

ඉතින්, මේ සියල්ල ක්‍රියාත්මක වීමට, මම අපේ එකට කුඩා එකතු කිරීමක් කළෙමි docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

එහිදී අපි මතු කරන්නේ:

  • සත්කාරක ලෙස Vertica dwh වඩාත්ම පෙරනිමි සැකසුම් සමඟ,
  • SQL සේවාදායකයේ අවස්ථා තුනක්,
  • අපි දෙවැන්නෙහි දත්ත සමුදායන් සමහර දත්ත වලින් පුරවන්නෙමු (කිසිම අවස්ථාවකදී සොයා නොබලන්න mssql_init.py!)

පසුගිය වතාවට වඩා තරමක් සංකීර්ණ විධානයක ආධාරයෙන් අපි සියලු යහපත් දේ දියත් කරමු:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

අපගේ ආශ්චර්ය සසම්භාවීකාරකය ජනනය කළ දේ, ඔබට අයිතමය භාවිතා කළ හැකිය Data Profiling/Ad Hoc Query:

Apache Airflow: ETL පහසු කිරීම
ප්රධාන දෙය නම් එය විශ්ලේෂකයින්ට පෙන්වීම නොවේ

විස්තර කරන්න ETL සැසි මම එසේ නොකරමි, එහි සෑම දෙයක්ම සුළුපටු නොවේ: අපි පදනමක් සාදන්නෙමු, එහි ලකුණක් තිබේ, අපි සියල්ල සන්දර්භය කළමණාකරුවෙකු සමඟ ඔතා, දැන් අපි මෙය කරමු:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

කාලය පැමිණ ඇත අපගේ දත්ත රැස් කරන්න අපේ මේස එකහමාරකින්. ඉතා අව්‍යාජ රේඛා ආධාරයෙන් අපි මෙය කරමු:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. කොක්කක් ආධාරයෙන් අපි Airflow වෙතින් ලබා ගනිමු pymssql- සම්බන්ධ කරන්න
  2. ඉල්ලීමට දිනයක් ආකාරයෙන් සීමාවක් ආදේශ කරමු - එය අච්චු එන්ජිම මඟින් ශ්‍රිතයට දමනු ඇත.
  3. අපගේ ඉල්ලීම පෝෂණය කිරීම pandasකවුද අපිව ගන්නේ DataFrame - එය අනාගතයේදී අපට ප්රයෝජනවත් වනු ඇත.

මම ආදේශනය භාවිතා කරමි {dt} ඉල්ලීම් පරාමිතියක් වෙනුවට %s මම නපුරු පිනෝචියෝ නිසා නොව, ඒ නිසා pandas හැසිරවිය නොහැක pymssql සහ අන්තිම එක ලිස්සා යයි params: Listඔහුට ඇත්තටම අවශ්ය වුවද tuple.
සංවර්ධකයා බව ද සලකන්න pymssql ඔහුට තවදුරටත් සහාය නොදීමට තීරණය කළ අතර, පිටතට යාමට කාලයයි pyodbc.

වායු ප්‍රවාහය අපගේ ක්‍රියාකාරකම්වල තර්ක පුරවා ඇත්තේ කුමක් දැයි බලමු:

Apache Airflow: ETL පහසු කිරීම

දත්ත නොමැති නම්, දිගටම කරගෙන යාමේ තේරුමක් නැත. එහෙත් පිරවීම සාර්ථක යැයි සැලකීම ද අරුමයකි. නමුත් මෙය වරදක් නොවේ. ආහ්, මොනවා කරන්නද?! සහ මෙන්න මේ දේ:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException දෝෂ නොමැති බව Airflow හට කියයි, නමුත් අපි කාර්යය මඟහරිමු. අතුරු මුහුණතට කොළ හෝ රතු චතුරස්රයක් නොව රෝස පැහැයක් ඇත.

අපි අපේ දත්ත විසි කරමු බහු තීරු:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

එනම්:

  • අපි ඇණවුම් ලබා ගත් දත්ත ගබඩාව,
  • අපගේ ගංවතුර සැසියේ ID (එය වෙනස් වනු ඇත සෑම කාර්යයක් සඳහාම),
  • මූලාශ්‍ර සහ ඇණවුම් හැඳුනුම්පතෙන් හැෂ් එකක් - අවසාන දත්ත ගබඩාවේ (සියල්ල එක් වගුවකට වත් කරනු ලබන) අපට අද්විතීය ඇණවුම් හැඳුනුම්පතක් ඇත.

අවසාන පියවර ඉතිරිව ඇත: සියල්ල Vertica වෙත වත් කරන්න. තවද, පුදුමයට කරුණක් නම්, මෙය කිරීමට වඩාත්ම දර්ශනීය හා කාර්යක්ෂම ක්‍රමයක් වන්නේ CSV හරහාය!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. අපි විශේෂ ග්රාහකයක් සාදන්නෙමු StringIO.
  2. pandas කරුණාවෙන් අපේ දාන්නම් DataFrame ස්වරූපයෙන් CSV- රේඛා.
  3. අපි අපේ ප්‍රියතම Vertica වෙත කොක්කකින් සම්බන්ධතාවයක් විවෘත කරමු.
  4. සහ දැන් උදව්වෙන් copy() අපගේ දත්ත කෙලින්ම Vertika වෙත යවන්න!

අපි රේඛා කීයක් පුරවා ඇත්දැයි රියදුරුගෙන් ලබාගෙන සැසි කළමනාකරුට සියල්ල හරි බව කියන්නෙමු:

session.loaded_rows = cursor.rowcount
session.successful = True

එච්චරයි.

විකිණීමේදී, අපි ඉලක්ක තහඩුව අතින් නිර්මාණය කරමු. මෙන්න මම මට කුඩා යන්ත්රයකට ඉඩ දුන්නා:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

මම පාවිච්චි කරනවා VerticaOperator() මම දත්ත සමුදා සැලැස්මක් සහ වගුවක් නිර්මාණය කරමි (ඒවා දැනටමත් නොපවතියි නම්, ඇත්ත වශයෙන්ම). ප්රධාන දෙය නම් පරායත්තයන් නිවැරදිව සකස් කිරීමයි:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

සාරාංශගත කිරීම

- හොඳයි, - කුඩා මූසිකය කිව්වා, - ඒක නේද, දැන්
වනාන්තරයේ සිටින භයානකම සත්වයා මම බව ඔබට විශ්වාසද?

ජූලියා ඩොනල්ඩ්සන්, ද ග්‍රෆලෝ

මම හිතන්නේ මගේ සගයන්ට සහ මට තරඟයක් තිබුනේ නම්: මුල සිටම ETL ක්‍රියාවලියක් ඉක්මනින් නිර්මාණය කර දියත් කරන්නේ කවුද: ඔවුන් ඔවුන්ගේ SSIS සහ මූසිකය සමඟ සහ මා වායු ප්‍රවාහය සමඟ ... එවිට අපි නඩත්තු කිරීමේ පහසුව ද සංසන්දනය කරමු ... වාව්, මම හිතන්නේ මම ඔවුන්ව සෑම පැත්තකින්ම පරාජය කරන බවට ඔබ එකඟ වනු ඇත!

ටිකක් බරපතල නම්, Apache Airflow - ක්‍රමලේඛ කේතයේ ආකාරයෙන් ක්‍රියාවලි විස්තර කිරීමෙන් - මගේ කාර්යය ඉටු කළේය බොහෝ වඩාත් සුවපහසු සහ විනෝදජනකයි.

එහි අසීමිත විස්තීරණය, ප්ලග්-ඉන් සහ පරිමාණයට ඇති නැඹුරුව යන දෙඅංශයෙන්ම, ඔබට ඕනෑම ප්‍රදේශයක වාගේ වායු ප්‍රවාහය භාවිතා කිරීමට අවස්ථාව ලබා දෙයි: දත්ත එකතු කිරීමේ, සැකසීමේ සහ සැකසීමේ සම්පූර්ණ චක්‍රය තුළ පවා, රොකට් දියත් කිරීමේදී පවා (අඟහරු වෙත, පාඨමාලාව).

අවසාන කොටස, යොමු සහ තොරතුරු

අපි ඔබ වෙනුවෙන් එකතු කර ඇති පෝරකය

  • start_date. ඔව්, මෙය දැනටමත් දේශීය මතකයක්. ඩග්ගේ ප්‍රධාන තර්කය හරහා start_date සියල්ල සමත්. කෙටියෙන්, ඔබ සඳහන් කරන්නේ නම් start_date වත්මන් දිනය, සහ schedule_interval - එක් දිනක්, පසුව DAG හෙට ආරම්භ වනු ඇත.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    සහ තවත් ගැටළු නොමැත.

    එයට සම්බන්ධ තවත් ධාවන දෝෂයක් ඇත: Task is missing the start_date parameter, එය බොහෝ විට පෙන්නුම් කරන්නේ ඔබට ඩැග් ක්‍රියාකරුට බැඳීමට අමතක වූ බවයි.

  • ඔක්කොම එක මැෂින් එකක. ඔව්, සහ පදනම් (වායු ප්‍රවාහය සහ අපගේ ආලේපනය), සහ වෙබ් සේවාදායකයක්, උපලේඛකයක් සහ කම්කරුවන්. සහ එය පවා වැඩ කළා. නමුත් කාලයත් සමඟම, සේවා සඳහා කාර්යයන් සංඛ්යාව වර්ධනය වූ අතර, PostgreSQL 20 ms වෙනුවට තත්පර 5 කින් දර්ශකයට ප්රතිචාර දැක්වීමට පටන් ගත් විට, අපි එය රැගෙන එය රැගෙන ගියා.
  • LocalExecutor. ඔව්, අපි තවමත් එය මත වාඩි වී සිටින අතර, අපි දැනටමත් අගාධයේ අද්දරට පැමිණ ඇත. LocalExecutor අපට මෙතෙක් ප්‍රමාණවත් වී ඇත, නමුත් දැන් එය අවම වශයෙන් එක් සේවකයෙකු සමඟ පුළුල් කිරීමට කාලය පැමිණ ඇති අතර, CeleryExecutor වෙත යාමට අපට වෙහෙස මහන්සි වී වැඩ කිරීමට සිදුවනු ඇත. ඔබට එය එක් යන්ත්‍රයක වැඩ කළ හැකි බැවින්, සේවාදායකයක පවා සැල්දිරි භාවිතා කිරීමෙන් කිසිවක් ඔබව වළක්වන්නේ නැත, එය “ඇත්ත වශයෙන්ම, කිසි විටෙකත් නිෂ්පාදනයට නොයනු ඇත, අවංකව!”
  • භාවිතා නොකරන බිල්ට් මෙවලම්:
    • සම්බන්ධතා සේවා අක්තපත්‍ර ගබඩා කිරීමට,
    • SLA මිස් නියමිත වේලාවට වැඩ නොකළ කාර්යයන් සඳහා ප්රතිචාර දැක්වීමට,
    • xcom පාරදත්ත හුවමාරුව සඳහා (මම කීවෙමි මෙටාදත්ත!) dag කාර්යයන් අතර.
  • තැපැල් අපයෝජනය. හොඳයි, මම කුමක් කියන්නද? වැටුණු කාර්යයන්වල සියලු පුනරාවර්තන සඳහා ඇඟවීම් පිහිටුවා ඇත. දැන් මගේ වැඩ Gmail හි Airflow වෙතින් ඊමේල් 90k ඇති අතර, වෙබ් තැපැල් මූසිකය වරකට 100කට වඩා ලබා ගැනීම සහ මකා දැමීම ප්‍රතික්ෂේප කරයි.

තවත් අන්තරායන්: Apache Airflow Pitfails

තවත් ස්වයංක්‍රීය මෙවලම්

අපගේ දෑතින් නොව අපගේ හිසෙන් ඊටත් වඩා වැඩ කිරීම සඳහා, Airflow අප වෙනුවෙන් මෙය සූදානම් කර ඇත:

  • REST API - ඔහුට තවමත් පර්යේෂණාත්මක තත්ත්වය ඇත, එය ඔහු වැඩ කිරීමෙන් වළක්වන්නේ නැත. එය සමඟ, ඔබට ඩැග් සහ කර්තව්‍යයන් පිළිබඳ තොරතුරු ලබා ගැනීම පමණක් නොව, ඩැග් එකක් නැවැත්වීමට / ආරම්භ කිරීමට, DAG ධාවනයක් හෝ සංචිතයක් සාදන්න.
  • CLI - WebUI හරහා භාවිතා කිරීමට අපහසු නොවන නමුත් සාමාන්‍යයෙන් නොපවතින බොහෝ මෙවලම් විධාන රේඛාව හරහා ලබා ගත හැකිය. උදාහරණ වශයෙන්:
    • backfill කාර්ය අවස්ථා නැවත ආරම්භ කිරීමට අවශ්‍ය වේ.
      උදාහරණයක් ලෙස, විශ්ලේෂකයින් පැමිණ මෙසේ පැවසීය: “සහෝදර, ඔබට ජනවාරි 1 සිට 13 දක්වා දත්තවල විකාරයක් තිබේ! එය නිවැරදි කරන්න, එය නිවැරදි කරන්න, එය නිවැරදි කරන්න, එය නිවැරදි කරන්න!" ඔබ එවැනි විනෝදාංශයකි:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • මූලික සේවාව: initdb, resetdb, upgradedb, checkdb.
    • run, එය ඔබට එක් අවස්ථා කාර්යයක් ක්‍රියාත්මක කිරීමට සහ සියලු පරායත්ත මත ලකුණු කිරීමට ඉඩ සලසයි. ඊට අමතරව, ඔබට එය හරහා ධාවනය කළ හැකිය LocalExecutor, ඔබට සැල්දිරි පොකුරක් තිබුණත්.
    • බොහෝ දුරට එකම දේ කරයි test, පමණක් ද පදනම් කිසිවක් ලියන්නේ නැත.
    • connections කවචයෙන් සම්බන්ධතා විශාල වශයෙන් නිර්මාණය කිරීමට ඉඩ සලසයි.
  • පයිතන් ඒපීඅයි - ප්ලගීන සඳහා අදහස් කරන තරමක් දැඩි අන්තර්ක්‍රියා ක්‍රමයක් වන අතර එය කුඩා අත්වලින් රංචු ගැසෙන්නේ නැත. ඒත් අපි යන එක වළක්වන්න කවුද /home/airflow/dags, දුවන්න ipython සහ අවුල් කරන්න පටන් ගන්නද? උදාහරණයක් ලෙස, ඔබට පහත කේතය සමඟ සියලු සම්බන්ධතා අපනයනය කළ හැකිය:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • වායු ප්‍රවාහ පාරදත්ත සමුදාය වෙත සම්බන්ධ වෙමින්. මම එයට ලිවීම නිර්දේශ නොකරමි, නමුත් විවිධ නිශ්චිත ප්‍රමිතික සඳහා කාර්ය තත්වයන් ලබා ගැනීම ඕනෑම API භාවිතා කරනවාට වඩා වේගවත් සහ පහසු විය හැක.

    අපගේ සියලු කාර්යයන් දුර්වල නොවන බව කියමු, නමුත් ඒවා සමහර විට වැටිය හැකි අතර මෙය සාමාන්‍ය දෙයකි. නමුත් අවහිර කිරීම් කිහිපයක් දැනටමත් සැක සහිත වන අතර, එය පරීක්ෂා කිරීම අවශ්ය වනු ඇත.

    SQL පරෙස්සම් වන්න!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

යොමු

ඇත්ත වශයෙන්ම, Google නිකුත් කිරීමේ පළමු සබැඳි දහය මගේ පිටු සලකුණු වලින් Airflow ෆෝල්ඩරයේ අන්තර්ගතය වේ.

සහ ලිපියේ භාවිතා කර ඇති සබැඳි:

මූලාශ්රය: www.habr.com

DDoS ආරක්ෂාව, VPS VDS සේවාදායකයන් සහිත අඩවි සඳහා විශ්වාසදායක සත්කාරකත්වය මිලදී ගන්න 🔥 DDoS ආරක්ෂාව, VPS VDS සේවාදායකයන් සහිත විශ්වාසදායක වෙබ් අඩවි සත්කාරකත්වය මිලදී ගන්න | ProHoster