זרימת אוויר של אפאצ'י: מקל על ETL

היי, אני דמיטרי לוגביננקו - מהנדס נתונים של מחלקת האנליטיקה של קבוצת החברות Vezet.

אספר לכם על כלי נפלא לפיתוח תהליכי ETL – Apache Airflow. אבל Airflow הוא כל כך רב-תכליתי ורב-צדדי שכדאי לך להסתכל עליו מקרוב גם אם אינך מעורב בתזרימי נתונים, אך יש לך צורך להפעיל מעת לעת כל תהליכים ולפקח על ביצועם.

וכן, אני לא רק אספר, אלא גם אראה: לתוכנית יש הרבה קוד, צילומי מסך והמלצות.

זרימת אוויר של אפאצ'י: מקל על ETL
מה שאתה רואה בדרך כלל כשאתה מחפש בגוגל את המילה Airflow / Wikimedia Commons

תוכן העניינים

מבוא

Apache Airflow הוא בדיוק כמו ג'נגו:

  • כתוב בפיתון
  • יש פאנל ניהול מעולה,
  • ניתן להרחבה ללא הגבלת זמן

- רק טוב יותר, והוא נעשה למטרות אחרות לגמרי, כלומר (כפי שכתוב לפני הקאטה):

  • ריצה וניטור משימות במספר בלתי מוגבל של מכונות (כפי שרבים סלרי / Kubernetes והמצפון שלך יאפשרו לך)
  • עם יצירת זרימת עבודה דינמית מקוד Python קל מאוד לכתיבה והבנה
  • והיכולת לחבר כל מסדי נתונים וממשקי API אחד עם השני באמצעות רכיבים מוכנים וגם תוספים תוצרת בית (שהיא פשוטה ביותר).

אנו משתמשים ב- Apache Airflow כך:

  • אנו אוספים נתונים ממקורות שונים (מקרים רבים של SQL Server ו-PostgreSQL, ממשקי API שונים עם מדדי יישומים, אפילו 1C) ב-DWH ו-ODS (יש לנו Vertica ו-Clickhouse).
  • כמה מתקדם cron, שמתחיל את תהליכי איחוד הנתונים ב-ODS, וגם עוקב אחר תחזוקתם.

עד לאחרונה, הצרכים שלנו היו מכוסים על ידי שרת אחד קטן עם 32 ליבות ו-50 GB של זיכרון RAM. בזרימת אוויר, זה עובד:

  • יותר 200 יום (למעשה זרימות עבודה, בהן מילאנו משימות),
  • בכל אחד בממוצע 70 משימות,
  • הטוב הזה מתחיל (גם בממוצע) פעם בשעה.

ולגבי איך הרחבנו, אכתוב להלן, אבל עכשיו בוא נגדיר את הבעיה העליונה שנפתור:

ישנם שלושה שרתי SQL מקור, כל אחד עם 50 מסדי נתונים - מופעים של פרויקט אחד, בהתאמה, יש להם אותו מבנה (כמעט בכל מקום, mua-ha-ha), מה שאומר שלכל אחד יש טבלת Orders (למרבה המזל, טבלה עם זה ניתן לדחוף את השם לכל עסק). אנו לוקחים את הנתונים על ידי הוספת שדות שירות (שרת מקור, מסד נתונים מקור, מזהה משימות ETL) וזורקים אותם בתמימות, למשל, ל-Vertica.

בואו נלך!

החלק העיקרי, מעשי (וקצת תיאורטי)

למה אנחנו (ואתה)

כשהעצים היו גדולים ואני הייתי פשוט SQL-schik בקמעונאות רוסית אחת, הונינו תהליכי ETL, כלומר זרימות נתונים באמצעות שני כלים הזמינים לנו:

  • Informatica Power Center - מערכת מתפשטת במיוחד, פרודוקטיבית במיוחד, עם חומרה משלה, גרסאות משלה. השתמשתי חלילה ב-1% מהיכולות שלו. למה? ובכן, קודם כל, הממשק הזה, אי שם משנות ה-380, הפעיל עלינו לחץ נפשי. שנית, המתקן הזה מיועד לתהליכים מפוארים במיוחד, שימוש חוזר ברכיבים זועם ועוד טריקים חשובים מאוד לארגונים. על העובדה שזה עולה, כמו הכנף של ה-Airbus AXNUMX לשנה, לא נגיד כלום.

    היזהר, צילום מסך יכול לפגוע קצת באנשים מתחת לגיל 30

    זרימת אוויר של אפאצ'י: מקל על ETL

  • שרת אינטגרציה של שרת SQL - השתמשנו בחבר הזה בזרימות הפנים-פרויקטיות שלנו. ובכן, למעשה: אנחנו כבר משתמשים ב-SQL Server, וזה יהיה איכשהו לא הגיוני לא להשתמש בכלי ה-ETL שלו. הכל בו טוב: גם הממשק יפה וגם דוחות ההתקדמות... אבל זו לא הסיבה שאנחנו אוהבים מוצרי תוכנה, הו, לא בשביל זה. גרסה את זה dtsx (שזה XML עם צמתים מדשדשים בשמירה) אנחנו יכולים, אבל מה הטעם? מה דעתך להכין חבילת משימות שתגרור מאות טבלאות משרת אחד למשנהו? כן, איזה מאה, האצבע המורה שלך תיפול מעשרים חתיכות, לחיצה על כפתור העכבר. אבל זה בהחלט נראה אופנתי יותר:

    זרימת אוויר של אפאצ'י: מקל על ETL

בהחלט חיפשנו דרכים לצאת. מקרה אפילו כמעט הגיע למחולל חבילות SSIS שנכתב בעצמו ...

...ואז מצאה אותי עבודה חדשה. ו- Apache Airflow עקפה אותי על זה.

כשגיליתי שתיאורי תהליך ETL הם קוד פייתון פשוט, פשוט לא רקדתי משמחה. כך זרמי נתונים עברו גרסאות ושונו, ושפיכת טבלאות בעלות מבנה יחיד ממאות מסדי נתונים למטרה אחת הפכה לעניין של קוד Python במסך אחד וחצי או שניים של 13 אינץ'.

הרכבת האשכול

בואו לא נסדר גן ילדים לגמרי, ולא נדבר כאן על דברים ברורים לחלוטין, כמו התקנת Airflow, מסד הנתונים שבחרת, סלרי ומקרים נוספים המתוארים ברציפים.

כדי שנוכל מיד להתחיל בניסויים, שרטטתי docker-compose.yml שבו:

  • בוא נעלה בעצם זרימת אוויר: מתזמן, שרת אינטרנט. פרח גם יסתובב שם כדי לפקח על משימות סלרי (כי זה כבר נדחף apache/airflow:1.10.10-python3.7, אבל לא אכפת לנו)
  • PostgreSQL, שבו Airflow תכתוב את פרטי השירות שלה (נתוני לוח זמנים, סטטיסטיקות ביצוע וכו'), וסלרי תסמן משימות שהושלמו;
  • Redis, אשר ישמש כמתווך משימות עבור סלרי;
  • עובד סלרי, שתעסוק בביצוע ישיר של משימות.
  • לתיקיה ./dags נוסיף את הקבצים שלנו עם התיאור של dags. הם ייאספו תוך כדי תנועה, כך שאין צורך ללהטט עם כל הערימה לאחר כל עיטוש.

במקומות מסוימים, הקוד בדוגמאות לא מוצג במלואו (כדי לא לבלבל את הטקסט), אבל איפשהו הוא משתנה תוך כדי. ניתן למצוא דוגמאות קוד עבודה מלאות במאגר https://github.com/dm-logv/airflow-tutorial.

docker-compose.ym

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

הערות:

  • בהרכבת הקומפוזיציה הסתמכתי במידה רבה על התמונה הידועה פאקל/דוקר-זרימת אוויר - הקפד לבדוק את זה. אולי אתה לא צריך שום דבר אחר בחיים שלך.
  • כל הגדרות זרימת האוויר זמינות לא רק דרך airflow.cfg, אבל גם באמצעות משתני סביבה (תודה למפתחים), שניצלתי בזדון.
  • באופן טבעי, זה לא מוכן לייצור: בכוונה לא שמתי פעימות לב על מכולות, לא התעסקתי באבטחה. אבל עשיתי את המינימום שמתאים לנסיינים שלנו.
  • שים לב ש:
    • תיקיית dag חייבת להיות נגישה הן למתזמן והן לעובדים.
    • אותו דבר חל על כל הספריות של צד שלישי - כולן חייבות להיות מותקנות על מכונות עם מתזמן ועובדים.

ובכן, עכשיו זה פשוט:

$ docker-compose up --scale worker=3

אחרי שהכל עולה, אתה יכול להסתכל על ממשקי האינטרנט:

מושגים בסיסיים

אם לא הבנת כלום בכל ה"ימים" האלה, אז הנה מילון קצר:

  • מתזמן - הדוד הכי חשוב ב-Airflow, ששולט שרובוטים עובדים קשה, ולא אדם: עוקב אחר לוח הזמנים, מעדכן את היום, משיק משימות.

    באופן כללי, בגרסאות ישנות יותר, היו לו בעיות בזיכרון (לא, לא אמנזיה, אלא דליפות) והפרמטר ה-legacy אפילו נשאר בתצורה run_duration - מרווח ההפעלה מחדש שלו. אבל עכשיו הכל בסדר.

  • DAG (aka "dag") - "גרף א-ציקלי מכוון", אבל הגדרה כזו תספר לאנשים מעטים, אבל למעשה זה מיכל למשימות המקיימות אינטראקציה זו עם זו (ראה להלן) או אנלוגי של Package ב-SSIS ו-Workflow ב-Informatica .

    בנוסף ל-days, אולי עדיין יהיו תת-דגים, אבל סביר להניח שלא נגיע אליהם.

  • DAG Run - דאג אתחול, אשר מוקצה לו execution_date. דגרנים של אותו דאג יכולים לעבוד במקביל (אם הפכתם את המשימות שלכם לאדמוניות, כמובן).
  • מַפעִיל הם פיסות קוד שאחראיות לביצוע פעולה ספציפית. ישנם שלושה סוגים של אופרטורים:
    • פעולהכמו האהוב עלינו PythonOperator, שיכול להפעיל כל קוד (תקף) של Python;
    • להעביר, אשר מעביר נתונים ממקום למקום, נניח, MsSqlToHiveTransfer;
    • חיישן מצד שני, זה יאפשר לך להגיב או להאט את המשך הביצוע של הדאג עד להתרחשות אירוע. HttpSensor יכול למשוך את נקודת הקצה שצוינה, וכאשר התגובה הרצויה ממתינה, התחל את ההעברה GoogleCloudStorageToS3Operator. מוח סקרן ישאל: "למה? אחרי הכל, אתה יכול לעשות חזרות ישירות במפעיל!" ואז, כדי לא לסתום את מאגר המשימות עם מפעילים מושעים. החיישן מתחיל, בודק ומת לפני הניסיון הבא.
  • המשימות - מפעילים מוצהרים, ללא קשר לסוג, ומצורפים לדאג מקודמים לדרגת משימה.
  • מופע משימה - כאשר המתכנן הכללי החליט שהגיע הזמן לשלוח משימות לקרב על עובדי מבצעים (ממש במקום, אם נשתמש LocalExecutor או לצומת מרוחק במקרה של CeleryExecutor), הוא מקצה להם הקשר (כלומר, קבוצה של משתנים - פרמטרי ביצוע), מרחיב תבניות פקודות או שאילתות ומאגד אותן.

אנחנו מייצרים משימות

ראשית, בואו נתאר את הסכימה הכללית של הדאג שלנו, ואז נצלול לפרטים יותר ויותר, כי אנחנו מיישמים כמה פתרונות לא טריוויאליים.

אז בצורתו הפשוטה ביותר, דג כזה ייראה כך:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

בואו נבין את זה:

  • ראשית, אנו מייבאים את ה-libs הדרושים ו משהו אחר;
  • sql_server_ds - האם List[namedtuple[str, str]] עם שמות החיבורים מ-Airflow Connections ומאגרי הנתונים מהם ניקח את הצלחת שלנו;
  • dag - ההכרזה על הדאג שלנו, שבהכרח חייבת להיות ב globals(), אחרת Airflow לא תמצא אותו. דאג גם צריך לומר:
    • מה השם שלו orders - שם זה יופיע אז בממשק האינטרנט,
    • שהוא יעבוד מחצות הלילה בשמיני ביולי,
    • והוא אמור לפעול, בערך כל 6 שעות (עבור בחורים קשוחים כאן במקום timedelta() קָבִיל cron-קַו 0 0 0/6 ? * * *, לפחות מגניבים - ביטוי כמו @daily);
  • workflow() יעשה את העבודה העיקרית, אבל לא עכשיו. לעת עתה, רק נזרוק את ההקשר שלנו ביומן.
  • ועכשיו הקסם הפשוט של יצירת משימות:
    • אנו רצים במקורותינו;
    • לְאַתחֵל PythonOperator, שתוציא להורג את הדמה שלנו workflow(). אל תשכח לציין שם ייחודי (בתוך הדאג) של המשימה ולקשור את הדג עצמו. דֶגֶל provide_context בתורו, ישפוך ארגומנטים נוספים לפונקציה, אותם נאסוף בקפידה באמצעות **context.

לעת עתה, זה הכל. מה קיבלנו:

  • דאג חדש בממשק האינטרנט,
  • מאה וחצי משימות שיבוצעו במקביל (אם הגדרות Airflow, סלרי וקיבולת השרת מאפשרות זאת).

טוב, כמעט הבנתי.

זרימת אוויר של אפאצ'י: מקל על ETL
מי יתקין את התלות?

כדי לפשט את כל העניין הזה, נדפקתי docker-compose.yml עיבוד requirements.txt בכל הצמתים.

עכשיו זה נעלם:

זרימת אוויר של אפאצ'י: מקל על ETL

ריבועים אפורים הם מופעי משימות המעובדים על ידי המתזמן.

אנחנו מחכים קצת, המשימות נחטפות על ידי העובדים:

זרימת אוויר של אפאצ'י: מקל על ETL

הירוקים, כמובן, סיימו את עבודתם בהצלחה. אדומים לא ממש מצליחים.

אגב, אין תיקייה בפרוד שלנו ./dags, אין סנכרון בין מכונות - כל הימים שוכבים git ב- Gitlab שלנו, ו- Gitlab CI מפיץ עדכונים למכונות בעת מיזוג master.

קצת על פרח

בזמן שהעובדים דופקים לנו את המוצצים, בואו ניזכר בכלי אחר שיכול להראות לנו משהו - פרח.

העמוד הראשון עם מידע סיכום על צמתי עובדים:

זרימת אוויר של אפאצ'י: מקל על ETL

הדף האינטנסיבי ביותר עם משימות שיצאו לעבודה:

זרימת אוויר של אפאצ'י: מקל על ETL

העמוד הכי משעמם עם סטטוס הברוקר שלנו:

זרימת אוויר של אפאצ'י: מקל על ETL

הדף הבהיר ביותר הוא עם גרפי סטטוס משימות וזמן הביצוע שלהם:

זרימת אוויר של אפאצ'י: מקל על ETL

אנחנו מעמיסים את החסר

אז, כל המשימות הסתדרו, אתה יכול לסחוב את הפצועים.

זרימת אוויר של אפאצ'י: מקל על ETL

והיו פצועים רבים - מסיבה זו או אחרת. במקרה של שימוש נכון ב-Airflow, הריבועים הללו מצביעים על כך שהנתונים בהחלט לא הגיעו.

עליך לצפות ביומן ולהפעיל מחדש את מקרי המשימות שנפלו.

בלחיצה על ריבוע כלשהו, ​​נראה את הפעולות הזמינות לנו:

זרימת אוויר של אפאצ'י: מקל על ETL

אתה יכול לקחת ולעשות לנקות את הנופלים. כלומר, אנחנו שוכחים שמשהו נכשל שם, ואותה משימת מופע תעבור למתזמן.

זרימת אוויר של אפאצ'י: מקל על ETL

ברור שלעשות את זה עם העכבר עם כל הריבועים האדומים זה לא מאוד הומאני – זה לא מה שאנחנו מצפים מ-Airflow. כמובן, יש לנו נשק להשמדה המונית: Browse/Task Instances

זרימת אוויר של אפאצ'י: מקל על ETL

בוא נבחר הכל בבת אחת ונאפס לאפס, לחץ על הפריט הנכון:

זרימת אוויר של אפאצ'י: מקל על ETL

לאחר הניקוי, המוניות שלנו נראות כך (הן כבר מחכות שהמתזמן יתזמן אותן):

זרימת אוויר של אפאצ'י: מקל על ETL

חיבורים, ווים ומשתנים אחרים

הגיע הזמן להסתכל על ה-DAG הבא, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

כולם ביצעו אי פעם עדכון דיווח? זו שוב היא: יש רשימה של מקורות מהיכן להשיג את הנתונים; יש רשימה איפה לשים; אל תשכח לצפור כשהכל קרה או נשבר (טוב, זה לא קשור אלינו, לא).

בואו נעבור שוב על הקובץ ונסתכל על הדברים הלא ברורים החדשים:

  • from commons.operators import TelegramBotSendMessage - שום דבר לא מונע מאיתנו ליצור מפעילים משלנו, אותם ניצלנו על ידי יצירת מעטפת קטנה לשליחת הודעות ל-Unblocked. (נדבר יותר על המפעיל הזה להלן);
  • default_args={} - Dag יכול להפיץ את אותם טיעונים לכל המפעילים שלו;
  • to='{{ var.value.all_the_kings_men }}' - שדה to לא נעשה קוד קשיח, אלא נוצר באופן דינמי באמצעות Jinja ומשתנה עם רשימת מיילים, שאותם הכנסתי בקפידה Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - תנאי להפעלת המפעיל. במקרה שלנו, המכתב יטוס לבוסים רק אם כל התלות הסתדרה בהצלחה;
  • tg_bot_conn_id='tg_main' - טיעונים conn_id קבל מזהי חיבור שאנו יוצרים בהם Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - הודעות בטלגרם יעופו רק אם יש משימות שנפלו;
  • task_concurrency=1 - אנו אוסרים על השקה בו-זמנית של מספר מקרי משימה של משימה אחת. אחרת, נקבל השקה בו-זמנית של כמה VerticaOperator (מסתכל על שולחן אחד);
  • report_update >> [email, tg] - את כל VerticaOperator מתכנסים בשליחת מכתבים והודעות, כך:
    זרימת אוויר של אפאצ'י: מקל על ETL

    אבל מכיוון שלמפעילי ההודעות יש תנאי השקה שונים, רק אחד יעבוד. בתצוגת העץ, הכל נראה קצת פחות ויזואלי:
    זרימת אוויר של אפאצ'י: מקל על ETL

אני אגיד כמה מילים על פקודות מאקרו והחברים שלהם - משתנים.

פקודות מאקרו הם מצייני מיקום של Jinja שיכולים להחליף מידע שימושי שונים בטיעוני אופרטור. לדוגמה, כך:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} יתרחב לתוכן משתנה ההקשר execution_date בפורמט YYYY-MM-DD: 2020-07-14. החלק הטוב ביותר הוא שמשתני הקשר ממוסמרים למופע משימה ספציפי (ריבוע בתצוגת העץ), וכאשר יופעל מחדש, מצייני המיקום יתרחבו לאותם ערכים.

ניתן לראות את הערכים שהוקצו באמצעות כפתור העיבוד בכל מופע של משימה. כך המשימה עם שליחת מכתב:

זרימת אוויר של אפאצ'י: מקל על ETL

וכך במשימה עם שליחת הודעה:

זרימת אוויר של אפאצ'י: מקל על ETL

רשימה מלאה של פקודות מאקרו מובנות עבור הגרסה האחרונה הזמינה זמינה כאן: הפניה לפקודות מאקרו

יתרה מכך, בעזרת תוספים, אנחנו יכולים להכריז על פקודות מאקרו משלנו, אבל זה כבר סיפור אחר.

בנוסף לדברים המוגדרים מראש, אנחנו יכולים להחליף את הערכים של המשתנים שלנו (כבר השתמשתי בזה בקוד למעלה). בואו ליצור פנימה Admin/Variables כמה דברים:

זרימת אוויר של אפאצ'י: מקל על ETL

כל מה שאתה יכול להשתמש:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

הערך יכול להיות סקלרי, או שהוא יכול להיות גם JSON. במקרה של JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

פשוט השתמש בנתיב למפתח הרצוי: {{ var.json.bot_config.bot.token }}.

אני ממש אגיד מילה אחת ואראה צילום מסך אחד על קשרים. הכל אלמנטרי כאן: בדף Admin/Connections אנו יוצרים חיבור, מוסיפים שם את כניסות / סיסמאות ופרמטרים ספציפיים יותר. ככה:

זרימת אוויר של אפאצ'י: מקל על ETL

ניתן להצפין סיסמאות (ביסודיות יותר מהברירת מחדל), או שאתה יכול להשאיר את סוג החיבור (כפי שעשיתי עבור tg_main) - העובדה היא שרשימת הסוגים מקובעת בדגמי Airflow ולא ניתנת להרחבה מבלי להיכנס לקודי המקור (אם פתאום לא חיפשתי משהו בגוגל, אנא תקן אותי), אבל שום דבר לא ימנע מאיתנו לקבל קרדיטים רק על ידי שֵׁם.

אתה יכול גם ליצור מספר חיבורים עם אותו שם: במקרה זה, השיטה BaseHook.get_connection(), שמביא לנו קשרים לפי השם, ייתן אקראי מכמה שמות (זה יהיה הגיוני יותר לעשות Round Robin, אבל בואו נשאיר את זה על מצפונם של מפתחי Airflow).

משתנים וחיבורים הם בהחלט כלים מגניבים, אבל חשוב לא לאבד את האיזון: אילו חלקים מהזרימות שלכם אתם מאחסנים בקוד עצמו, ואילו חלקים אתם נותנים ל-Airflow לאחסון. מצד אחד, זה יכול להיות נוח לשנות במהירות את הערך, למשל, תיבת דיוור, דרך ממשק המשתמש. מצד שני, זו עדיין חזרה ללחיצת העכבר, ממנה רצינו (אני) להיפטר.

עבודה עם קשרים היא אחת המשימות ווים. באופן כללי, ווי Airflow הם נקודות לחיבורו לשירותים וספריות של צד שלישי. לְמָשָׁל, JiraHook יפתח לנו לקוח לאינטראקציה עם Jira (תוכל להעביר משימות הלוך ושוב), ובעזרת SambaHook אתה יכול לדחוף קובץ מקומי ל smb-נְקוּדָה.

ניתוח האופרטור המותאם אישית

והתקרבנו להסתכל על איך זה עשוי TelegramBotSendMessage

קוד commons/operators.py עם המפעיל בפועל:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

כאן, כמו כל דבר אחר ב-Airflow, הכל מאוד פשוט:

  • בירושה מ BaseOperator, שמיישמת לא מעט דברים ספציפיים לזרימת אוויר (תסתכל על הפנאי שלך)
  • שדות מוצהרים template_fields, שבו Jinja יחפש פקודות מאקרו לעיבוד.
  • סידר את הטיעונים הנכונים עבור __init__(), הגדר את ברירות המחדל במידת הצורך.
  • גם את האתחול של האב הקדמון לא שכחנו.
  • פתח את הקרס המתאים TelegramBotHookקיבל ממנו חפץ לקוח.
  • שיטה עוקפת (מוגדרת מחדש). BaseOperator.execute(), ש-Airfow תעוות כשיגיע הזמן להשיק את המפעיל - בו ניישם את הפעולה העיקרית, שוכחים להתחבר. (אנחנו נכנסים, אגב, ישר stdout и stderr - זרימת האוויר יירטה הכל, תעטוף אותו יפה, תפרק אותו היכן שצריך.)

בוא נראה מה יש לנו commons/hooks.py. החלק הראשון של הקובץ, עם הוו עצמו:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

אני אפילו לא יודע מה להסביר כאן, אני רק אציין את הנקודות החשובות:

  • אנחנו יורשים, חושבים על הטיעונים - ברוב המקרים זה יהיה אחד: conn_id;
  • עוקף שיטות סטנדרטיות: הגבלתי את עצמי get_conn(), שבו אני מקבל את פרמטרי החיבור לפי השם ופשוט מקבל את הקטע extra (זהו שדה JSON), שבו שמתי (לפי ההוראות שלי!) את אסימון הבוט של Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • אני יוצר מופע שלנו TelegramBot, נותן לו אסימון ספציפי.

זה הכל. אתה יכול לקבל לקוח מהוק באמצעות TelegramBotHook().clent או TelegramBotHook().get_conn().

והחלק השני של הקובץ, בו אני מכין מיקרו-עטיפה ל- Telegram REST API, כדי לא לגרור אותו דבר python-telegram-bot לשיטה אחת sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

הדרך הנכונה היא לחבר את הכל: TelegramBotSendMessage, TelegramBotHook, TelegramBot - בתוסף, הכנס למאגר ציבורי ותן אותו לקוד פתוח.

בזמן שלמדנו את כל זה, עדכוני הדוחות שלנו הצליחו להיכשל בהצלחה ושלחו לי הודעת שגיאה בערוץ. אני הולך לבדוק אם זה לא בסדר...

זרימת אוויר של אפאצ'י: מקל על ETL
משהו נשבר בדוג'ה שלנו! זה לא מה שציפינו? בְּדִיוּק!

אתה הולך למזוג?

אתה מרגיש שפספסתי משהו? נראה שהוא הבטיח להעביר נתונים מ-SQL Server ל-Vertica, ואז הוא לקח את זה וזז מהנושא, הנבל!

הזוועה הזו הייתה מכוונת, פשוט הייתי צריך לפענח עבורך מינוח כלשהו. עכשיו אתה יכול ללכת רחוק יותר.

התוכנית שלנו הייתה כזו:

  1. תעשה דאג
  2. יצירת משימות
  3. תראה כמה הכל יפה
  4. הקצה מספרי הפעלה למילוי
  5. קבל נתונים מ-SQL Server
  6. הכנס נתונים לתוך Vertica
  7. אסוף סטטיסטיקות

אז כדי להפעיל את כל זה, הכנתי תוספת קטנה לנו docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

שם אנחנו מעלים:

  • ורטיקה כמארח dwh עם הגדרות ברירת המחדל הרבות ביותר,
  • שלושה מופעים של SQL Server,
  • אנו ממלאים את מסדי הנתונים של האחרונים בכמה נתונים (בשום מקרה אל תבדוק mssql_init.py!)

אנחנו משיקים את כל הטוב בעזרת פקודה קצת יותר מסובכת מהפעם הקודמת:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

מה שה-Randomizer הפלא שלנו יצר, אתה יכול להשתמש בפריט Data Profiling/Ad Hoc Query:

זרימת אוויר של אפאצ'י: מקל על ETL
העיקר לא להראות את זה לאנליסטים

לפרט על מפגשי ETL אני לא, הכל טריוויאלי שם: אנחנו יוצרים בסיס, יש בו שלט, אנחנו עוטפים הכל עם מנהל הקשר, ועכשיו אנחנו עושים את זה:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

הגיע הזמן לאסוף את הנתונים שלנו ממאה וחצי השולחנות שלנו. בואו נעשה זאת בעזרת שורות מאוד לא יומרות:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. בעזרת וו אנחנו מקבלים מ-Airflow pymssql-לְחַבֵּר
  2. בואו נחליף הגבלה בצורת תאריך לבקשה - היא תיזרק לפונקציה על ידי מנוע התבנית.
  3. מזין את הבקשה שלנו pandasמי יקבל אותנו DataFrame - זה יהיה שימושי עבורנו בעתיד.

אני משתמש בהחלפה {dt} במקום פרמטר בקשה %s לא בגלל שאני פינוקיו מרושע, אלא בגלל pandas לא יכול להתמודד pymssql ומחליק את האחרון params: Listלמרות שהוא באמת רוצה tuple.
שימו לב גם שהמפתח pymssql החליט לא לתמוך בו יותר, והגיע הזמן לצאת pyodbc.

בואו נראה במה מילא Airflow את הטיעונים של הפונקציות שלנו:

זרימת אוויר של אפאצ'י: מקל על ETL

אם אין נתונים, אז אין טעם להמשיך. אבל זה גם מוזר לראות במילוי מוצלח. אבל זו לא טעות. אה-אה-אה, מה לעשות?! והנה מה:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException אומר ל-Airflow שאין שגיאות, אבל אנחנו מדלגים על המשימה. בממשק לא יהיה ריבוע ירוק או אדום, אלא ורוד.

בואו נזרוק את הנתונים שלנו עמודות מרובות:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

כלומר

  • מסד הנתונים ממנו לקחנו את ההזמנות,
  • מזהה של הפעלת ההצפה שלנו (זה יהיה שונה לכל משימה),
  • Hash ממזהה המקור ומזהה ההזמנה - כך שבבסיס הנתונים הסופי (שבו הכל נשפך לטבלה אחת) יש לנו מזהה הזמנה ייחודי.

השלב הלפני אחרון נשאר: שפכו הכל לוורטיקה. ובאופן מוזר, אחת הדרכים המרהיבות והיעילות ביותר לעשות זאת היא באמצעות CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. אנחנו מכינים מקלט מיוחד StringIO.
  2. pandas ישים בחביבות שלנו DataFrame בצורה של CSV-שורות.
  3. בואו נפתח חיבור לוורטיקה האהובה עלינו עם וו.
  4. ועכשיו בעזרת העזרה copy() שלח את הנתונים שלנו ישירות ל-Vertika!

ניקח מהנהג כמה שורות מולאו, ונאמר למנהל הפגישה שהכל בסדר:

session.loaded_rows = cursor.rowcount
session.successful = True

זה הכול.

במכירה, אנו יוצרים את לוחית היעד באופן ידני. הנה הרשיתי לעצמי מכונה קטנה:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

אני משתמש VerticaOperator() אני יוצר סכימת מסד נתונים וטבלה (אם הם עדיין לא קיימים, כמובן). העיקר הוא לסדר נכון את התלות:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

סכום

– נו, – אמר העכבר הקטן, – הלא כן, עכשיו
אתה משוכנע שאני החיה הכי נוראית ביער?

ג'וליה דונלדסון, הגרופלו

אני חושב שאם לעמיתיי ולי הייתה תחרות: מי יצור ויזניק במהירות תהליך ETL מאפס: הם עם ה-SSIS שלהם ועכבר ואני עם Airflow... ואז גם היינו משווים את קלות התחזוקה... וואו, אני חושב שתסכימו שאני אנצח אותם בכל החזיתות!

אם קצת יותר ברצינות, אז Apache Airflow - על ידי תיאור תהליכים בצורה של קוד תוכנית - עשה את העבודה שלי הרבה יותר נוח ומהנה.

יכולת ההרחבה הבלתי מוגבלת שלו, הן מבחינת תוספות והן בנטייה להרחבה, נותנת לך את ההזדמנות להשתמש ב- Airflow כמעט בכל אזור: אפילו במחזור המלא של איסוף, הכנה ועיבוד נתונים, אפילו בשיגור רקטות (למאדים, של קוּרס).

חלק גמר, הפניה ומידע

המגרפה אספנו עבורכם

  • start_date. כן, זה כבר מם מקומי. דרך הטיעון העיקרי של דאג start_date כולם עוברים. בקצרה, אם אתה מציין ב start_date תאריך נוכחי, ו schedule_interval - יום אחד, ואז DAG יתחיל מחר לא מוקדם יותר.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    ואין יותר בעיות.

    קיימת שגיאת זמן ריצה נוספת הקשורה אליו: Task is missing the start_date parameter, מה שמצביע לרוב על כך ששכחת לקשור לאופרטור דאג.

  • הכל במכונה אחת. כן, ובסיסים (Airflow עצמה והציפוי שלנו), ושרת אינטרנט, ומתזמן, ועובדים. וזה אפילו עבד. אבל עם הזמן, מספר המשימות לשירותים גדל, וכאשר PostgreSQL החלה להגיב לאינדקס תוך 20 שניות במקום 5 אלפיות השנייה, לקחנו אותו וסחבנו אותו.
  • LocalExecutor. כן, אנחנו עדיין יושבים עליו, וכבר הגענו לקצה התהום. LocalExecutor הספיק לנו עד כה, אבל עכשיו הגיע הזמן להתרחב עם עובד אחד לפחות, ונצטרך לעבוד קשה כדי לעבור ל-SeleryExecutor. ולאור העובדה שאתה יכול לעבוד איתו על מכונה אחת, שום דבר לא מונע ממך להשתמש בסלרי אפילו בשרת, ש"כמובן, לעולם לא ייכנס לייצור, בכנות!"
  • לא בשימוש כלים מובנים:
    • חיבורי לאחסון אישורי שירות,
    • SLA פספוסים להגיב למשימות שלא הסתדרו בזמן,
    • xcom לחילופי מטא נתונים (אמרתי מטאנתונים!) בין משימות יום.
  • שימוש לרעה בדואר. ובכן, מה אני יכול להגיד? הוקמו התראות על כל החזרות על משימות שנפלו. כעת ל-Gmail בעבודה שלי יש יותר מ-90 אימיילים מ-Airflow, ולוע דואר האינטרנט מסרב לאסוף ולמחוק יותר מ-100 בכל פעם.

עוד מלכודות: Apache Airflow Pitfails

עוד כלי אוטומציה

כדי שנוכל לעבוד עוד יותר עם הראש ולא עם הידיים, Airflow הכינה לנו את זה:

  • REST API - עדיין יש לו מעמד של נסיוני, מה שלא מונע ממנו לעבוד. בעזרתו, אתה יכול לא רק לקבל מידע על יומיים ומשימות, אלא גם לעצור/להתחיל יום, ליצור DAG Run או בריכה.
  • CLI - כלים רבים זמינים דרך שורת הפקודה שאינם רק לא נוחים לשימוש דרך ה-WebUI, אלא בדרך כלל נעדרים. לדוגמה:
    • backfill נדרש כדי להפעיל מחדש מופעי משימה.
      למשל, באו אנליסטים ואמרו: "ואתה, חבר, יש שטויות בנתונים מה-1 עד ה-13 בינואר! תקן את זה, תקן את זה, תקן את זה, תקן את זה!" ואתה כזה כיריים:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • שירות בסיס: initdb, resetdb, upgradedb, checkdb.
    • run, המאפשר לך להריץ משימת מופע אחת, ואפילו ניקוד על כל התלות. יתר על כן, אתה יכול להפעיל אותו באמצעות LocalExecutor, גם אם יש לך אשכול סלרי.
    • עושה בערך אותו דבר test, רק גם בבסיסים לא כותב כלום.
    • connections מאפשר יצירה המונית של קשרים מהקליפה.
  • ממשק API של פייתון - דרך הארדקור למדי של אינטראקציה, המיועדת לתוספים, ולא רוחשת בה עם ידיים קטנות. אבל מי ימנע מאיתנו ללכת אליו /home/airflow/dags, לרוץ ipython ולהתחיל להתעסק? אתה יכול, למשל, לייצא את כל החיבורים עם הקוד הבא:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • התחברות למטא-בסיס הנתונים של Airflow. אני לא ממליץ לכתוב אליו, אבל השגת מצבי משימה עבור מדדים ספציפיים שונים יכולה להיות הרבה יותר מהירה וקלה משימוש בכל אחד מהממשקי API.

    בוא נגיד שלא כל המשימות שלנו אינן יכולות, אבל הן יכולות לפעמים ליפול, וזה נורמלי. אבל כמה חסימות כבר חשודות, ויהיה צורך לבדוק.

    היזהרו עם SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

תזכור

וכמובן, עשרת הקישורים הראשונים מהנפקת גוגל הם התוכן של תיקיית Airflow מהסימניות שלי.

והקישורים שבהם נעשה שימוש במאמר:

מקור: www.habr.com