Apache Airflow: ทำให้ ETL ง่ายขึ้น

สวัสดี ฉันชื่อ Dmitry Logvinenko - วิศวกรข้อมูลของแผนกวิเคราะห์ของกลุ่มบริษัท Vezet

ฉันจะบอกคุณเกี่ยวกับเครื่องมือที่ยอดเยี่ยมสำหรับการพัฒนากระบวนการ ETL - Apache Airflow แต่ Airflow มีความหลากหลายและมีหลายแง่มุม ซึ่งคุณควรพิจารณาให้ละเอียดยิ่งขึ้น แม้ว่าคุณจะไม่ได้เกี่ยวข้องกับกระแสข้อมูล แต่จำเป็นต้องเปิดใช้กระบวนการใดๆ เป็นระยะๆ และติดตามการดำเนินการของกระบวนการเหล่านั้น

และใช่ ฉันจะไม่เพียงแค่บอก แต่ยังแสดง: โปรแกรมมีโค้ด ภาพหน้าจอ และคำแนะนำมากมาย

Apache Airflow: ทำให้ ETL ง่ายขึ้น
สิ่งที่คุณมักจะเห็นเมื่อคุณ Google คำว่า Airflow / Wikimedia Commons

สารบัญ

การแนะนำ

Apache Airflow ก็เหมือนกับ Django:

  • เขียนด้วยภาษาไพธอน
  • มีแผงผู้ดูแลระบบที่ยอดเยี่ยม
  • ขยายได้ไม่จำกัด

- ดีกว่าเท่านั้นและถูกสร้างขึ้นเพื่อจุดประสงค์ที่แตกต่างไปจากเดิมอย่างสิ้นเชิง กล่าวคือ (ตามที่เขียนไว้ก่อนหน้า kat):

  • เรียกใช้และตรวจสอบงานบนเครื่องไม่ จำกัด จำนวน (เท่าที่ Celery / Kubernetes และมโนธรรมของคุณจะอนุญาต)
  • ด้วยการสร้างเวิร์กโฟลว์แบบไดนามิกจากการเขียนและทำความเข้าใจโค้ด Python ที่ง่ายมาก
  • และความสามารถในการเชื่อมต่อฐานข้อมูลและ APIs เข้าด้วยกันโดยใช้ทั้งส่วนประกอบสำเร็จรูปและปลั๊กอินทำเอง (ซึ่งง่ายมาก)

เราใช้ Apache Airflow ดังนี้:

  • เรารวบรวมข้อมูลจากแหล่งต่างๆ (อินสแตนซ์ SQL Server และ PostgreSQL จำนวนมาก, API ต่างๆ พร้อมเมตริกแอปพลิเคชัน หรือแม้แต่ 1C) ใน DWH และ ODS (เรามี Vertica และ Clickhouse)
  • ก้าวหน้าแค่ไหน cronซึ่งเริ่มกระบวนการรวมข้อมูลบน ODS และตรวจสอบการบำรุงรักษาด้วย

จนกระทั่งเมื่อไม่นานมานี้ ความต้องการของเราถูกครอบคลุมโดยเซิร์ฟเวอร์ขนาดเล็กหนึ่งเครื่องที่มี 32 คอร์และ RAM ขนาด 50 GB ใน Airflow สิ่งนี้ใช้ได้:

  • ขึ้น 200 เหรียญ (เวิร์กโฟลว์จริง ๆ ที่เรายัดงานเข้าไป)
  • ในแต่ละอันโดยเฉลี่ย 70 งาน,
  • ความดีนี้เริ่มต้น (โดยเฉลี่ยด้วย) หนึ่งครั้งต่อชั่วโมง.

และเกี่ยวกับวิธีการขยาย ฉันจะเขียนด้านล่าง แต่ตอนนี้เรามากำหนดปัญหา über ที่เราจะแก้ไข:

มีเซิร์ฟเวอร์ SQL ดั้งเดิมสามเครื่อง แต่ละฐานข้อมูลมี 50 ฐานข้อมูล - อินสแตนซ์ของหนึ่งโครงการ ตามลำดับ มีโครงสร้างเหมือนกัน (เกือบทุกที่ มึน-ฮ่า-ฮ่า) ซึ่งหมายความว่าแต่ละรายการมีตารางคำสั่งซื้อ (โชคดีที่ตารางนั้น ชื่อสามารถผลักดันในธุรกิจใด ๆ ) เรารับข้อมูลโดยการเพิ่มฟิลด์บริการ (เซิร์ฟเวอร์ต้นทาง, ฐานข้อมูลต้นทาง, ID งาน ETL) และใส่ลงใน Vertica อย่างไร้เดียงสา

Here we go!

ส่วนหลัก ภาคปฏิบัติ (และภาคทฤษฎีเล็กน้อย)

ทำไมเรา (และคุณ)

เมื่อต้นไม้ใหญ่และฉันเป็นคนธรรมดา SQL-schik ในร้านค้าปลีกแห่งหนึ่งของรัสเซีย เราหลอกลวง ETL ประมวลผลกระแสข้อมูลโดยใช้เครื่องมือสองอย่างที่เรามีให้:

  • อินฟอร์มาติกา พาวเวอร์ เซ็นเตอร์ - ระบบที่แพร่กระจายอย่างมาก มีประสิทธิผลสูง ด้วยฮาร์ดแวร์ของตัวเอง การกำหนดเวอร์ชันของตัวเอง ฉันใช้ God forbid 1% ของความสามารถของมัน ทำไม ก่อนอื่นอินเทอร์เฟซนี้ที่ไหนสักแห่งในยุค 380 ทำให้เรากดดันทางจิตใจ ประการที่สอง อุปกรณ์นี้ได้รับการออกแบบมาสำหรับกระบวนการที่แฟนซีมาก การนำส่วนประกอบกลับมาใช้ซ้ำ และกลอุบายที่สำคัญมากสำหรับองค์กรอื่นๆ เกี่ยวกับความจริงที่ว่ามันมีค่าใช้จ่ายเช่นเดียวกับปีกของ Airbus AXNUMX / ปีเราจะไม่พูดอะไรเลย

    ระวัง ภาพหน้าจออาจทำร้ายผู้ที่อายุต่ำกว่า 30 ได้เล็กน้อย

    Apache Airflow: ทำให้ ETL ง่ายขึ้น

  • เซิร์ฟเวอร์การรวมเซิร์ฟเวอร์ SQL - เราใช้เพื่อนคนนี้ในโฟลว์ภายในโครงการของเรา อันที่จริง: เราใช้ SQL Server อยู่แล้ว และมันก็ไม่มีเหตุผลที่จะไม่ใช้เครื่องมือ ETL ของมัน ทุกอย่างในนั้นดี: ทั้งอินเทอร์เฟซสวยงามและรายงานความคืบหน้า ... แต่นี่ไม่ใช่เหตุผลที่เรารักผลิตภัณฑ์ซอฟต์แวร์ ไม่ใช่สำหรับสิ่งนี้ รุ่นมัน dtsx (ซึ่งเป็น XML ที่มีการสับโหนดเมื่อบันทึก) เราทำได้ แต่ประเด็นคืออะไร ลองสร้างแพ็คเกจงานที่จะลากตารางหลายร้อยตารางจากเซิร์ฟเวอร์หนึ่งไปยังอีกเซิร์ฟเวอร์หนึ่ง ใช่ร้อยนิ้วชี้ของคุณจะหลุดออกจากยี่สิบชิ้นโดยคลิกที่ปุ่มเมาส์ แต่ดูทันสมัยกว่าแน่นอน:

    Apache Airflow: ทำให้ ETL ง่ายขึ้น

แน่นอนเรามองหาทางออก กรณีด้วยซ้ำ เกือบจะ มาถึงตัวสร้างแพ็คเกจ SSIS ที่เขียนขึ้นเอง ...

…และแล้วงานใหม่ก็พบฉัน และ Apache Airflow ก็แซงหน้าฉันไป

เมื่อฉันพบว่าคำอธิบายกระบวนการ ETL เป็นโค้ด Python ง่ายๆ ฉันไม่ได้เต้นด้วยความดีใจ นี่คือวิธีที่สตรีมข้อมูลถูกสร้างเวอร์ชันและแตกต่างกัน และการเทตารางที่มีโครงสร้างเดียวจากฐานข้อมูลหลายร้อยฐานข้อมูลไปยังเป้าหมายเดียวกลายเป็นเรื่องของโค้ด Python ในหนึ่งหน้าจอครึ่งหรือสองหน้าจอขนาด 13 นิ้ว

การประกอบคลัสเตอร์

อย่าจัดโรงเรียนอนุบาลอย่างสมบูรณ์ และอย่าพูดถึงสิ่งที่ชัดเจนโดยสิ้นเชิงที่นี่ เช่น การติดตั้ง Airflow ฐานข้อมูลที่คุณเลือก ขึ้นฉ่าย และกรณีอื่นๆ ที่อธิบายไว้ในท่าเทียบเรือ

เพื่อให้เราสามารถเริ่มการทดลองได้ทันที ฉันร่างภาพ docker-compose.yml ซึ่งใน:

  • มาเลี้ยงกันจริงๆ Airflow: ตัวจัดกำหนดการ, เว็บเซิร์ฟเวอร์ ดอกไม้จะหมุนไปที่นั่นเพื่อเฝ้าดูงานขึ้นฉ่ายด้วย (เพราะถูกผลักเข้าไปแล้ว apache/airflow:1.10.10-python3.7แต่เราไม่ถือสา)
  • PostgreSQLซึ่ง Airflow จะเขียนข้อมูลบริการของตน (ข้อมูลกำหนดการ สถิติการดำเนินการ ฯลฯ) และ Celery จะทำเครื่องหมายงานที่เสร็จสมบูรณ์
  • Redisซึ่งจะทำหน้าที่เป็นนายหน้าซื้อขายให้กับขึ้นฉ่าย
  • คนขึ้นฉ่ายซึ่งจะมีส่วนร่วมในการปฏิบัติงานโดยตรง
  • ไปยังโฟลเดอร์ ./dags เราจะเพิ่มไฟล์ของเราพร้อมคำอธิบายของ dags พวกเขาจะถูกหยิบขึ้นมาทันที ดังนั้นจึงไม่จำเป็นต้องเล่นกลทั้งกองหลังจากจามแต่ละครั้ง

ในบางแห่ง โค้ดในตัวอย่างไม่ได้แสดงอย่างสมบูรณ์ (เพื่อไม่ให้ข้อความรกรุงรัง) แต่มีการแก้ไขบางส่วนในกระบวนการ ตัวอย่างรหัสการทำงานที่สมบูรณ์สามารถพบได้ในที่เก็บ https://github.com/dm-logv/airflow-tutorial.

นักเทียบท่า-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

หมายเหตุ:

  • ในการประกอบองค์ประกอบภาพ ผมอาศัยภาพที่เป็นที่รู้จักเป็นส่วนใหญ่ pukel/docker-airflow - ให้แน่ใจว่าได้ตรวจสอบออก บางทีคุณอาจไม่ต้องการอะไรอีกแล้วในชีวิต
  • การตั้งค่า Airflow ทั้งหมดไม่ได้มีเฉพาะผ่านเท่านั้น airflow.cfgแต่ยังผ่านตัวแปรสภาพแวดล้อม (ขอบคุณผู้พัฒนา) ซึ่งฉันใช้ประโยชน์จากมันโดยเจตนาร้าย
  • โดยธรรมชาติแล้ว มันไม่พร้อมสำหรับการผลิต: ฉันจงใจไม่ใส่ฮาร์ทบีทลงบนคอนเทนเนอร์ ฉันไม่ได้กังวลเรื่องความปลอดภัย แต่ฉันทำสิ่งที่เหมาะสมที่สุดสำหรับผู้ทดลองของเรา
  • โปรดทราบว่า:
    • โฟลเดอร์ dag ต้องสามารถเข้าถึงได้ทั้งผู้วางกำหนดการและผู้ปฏิบัติงาน
    • เช่นเดียวกับไลบรารีของบุคคลที่สามทั้งหมด - ทั้งหมดจะต้องติดตั้งบนเครื่องที่มีตัวกำหนดตารางเวลาและผู้ปฏิบัติงาน

ตอนนี้มันง่าย:

$ docker-compose up --scale worker=3

หลังจากเพิ่มทุกอย่างแล้ว คุณสามารถดูอินเทอร์เฟซเว็บได้:

แนวคิดพื้นฐาน

หากคุณไม่เข้าใจอะไรใน "dags" ทั้งหมด นี่คือพจนานุกรมสั้นๆ:

  • ตารางเวลา - ลุงที่สำคัญที่สุดใน Airflow ควบคุมว่าหุ่นยนต์ทำงานหนัก ไม่ใช่มนุษย์: ตรวจสอบตารางเวลา อัปเดต dags เรียกใช้งาน

    โดยทั่วไปในเวอร์ชันเก่าเขามีปัญหากับหน่วยความจำ (ไม่ ไม่ใช่ความจำเสื่อม แต่รั่วไหล) และพารามิเตอร์เดิมยังคงอยู่ในการกำหนดค่า run_duration - ช่วงเวลารีสตาร์ท แต่ตอนนี้ทุกอย่างเรียบร้อยดี

  • DAG (aka "dag") - "กราฟวงกลมกำกับ" แต่คำจำกัดความดังกล่าวจะบอกคนไม่กี่คน แต่ในความเป็นจริงมันเป็นคอนเทนเนอร์สำหรับงานที่มีปฏิสัมพันธ์ซึ่งกันและกัน (ดูด้านล่าง) หรืออะนาล็อกของ Package ใน SSIS และ Workflow ใน Informatica .

    นอกจาก dags แล้วอาจมี subdags แต่เรามักจะไม่ไปที่นั่น

  • วิ่งแด๊ก - เริ่มต้น dag ซึ่งได้รับมอบหมายให้เป็นของตัวเอง execution_date. Dagrans ของ dag เดียวกันสามารถทำงานควบคู่กันไปได้ (ถ้าคุณทำให้งานของคุณหมดอำนาจลงแน่นอน)
  • ผู้ประกอบการ เป็นส่วนของรหัสที่รับผิดชอบในการดำเนินการบางอย่าง ตัวดำเนินการมีสามประเภท:
    • การกระทำอย่างที่เราชื่นชอบ PythonOperatorซึ่งสามารถเรียกใช้รหัส Python ใด ๆ (ที่ถูกต้อง);
    • โอนซึ่งขนส่งข้อมูลจากที่หนึ่งไปยังอีกที่หนึ่ง กล่าวคือ MsSqlToHiveTransfer;
    • เซ็นเซอร์ ในทางกลับกัน มันจะช่วยให้คุณตอบสนองหรือชะลอการดำเนินการต่อไปของ dag จนกว่าจะมีเหตุการณ์เกิดขึ้น HttpSensor สามารถดึงจุดสิ้นสุดที่ระบุ และเมื่อการตอบสนองที่ต้องการรออยู่ ให้เริ่มการถ่ายโอน GoogleCloudStorageToS3Operator. จิตใจที่อยากรู้อยากเห็นจะถามว่า “ทำไม? ท้ายที่สุดคุณสามารถทำซ้ำได้ในตัวดำเนินการ!” จากนั้นเพื่อไม่ให้กลุ่มของงานอุดตันด้วยตัวดำเนินการที่ถูกระงับ เซ็นเซอร์เริ่มทำงาน ตรวจสอบ และหยุดทำงานก่อนความพยายามครั้งต่อไป
  • งาน - ตัวดำเนินการที่ประกาศโดยไม่คำนึงถึงประเภทและแนบกับ dag จะได้รับการเลื่อนตำแหน่งเป็นระดับของงาน
  • ตัวอย่างงาน - เมื่อผู้วางแผนทั่วไปตัดสินใจว่าถึงเวลาส่งงานเข้าสู้รบกับนักแสดง-คนงาน (ตรงจุด ถ้าเราใช้ LocalExecutor หรือไปยังโหนดระยะไกลในกรณีของ CeleryExecutor) จะกำหนดบริบทให้กับพวกเขา (เช่น ชุดของตัวแปร - พารามิเตอร์การดำเนินการ) ขยายคำสั่งหรือเทมเพลตคิวรี และรวมเข้าด้วยกัน

เราสร้างงาน

ขั้นแรก เรามาร่างโครงร่างทั่วไปของ doug ของเรา จากนั้นเราจะลงลึกในรายละเอียดมากขึ้นเรื่อย ๆ เนื่องจากเราใช้วิธีแก้ปัญหาที่ไม่สำคัญบางอย่าง

ดังนั้นในรูปแบบที่ง่ายที่สุด dag ดังกล่าวจะมีลักษณะดังนี้:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

ลองคิดออก:

  • ก่อนอื่น เรานำเข้า libs ที่จำเป็นและ อื่น ๆ อีก;
  • sql_server_ds - เป็น List[namedtuple[str, str]] ด้วยชื่อของการเชื่อมต่อจาก Airflow Connections และฐานข้อมูลที่เราจะใช้จานของเรา
  • dag - ประกาศ dag ของเราซึ่งจำเป็นต้องอยู่ใน globals()มิฉะนั้น Airflow จะไม่พบ ดั๊กยังต้องพูดว่า:
    • เขาชื่ออะไร orders - ชื่อนี้จะปรากฏในเว็บอินเตอร์เฟส
    • ว่าเขาจะทำงานตั้งแต่เที่ยงคืนของวันที่ XNUMX กรกฎาคม
    • และควรทำงานทุกๆ 6 ชั่วโมงโดยประมาณ (สำหรับคนแกร่งที่นี่แทนที่จะเป็น timedelta() ยอมรับได้ cron-เส้น 0 0 0/6 ? * * *เพื่อความเท่ห์น้อยลง - การแสดงออกเช่น @daily);
  • workflow() จะทำหน้าที่หลักแต่ยังไม่ใช่ตอนนี้ สำหรับตอนนี้ เราจะทิ้งบริบทของเราลงในบันทึก
  • และตอนนี้ความมหัศจรรย์ง่ายๆ ของการสร้างงาน:
    • เราวิ่งผ่านแหล่งที่มาของเรา
    • เริ่มต้น PythonOperatorซึ่งจะประหารชีวิตหุ่นของเรา workflow(). อย่าลืมระบุชื่อเฉพาะ (ภายใน dag) ของงานและผูก dag เอง ธง provide_context ในทางกลับกัน จะใส่อาร์กิวเมนต์เพิ่มเติมลงในฟังก์ชัน ซึ่งเราจะรวบรวมอย่างระมัดระวังโดยใช้ **context.

สำหรับตอนนี้ นั่นคือทั้งหมด สิ่งที่เราได้รับ:

  • dag ใหม่ในเว็บอินเตอร์เฟส
  • หนึ่งร้อยครึ่งงานที่จะดำเนินการพร้อมกัน (หากการตั้งค่า Airflow, Celery และความจุของเซิร์ฟเวอร์อนุญาต)

เกือบได้แล้ว

Apache Airflow: ทำให้ ETL ง่ายขึ้น
ใครจะเป็นผู้ติดตั้งการพึ่งพา

เพื่อทำให้เรื่องทั้งหมดนี้ง่ายขึ้น ฉันเลยพลาดพลั้ง docker-compose.yml กำลังประมวลผล requirements.txt บนโหนดทั้งหมด

ตอนนี้มันหายไปแล้ว:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

สี่เหลี่ยมสีเทาคืออินสแตนซ์งานที่ประมวลผลโดยตัวกำหนดตารางเวลา

เรารอสักครู่คนงานจะสรุปงาน:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

แน่นอนว่าพวกสีเขียวได้ทำงานสำเร็จแล้ว หงส์แดงไม่ประสบความสำเร็จมากนัก

อย่างไรก็ตาม ไม่มีโฟลเดอร์ในผลิตภัณฑ์ของเรา ./dagsไม่มีการซิงโครไนซ์ระหว่างเครื่อง - dags ทั้งหมดอยู่ใน git บน Gitlab ของเรา และ Gitlab CI จะแจกจ่ายการอัปเดตไปยังเครื่องต่างๆ เมื่อรวมเข้าด้วยกัน master.

เล็กน้อยเกี่ยวกับดอกไม้

ในขณะที่คนงานกำลังฟาดจุกนมหลอกของเรา เรามานึกถึงเครื่องมืออีกอย่างที่สามารถแสดงให้เราเห็นบางอย่างได้ นั่นคือดอกไม้

หน้าแรกที่มีข้อมูลสรุปเกี่ยวกับโหนดผู้ปฏิบัติงาน:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

หน้าที่เข้มข้นที่สุดกับงานที่ไปทำงาน:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

หน้าที่น่าเบื่อที่สุดพร้อมสถานะของนายหน้าของเรา:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

หน้าที่สว่างที่สุดคือกราฟสถานะงานและเวลาดำเนินการ:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

เราโหลดอันเดอร์โหลด

ดังนั้นงานทั้งหมดได้ผลคุณสามารถนำผู้บาดเจ็บออกไปได้

Apache Airflow: ทำให้ ETL ง่ายขึ้น

และมีผู้บาดเจ็บจำนวนมาก - ไม่ว่าจะด้วยเหตุผลใดก็ตาม ในกรณีของการใช้ Airflow อย่างถูกต้อง สี่เหลี่ยมเหล่านี้บ่งชี้ว่าข้อมูลมาไม่ถึงอย่างแน่นอน

คุณต้องดูบันทึกและรีสตาร์ทอินสแตนซ์งานที่ล้มเหลว

เมื่อคลิกที่ช่องสี่เหลี่ยมใด ๆ เราจะเห็นการกระทำที่เราทำได้:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

คุณสามารถรับและทำให้ Clear the fall ได้ นั่นคือเราลืมว่ามีบางอย่างล้มเหลวที่นั่น และงานอินสแตนซ์เดียวกันจะไปที่ตัวกำหนดตารางเวลา

Apache Airflow: ทำให้ ETL ง่ายขึ้น

เป็นที่ชัดเจนว่าการใช้เมาส์ที่มีสี่เหลี่ยมสีแดงทั้งหมดนั้นไม่มีมนุษยธรรมมากนัก - นี่ไม่ใช่สิ่งที่เราคาดหวังจาก Airflow โดยธรรมชาติแล้ว เรามีอาวุธทำลายล้างสูง: Browse/Task Instances

Apache Airflow: ทำให้ ETL ง่ายขึ้น

เลือกทุกอย่างพร้อมกันและรีเซ็ตเป็นศูนย์ คลิกรายการที่ถูกต้อง:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

หลังจากทำความสะอาดแล้ว แท็กซี่ของเราจะมีลักษณะดังนี้ (พวกเขากำลังรอผู้จัดกำหนดการเพื่อกำหนดเวลา):

Apache Airflow: ทำให้ ETL ง่ายขึ้น

การเชื่อมต่อ ขอเกี่ยว และตัวแปรอื่นๆ

ได้เวลาดู DAG ถัดไปแล้ว update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

ทุกคนเคยทำการปรับปรุงรายงานหรือไม่? นี่คือเธออีกครั้ง: มีรายการแหล่งข้อมูลจากที่ที่จะได้รับข้อมูล มีรายการที่จะใส่; อย่าลืมบีบแตรเมื่อทุกอย่างเกิดขึ้นหรือพัง (ไม่เกี่ยวกับเรา ไม่)

ลองดูไฟล์อีกครั้งและดูสิ่งที่คลุมเครือใหม่:

  • from commons.operators import TelegramBotSendMessage - ไม่มีอะไรขัดขวางไม่ให้เราสร้างโอเปอเรเตอร์ของเราเอง ซึ่งเราใช้ประโยชน์จากการสร้าง wrapper ขนาดเล็กเพื่อส่งข้อความไปยัง Unblocked (เราจะพูดถึงโอเปอเรเตอร์นี้เพิ่มเติมด้านล่าง);
  • default_args={} - dag สามารถแจกจ่ายอาร์กิวเมนต์เดียวกันกับตัวดำเนินการทั้งหมด
  • to='{{ var.value.all_the_kings_men }}' - สนาม to เราจะไม่มีฮาร์ดโค้ด แต่สร้างแบบไดนามิกโดยใช้ Jinja และตัวแปรที่มีรายชื่ออีเมล ซึ่งฉันใส่อย่างระมัดระวัง Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — เงื่อนไขในการเริ่มต้นตัวดำเนินการ ในกรณีของเรา จดหมายจะส่งถึงผู้บังคับบัญชาก็ต่อเมื่อการอ้างอิงทั้งหมดได้ผล ประสบความสำเร็จ;
  • tg_bot_conn_id='tg_main' - ข้อโต้แย้ง conn_id ยอมรับ ID การเชื่อมต่อที่เราสร้างขึ้น Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ข้อความในโทรเลขจะบินออกไปเฉพาะเมื่อมีงานที่ตกลงไป
  • task_concurrency=1 - เราห้ามไม่ให้เปิดใช้งานอินสแตนซ์งานหลายรายการพร้อมกันของงานเดียว มิฉะนั้นเราจะได้รับการเปิดตัวหลายรายการพร้อมกัน VerticaOperator (ดูที่โต๊ะเดียว);
  • report_update >> [email, tg] - ทั้งหมด VerticaOperator มาบรรจบกันในการส่งจดหมายและข้อความดังนี้
    Apache Airflow: ทำให้ ETL ง่ายขึ้น

    แต่เนื่องจากตัวดำเนินการแจ้งเตือนมีเงื่อนไขการเปิดใช้งานที่แตกต่างกัน จึงมีเพียงเงื่อนไขเดียวเท่านั้นที่จะใช้งานได้ ใน Tree View ทุกสิ่งจะดูน้อยลงเล็กน้อย:
    Apache Airflow: ทำให้ ETL ง่ายขึ้น

ฉันจะพูดสองสามคำเกี่ยวกับ มาโคร และเพื่อนของพวกเขา - ตัวแปร.

มาโครคือตัวยึดตำแหน่ง Jinja ที่สามารถแทนที่ข้อมูลที่เป็นประโยชน์ต่างๆ ลงในอาร์กิวเมนต์ตัวดำเนินการ ตัวอย่างเช่น:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} จะขยายไปยังเนื้อหาของตัวแปรบริบท execution_date ในรูปแบบ YYYY-MM-DD: 2020-07-14. ส่วนที่ดีที่สุดคือตัวแปรบริบทถูกตอกเข้ากับอินสแตนซ์ของงานเฉพาะ (สี่เหลี่ยมจัตุรัสในมุมมองแบบต้นไม้) และเมื่อรีสตาร์ท ตัวยึดตำแหน่งจะขยายเป็นค่าเดียวกัน

สามารถดูค่าที่กำหนดได้โดยใช้ปุ่ม Rendered ในแต่ละอินสแตนซ์ของงาน นี่คือวิธีการส่งจดหมาย:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

ดังนั้นในงานด้วยการส่งข้อความ:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

รายชื่อมาโครที่มีอยู่แล้วภายในทั้งหมดสำหรับเวอร์ชันล่าสุดมีอยู่ที่นี่: การอ้างอิงมาโคร

นอกจากนี้ ด้วยความช่วยเหลือของปลั๊กอิน เราสามารถประกาศมาโครของเราเองได้ แต่นั่นก็เป็นอีกเรื่องหนึ่ง

นอกเหนือจากสิ่งที่กำหนดไว้ล่วงหน้าแล้ว เราสามารถแทนค่าของตัวแปรของเราได้ (ฉันใช้สิ่งนี้ไปแล้วในโค้ดด้านบน) มาสร้างกันเถอะ Admin/Variables สองสิ่ง:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

ทุกสิ่งที่คุณสามารถใช้ได้:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

ค่านี้สามารถเป็นสเกลาร์ หรืออาจเป็น JSON ก็ได้ ในกรณีของ JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

เพียงใช้เส้นทางไปยังคีย์ที่ต้องการ: {{ var.json.bot_config.bot.token }}.

ฉันจะพูดคำเดียวและแสดงภาพหน้าจอหนึ่งภาพเกี่ยวกับ สัมพันธ์. ทุกอย่างเป็นพื้นฐานที่นี่: บนหน้า Admin/Connections เราสร้างการเชื่อมต่อเพิ่มข้อมูลเข้าสู่ระบบ / รหัสผ่านและพารามิเตอร์เฉพาะเพิ่มเติมที่นั่น แบบนี้:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

รหัสผ่านสามารถเข้ารหัสได้ (ละเอียดกว่าค่าเริ่มต้น) หรือคุณสามารถละเว้นประเภทการเชื่อมต่อ (อย่างที่ฉันทำ tg_main) - ความจริงก็คือรายการประเภทนั้นเดินสายในโมเดล Airflow และไม่สามารถขยายได้หากไม่มีซอร์สโค้ด (ถ้าจู่ๆ ฉันไม่ได้ google อะไรสักอย่าง โปรดแก้ไขฉันด้วย) แต่ไม่มีอะไรจะหยุดเราไม่ให้ได้รับเครดิตเพียงแค่ ชื่อ.

คุณยังสามารถทำการเชื่อมต่อหลายรายการด้วยชื่อเดียวกัน: ในกรณีนี้คือเมธอด BaseHook.get_connection()ซึ่งทำให้เรามีการเชื่อมต่อตามชื่อ จะให้ สุ่ม จากคนชื่อซ้ำหลายคน (การสร้าง Round Robin จะมีเหตุผลมากกว่า แต่ปล่อยให้เป็นมโนธรรมของนักพัฒนา Airflow)

ตัวแปรและการเชื่อมต่อเป็นเครื่องมือที่ยอดเยี่ยม แต่สิ่งสำคัญคือต้องไม่เสียสมดุล: ส่วนใดของโฟลว์ที่คุณเก็บไว้ในโค้ด และส่วนใดที่คุณมอบให้กับ Airflow เพื่อจัดเก็บ ในแง่หนึ่ง การเปลี่ยนค่าอย่างรวดเร็ว เช่น กล่องจดหมาย ผ่านทาง UI สามารถทำได้สะดวก ในทางกลับกัน นี่ยังคงเป็นการกลับไปสู่การคลิกเมาส์ ซึ่งเรา (ฉัน) ต้องการจะกำจัด

การทำงานกับการเชื่อมต่อเป็นหนึ่งในงาน ตะขอ. โดยทั่วไปแล้ว Airflow hooks เป็นจุดสำหรับเชื่อมต่อกับบริการและไลบรารีของบุคคลที่สาม เช่น, JiraHook จะเปิดไคลเอนต์ให้เราโต้ตอบกับจิระ (คุณสามารถย้ายงานไปมาได้) และด้วยความช่วยเหลือของ SambaHook คุณสามารถพุชไฟล์ในเครื่องไปที่ smb-จุด.

แยกตัวดำเนินการที่กำหนดเอง

และเราเข้าใกล้วิธีการทำ TelegramBotSendMessage

รหัส commons/operators.py กับตัวดำเนินการจริง:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

เช่นเดียวกับสิ่งอื่นๆ ใน Airflow ทุกอย่างเรียบง่ายมาก:

  • สืบทอดมาจาก BaseOperatorซึ่งใช้สิ่งเฉพาะของ Airflow ค่อนข้างน้อย (ดูที่เวลาว่างของคุณ)
  • ฟิลด์ที่ประกาศ template_fieldsซึ่ง Jinja จะมองหามาโครเพื่อประมวลผล
  • จัดอาร์กิวเมนต์ที่เหมาะสมสำหรับ __init__()ให้ตั้งค่าเริ่มต้นตามความจำเป็น
  • เราไม่ลืมเกี่ยวกับการเริ่มต้นของบรรพบุรุษเช่นกัน
  • เปิดเบ็ดที่เกี่ยวข้อง TelegramBotHookได้รับวัตถุไคลเอนต์จากมัน
  • วิธีการแทนที่ (นิยามใหม่) BaseOperator.execute()ซึ่ง Airfow จะกระตุกเมื่อถึงเวลาเปิดตัวโอเปอเรเตอร์ - ในนั้นเราจะดำเนินการหลักโดยลืมเข้าสู่ระบบ (เราเข้าสู่ระบบโดยวิธีการเข้า stdout и stderr - กระแสลมจะดักจับทุกอย่าง ห่อให้สวยงาม ย่อยสลายตามความจำเป็น)

มาดูกันว่ามีอะไรบ้าง commons/hooks.py. ส่วนแรกของไฟล์พร้อมตัวเบ็ด:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

ฉันไม่รู้จะอธิบายอะไรที่นี่ ฉันจะสังเกตประเด็นสำคัญ:

  • เรารับมรดก คิดเกี่ยวกับข้อโต้แย้ง - ในกรณีส่วนใหญ่จะเป็นหนึ่งเดียว: conn_id;
  • เอาชนะวิธีการมาตรฐาน: ฉันจำกัดตัวเอง get_conn()ซึ่งฉันได้รับพารามิเตอร์การเชื่อมต่อตามชื่อและเพิ่งได้รับส่วน extra (นี่คือฟิลด์ JSON) ซึ่งฉัน (ตามคำแนะนำของฉันเอง!) ใส่โทเค็นบอท Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • ฉันสร้างตัวอย่างของเรา TelegramBotโดยให้โทเค็นเฉพาะ

นั่นคือทั้งหมด คุณสามารถรับลูกค้าจากตะขอโดยใช้ TelegramBotHook().clent หรือ TelegramBotHook().get_conn().

และส่วนที่สองของไฟล์ที่ฉันสร้าง microwrapper สำหรับ Telegram REST API เพื่อไม่ให้ลากสิ่งเดียวกัน python-telegram-bot สำหรับวิธีหนึ่ง sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

วิธีที่ถูกต้องคือเพิ่มทั้งหมด: TelegramBotSendMessage, TelegramBotHook, TelegramBot - ในปลั๊กอิน ให้ใส่ที่เก็บข้อมูลสาธารณะและมอบให้กับ Open Source

ในขณะที่เรากำลังศึกษาทั้งหมดนี้ การอัปเดตรายงานของเราล้มเหลวและส่งข้อความแสดงข้อผิดพลาดในช่อง เดี๋ยวจะลองเช็คดูครับว่าผิดไหม...

Apache Airflow: ทำให้ ETL ง่ายขึ้น
มีบางอย่างพังใน Doge ของเรา! นั่นไม่ใช่สิ่งที่เราคาดหวังใช่ไหม อย่างแน่นอน!

คุณจะเท?

คุณรู้สึกว่าฉันพลาดอะไรไปหรือเปล่า? ดูเหมือนว่าเขาสัญญาว่าจะถ่ายโอนข้อมูลจาก SQL Server ไปยัง Vertica จากนั้นเขาก็รับมันและย้ายออกจากหัวข้อ คนขี้โกง!

ความโหดร้ายนี้เกิดขึ้นโดยเจตนา ฉันแค่ต้องถอดรหัสคำศัพท์บางอย่างให้คุณ ตอนนี้คุณสามารถไปต่อได้

แผนของเราคือ:

  1. ทำดาก
  2. สร้างงาน
  3. ดูว่าทุกอย่างสวยงามแค่ไหน
  4. กำหนดหมายเลขเซสชันเพื่อเติม
  5. รับข้อมูลจาก SQL Server
  6. ใส่ข้อมูลลงใน Vertica
  7. รวบรวมสถิติ

ดังนั้นเพื่อให้ทุกอย่างดำเนินไป ฉันได้ทำการเพิ่มเติมเล็กน้อยให้กับเรา docker-compose.yml:

นักเทียบท่า-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

เราเพิ่ม:

  • Vertica เป็นโฮสต์ dwh ด้วยการตั้งค่าเริ่มต้นส่วนใหญ่
  • สามอินสแตนซ์ของ SQL Server
  • เราเติมฐานข้อมูลในฐานข้อมูลหลังด้วยข้อมูลบางส่วน (ไม่ว่าในกรณีใดอย่าดู mssql_init.py!)

เราเปิดตัวสิ่งที่ดีทั้งหมดด้วยความช่วยเหลือของคำสั่งที่ซับซ้อนกว่าครั้งที่แล้วเล็กน้อย:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

สิ่งที่ตัวสุ่มมหัศจรรย์ของเราสร้างขึ้น คุณสามารถใช้ไอเท็มได้ Data Profiling/Ad Hoc Query:

Apache Airflow: ทำให้ ETL ง่ายขึ้น
สิ่งสำคัญคือไม่ต้องแสดงให้นักวิเคราะห์เห็น

อธิบายรายละเอียด เซสชัน ETL ฉันจะไม่ ทุกอย่างเป็นเรื่องเล็กน้อยที่นั่น: เราสร้างฐาน มีเครื่องหมายอยู่ในนั้น เรารวมทุกอย่างด้วยเครื่องมือจัดการบริบท และตอนนี้เราทำสิ่งนี้:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

ถึงเวลาแล้ว รวบรวมข้อมูลของเรา จากหนึ่งร้อยครึ่งโต๊ะของเรา มาทำสิ่งนี้ด้วยความช่วยเหลือของบรรทัดที่ไม่โอ้อวด:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. ด้วยความช่วยเหลือของตะขอที่เราได้รับจาก Airflow pymssql-เชื่อมต่อ
  2. เรามาแทนที่ข้อ จำกัด ในรูปแบบของวันที่ในคำขอ - เอ็นจิ้นเทมเพลตจะถูกส่งเข้าสู่ฟังก์ชัน
  3. ฟีดคำขอของเรา pandasใครจะรับเรา DataFrame - มันจะเป็นประโยชน์กับเราในอนาคต

ฉันใช้การแทนที่ {dt} แทนพารามิเตอร์คำขอ %s ไม่ใช่เพราะฉันเป็นพินอคคิโอที่ชั่วร้าย แต่เป็นเพราะ pandas ไม่สามารถจัดการ pymssql และหลุดเป็นคนสุดท้าย params: Listแม้ว่าเขาจะต้องการจริงๆ tuple.
โปรดทราบว่าผู้พัฒนา pymssql ตัดสินใจที่จะไม่สนับสนุนเขาอีกต่อไป และถึงเวลาแล้วที่จะต้องย้ายออกไป pyodbc.

มาดูกันว่า Airflow ยัดข้อโต้แย้งของฟังก์ชันของเราด้วยอะไร:

Apache Airflow: ทำให้ ETL ง่ายขึ้น

หากไม่มีข้อมูลก็ไม่มีประโยชน์ในการดำเนินการต่อ แต่มันก็แปลกที่จะพิจารณาว่าการเติมสำเร็จ แต่นี่ไม่ใช่ความผิดพลาด อะ-อะ-อะ ทำไงดี?! และนี่คือสิ่งที่:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException บอก Airflow ว่าไม่มีข้อผิดพลาด แต่เราข้ามงาน อินเทอร์เฟซจะไม่มีสี่เหลี่ยมสีเขียวหรือสีแดง แต่เป็นสีชมพู

มาโยนข้อมูลของเรากันเถอะ หลายคอลัมน์:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

กล่าวคือ

  • ฐานข้อมูลที่เรารับคำสั่ง
  • ID ของช่วงน้ำท่วมของเรา (จะต่างกัน สำหรับทุกงาน),
  • แฮชจากแหล่งที่มาและรหัสคำสั่งซื้อ - เพื่อให้ในฐานข้อมูลสุดท้าย (ซึ่งทุกอย่างรวมอยู่ในตารางเดียว) เรามีรหัสคำสั่งซื้อที่ไม่ซ้ำกัน

ขั้นตอนสุดท้ายยังคงอยู่: เททุกอย่างลงใน Vertica และที่แปลกก็คือหนึ่งในวิธีที่ยอดเยี่ยมและมีประสิทธิภาพที่สุดในการทำเช่นนี้คือการใช้ CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. เรากำลังสร้างเครื่องรับพิเศษ StringIO.
  2. pandas กรุณาใส่ของเรา DataFrame ในขณะที่ CSV- เส้น
  3. มาเปิดการเชื่อมต่อกับ Vertica ที่เราชื่นชอบกันเถอะ
  4. และตอนนี้ด้วยความช่วยเหลือ copy() ส่งข้อมูลของเราโดยตรงไปยัง Vertika!

เราจะรับจากคนขับว่ามีกี่บรรทัดที่เต็มและบอกผู้จัดการเซสชั่นว่าทุกอย่างเรียบร้อยดี:

session.loaded_rows = cursor.rowcount
session.successful = True

นั่นคือทั้งหมดที่

ในการขาย เราสร้างแผ่นเป้าหมายด้วยตนเอง ที่นี่ฉันอนุญาตให้ใช้เครื่องจักรขนาดเล็ก:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ฉันใช้ VerticaOperator() ฉันสร้างสคีมาฐานข้อมูลและตาราง (ถ้ายังไม่มีอยู่แล้ว) สิ่งสำคัญคือการจัดเรียงการอ้างอิงอย่างถูกต้อง:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

ข้อสรุปถึง

- อืม - หนูตัวน้อยพูด - ไม่ใช่ตอนนี้
คุณเชื่อหรือไม่ว่าฉันเป็นสัตว์ที่น่ากลัวที่สุดในป่า?

จูเลีย โดนัลด์สัน, The Gruffalo

ฉันคิดว่าถ้าฉันกับเพื่อนร่วมงานแข่งขันกัน ใครจะสร้างและเริ่มกระบวนการ ETL ได้อย่างรวดเร็วตั้งแต่เริ่มต้น พวกเขาใช้ SSIS และเมาส์ และฉันใช้ Airflow ... จากนั้นเราจะเปรียบเทียบความง่ายในการบำรุงรักษา ... ว้าว ฉันคิดว่าคุณจะยอมรับว่าฉันจะเอาชนะพวกเขาในทุกด้าน!

ถ้าจริงจังมากกว่านี้ Apache Airflow - โดยการอธิบายกระบวนการในรูปแบบของรหัสโปรแกรม - ทำงานของฉัน มาก สะดวกสบายและสนุกสนานยิ่งขึ้น

ความสามารถในการขยายที่ไม่จำกัด ทั้งในแง่ของปลั๊กอินและความสามารถในการปรับขยายทำให้คุณมีโอกาสใช้ Airflow ในเกือบทุกพื้นที่: แม้แต่ในการรวบรวม เตรียม และประมวลผลข้อมูลแบบครบวงจร แม้กระทั่งในการปล่อยจรวด (ไปยังดาวอังคาร คอร์ส).

ส่วนสุดท้าย การอ้างอิงและข้อมูล

คราดที่เรารวบรวมมาให้คุณ

  • start_date. ใช่ นี่เป็นมีมท้องถิ่นอยู่แล้ว ผ่านทางอาร์กิวเมนต์หลักของ Doug start_date ผ่านทั้งหมด. สั้น ๆ หากคุณระบุใน start_date วันที่ปัจจุบัน และ schedule_interval - วันหนึ่ง DAG จะเริ่มในวันพรุ่งนี้ไม่เร็วกว่านี้
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    และไม่มีปัญหาอีกต่อไป

    มีข้อผิดพลาดรันไทม์อื่นที่เกี่ยวข้อง: Task is missing the start_date parameterซึ่งส่วนใหญ่มักระบุว่าคุณลืมผูกกับตัวดำเนินการ dag

  • ทั้งหมดในเครื่องเดียว ใช่ และฐาน (Airflow เองและการเคลือบผิวของเรา) และเว็บเซิร์ฟเวอร์ และตัวกำหนดตารางเวลา และผู้ปฏิบัติงาน และมันก็ได้ผล แต่เมื่อเวลาผ่านไป จำนวนงานสำหรับบริการก็เพิ่มมากขึ้น และเมื่อ PostgreSQL เริ่มตอบสนองต่อดัชนีใน 20 วินาที แทนที่จะเป็น 5 มิลลิวินาที เราก็นำมันออกไป
  • ผู้ดำเนินการในท้องถิ่น ใช่ เรายังคงนั่งอยู่บนนั้น และเราได้มาถึงขอบเหวลึกแล้ว จนถึงตอนนี้ LocalExecutor เพียงพอสำหรับเราแล้ว แต่ตอนนี้ถึงเวลาแล้วที่จะขยายงานโดยมีผู้ปฏิบัติงานอย่างน้อยหนึ่งคน และเราจะต้องทำงานอย่างหนักเพื่อเปลี่ยนไปใช้ CeleryExecutor และด้วยความจริงที่ว่าคุณสามารถทำงานกับมันในเครื่องเดียวได้ ไม่มีอะไรหยุดคุณจากการใช้ Celery แม้แต่บนเซิร์ฟเวอร์ ซึ่ง "แน่นอนว่าจะไม่เข้าสู่การผลิตโดยสุจริต!"
  • ไม่ใช้งาน เครื่องมือในตัว:
    • การเชื่อมต่อ เพื่อจัดเก็บข้อมูลประจำตัวบริการ
    • SLA พลาด เพื่อตอบสนองต่องานที่ทำไม่ทันเวลา
    • เอ็กซ์คอม สำหรับการแลกเปลี่ยนข้อมูลเมตา (ฉันพูดว่า เมตาdata!) ระหว่างงาน dag
  • การละเมิดจดหมาย ฉันจะพูดอะไรดี การแจ้งเตือนถูกตั้งค่าไว้สำหรับงานที่ล้มเหลวซ้ำๆ ทั้งหมด ตอนนี้ Gmail ที่ทำงานของฉันมีอีเมลมากกว่า 90k ฉบับจาก Airflow และเว็บเมลปากกระบอกปืนปฏิเสธที่จะรับและลบมากกว่า 100 ฉบับในแต่ละครั้ง

ข้อผิดพลาดเพิ่มเติม: Apache ข้อผิดพลาดการไหลของอากาศ

เครื่องมืออัตโนมัติเพิ่มเติม

เพื่อให้เราทำงานโดยใช้หัวมากขึ้น ไม่ใช่ด้วยมือ Airflow ได้เตรียมสิ่งนี้ไว้ให้เรา:

  • REST API - เขายังคงมีสถานะเป็น Experimental ซึ่งไม่ได้ป้องกันเขาจากการทำงาน ด้วยวิธีนี้ คุณไม่เพียงแต่จะได้รับข้อมูลเกี่ยวกับ dag และงานเท่านั้น แต่ยังหยุด/เริ่ม dag สร้าง DAG Run หรือพูลได้อีกด้วย
  • CLI - มีเครื่องมือมากมายให้ใช้งานผ่าน command line ซึ่งไม่ใช่แค่ไม่สะดวกในการใช้งานผ่าน WebUI แต่โดยทั่วไปจะไม่มี ตัวอย่างเช่น:
    • backfill จำเป็นต้องรีสตาร์ทอินสแตนซ์ของงาน
      ตัวอย่างเช่น นักวิเคราะห์มาและพูดว่า: "และคุณสหาย มีข้อมูลไร้สาระตั้งแต่วันที่ 1 ถึง 13 มกราคม! ซ่อมเลย ซ่อมเลย ซ่อมเลย ซ่อมเลย!" และคุณเป็นคนชอบทำอาหาร:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • บริการพื้นฐาน: initdb, resetdb, upgradedb, checkdb.
    • runซึ่งทำให้คุณสามารถเรียกใช้งานอินสแตนซ์เดียว และแม้แต่ให้คะแนนการขึ้นต่อกันทั้งหมด นอกจากนี้ คุณสามารถเรียกใช้ผ่าน LocalExecutorแม้ว่าคุณจะมีคลัสเตอร์ขึ้นฉ่ายก็ตาม
    • ทำสิ่งเดียวกันได้ค่อนข้างมาก testเฉพาะในฐานเท่านั้นที่ไม่ได้เขียนอะไรเลย
    • connections อนุญาตให้สร้างการเชื่อมต่อจำนวนมากจากเปลือก
  • python api - วิธีการโต้ตอบที่ค่อนข้างไม่ยอมใครง่ายๆซึ่งมีไว้สำหรับปลั๊กอินและไม่จับกลุ่มด้วยมือเล็ก ๆ แต่ใครจะห้ามไม่ให้เราไป /home/airflow/dags, วิ่ง ipython และเริ่มยุ่งเหยิง? ตัวอย่างเช่น คุณสามารถส่งออกการเชื่อมต่อทั้งหมดด้วยรหัสต่อไปนี้:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • การเชื่อมต่อกับฐานข้อมูลเมตาของ Airflow ฉันไม่แนะนำให้เขียนถึงมัน แต่การรับสถานะงานสำหรับเมตริกเฉพาะต่างๆ ทำได้เร็วและง่ายกว่าผ่าน API ใดๆ

    สมมติว่าไม่ใช่งานทั้งหมดของเราที่ไม่มีอำนาจ แต่บางครั้งอาจตกหล่นได้ และนี่เป็นเรื่องปกติ แต่การอุดตันบางอย่างน่าสงสัยอยู่แล้ว และจำเป็นต้องตรวจสอบ

    ระวัง SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

การอ้างอิง

และแน่นอน สิบลิงค์แรกจากการออกของ Google คือเนื้อหาของโฟลเดอร์ Airflow จากบุ๊กมาร์กของฉัน

และลิงค์ที่ใช้ในบทความ:

ที่มา: will.com

ซื้อโฮสติ้งที่เชื่อถือได้สำหรับไซต์ที่มีการป้องกัน DDoS เซิร์ฟเวอร์ VPS VDS 🔥 ซื้อบริการเว็บโฮสติ้งที่เชื่อถือได้ พร้อมระบบป้องกัน DDoS และเซิร์ฟเวอร์ VPS/VDS | ProHoster