Apache Airflow: Κάνοντας το ETL ευκολότερο

Γεια, είμαι ο Dmitry Logvinenko - Μηχανικός Δεδομένων του Τμήματος Analytics του ομίλου εταιρειών Vezet.

Θα σας πω για ένα υπέροχο εργαλείο για την ανάπτυξη διαδικασιών ETL - το Apache Airflow. Αλλά το Airflow είναι τόσο ευέλικτο και πολύπλευρο που θα πρέπει να το κοιτάξετε πιο προσεκτικά ακόμα κι αν δεν συμμετέχετε σε ροές δεδομένων, αλλά έχετε την ανάγκη να εκκινείτε περιοδικά οποιεσδήποτε διαδικασίες και να παρακολουθείτε την εκτέλεσή τους.

Και ναι, όχι μόνο θα πω, αλλά και θα δείξω: το πρόγραμμα έχει πολύ κώδικα, στιγμιότυπα οθόνης και συστάσεις.

Apache Airflow: Κάνοντας το ETL ευκολότερο
Τι βλέπετε συνήθως όταν ψάχνετε στο google τη λέξη Airflow / Wikimedia Commons

πίνακας περιεχομένων

Εισαγωγή

Το Apache Airflow είναι ακριβώς όπως το Django:

  • γραμμένο σε python
  • υπάρχει ένας υπέροχος πίνακας διαχείρισης,
  • επεκτείνεται επ' αόριστον

- μόνο καλύτερα, και έγινε για εντελώς διαφορετικούς σκοπούς, δηλαδή (όπως είναι γραμμένο πριν από το kat):

  • εκτέλεση και παρακολούθηση εργασιών σε απεριόριστο αριθμό μηχανών (όσες Celery / Kubernetes και η συνείδησή σας θα σας επιτρέψουν)
  • με δημιουργία δυναμικής ροής εργασιών από πολύ εύκολο στην εγγραφή και κατανόηση κώδικα Python
  • και τη δυνατότητα σύνδεσης οποιωνδήποτε βάσεων δεδομένων και API μεταξύ τους χρησιμοποιώντας έτοιμα στοιχεία και οικιακά πρόσθετα (που είναι εξαιρετικά απλό).

Χρησιμοποιούμε το Apache Airflow ως εξής:

  • συλλέγουμε δεδομένα από διάφορες πηγές (πολλές παρουσίες SQL Server και PostgreSQL, διάφορα API με μετρήσεις εφαρμογών, ακόμη και 1C) σε DWH και ODS (έχουμε Vertica και Clickhouse).
  • πόσο προχωρημένο cron, το οποίο ξεκινά τις διαδικασίες ενοποίησης δεδομένων στο ODS και παρακολουθεί επίσης τη συντήρησή τους.

Μέχρι πρόσφατα, οι ανάγκες μας καλύπτονταν από έναν μικρό διακομιστή με 32 πυρήνες και 50 GB μνήμης RAM. Στο Airflow, αυτό λειτουργεί:

  • περισσότερο 200 dags (στην πραγματικότητα ροές εργασίας, στις οποίες γεμίσαμε εργασίες),
  • σε καθεμία κατά μέσο όρο 70 εργασίες,
  • αυτή η καλοσύνη ξεκινά (επίσης κατά μέσο όρο) μια φορά την ώρα.

Και για το πώς επεκταθήκαμε, θα γράψω παρακάτω, αλλά τώρα ας ορίσουμε το über-πρόβλημα που θα λύσουμε:

Υπάρχουν τρεις αρχικοί SQL Servers, ο καθένας με 50 βάσεις δεδομένων - περιπτώσεις ενός έργου, αντίστοιχα, έχουν την ίδια δομή (σχεδόν παντού, mua-ha-ha), που σημαίνει ότι ο καθένας έχει έναν πίνακα παραγγελιών (ευτυχώς, ένας πίνακας με αυτό το όνομα μπορεί να προωθηθεί σε οποιαδήποτε επιχείρηση). Παίρνουμε τα δεδομένα προσθέτοντας πεδία υπηρεσιών (διακομιστής πηγής, βάση δεδομένων πηγής, αναγνωριστικό εργασιών ETL) και τα ρίχνουμε αφελώς, ας πούμε, στο Vertica.

Πάμε!

Το κύριο μέρος, πρακτικό (και λίγο θεωρητικό)

Γιατί εμείς (και εσείς)

Όταν τα δέντρα ήταν μεγάλα και εγώ ήμουν απλός SQLΣε ένα ρωσικό κατάστημα λιανικής, εξαπατήσαμε διαδικασίες ETL που ονομάζονται ροές δεδομένων χρησιμοποιώντας δύο εργαλεία που έχουμε στη διάθεσή μας:

  • Κέντρο Ενέργειας Informatica - ένα εξαιρετικά διαδεδομένο σύστημα, εξαιρετικά παραγωγικό, με το δικό του υλικό, τη δική του έκδοση. Χρησιμοποίησα θεός να το κάνει το 1% των δυνατοτήτων του. Γιατί; Λοιπόν, πρώτα απ 'όλα, αυτή η διεπαφή, κάπου από τη δεκαετία του 380, μας άσκησε ψυχική πίεση. Δεύτερον, αυτό το μηχάνημα έχει σχεδιαστεί για εξαιρετικά φανταχτερές διεργασίες, μανιώδη επαναχρησιμοποίηση εξαρτημάτων και άλλα πολύ σημαντικά επιχειρηματικά κόλπα. Σχετικά με το κόστος, όπως το φτερό του Airbus AXNUMX / έτος, δεν θα πούμε τίποτα.

    Προσοχή, ένα στιγμιότυπο οθόνης μπορεί να βλάψει λίγο τα άτομα κάτω των 30 ετών

    Apache Airflow: Κάνοντας το ETL ευκολότερο

  • SQL Server Integration Server - χρησιμοποιήσαμε αυτόν τον σύντροφο στις ροές εντός του έργου μας. Λοιπόν, στην πραγματικότητα: χρησιμοποιούμε ήδη τον SQL Server και θα ήταν κατά κάποιο τρόπο παράλογο να μην χρησιμοποιήσουμε τα εργαλεία ETL του. Όλα σε αυτό είναι καλά: τόσο η διεπαφή είναι όμορφη, όσο και οι αναφορές προόδου... Αλλά δεν είναι αυτός ο λόγος που αγαπάμε τα προϊόντα λογισμικού, ω, όχι για αυτό. Έκδοση του dtsx (που είναι XML με κόμβους ανακατεμένους κατά την αποθήκευση) μπορούμε, αλλά ποιο είναι το νόημα; Τι θα λέγατε να φτιάξετε ένα πακέτο εργασιών που θα σύρει εκατοντάδες πίνακες από τον έναν διακομιστή στον άλλο; Ναι, τι εκατό, ο δείκτης σας θα πέσει από είκοσι κομμάτια, κάνοντας κλικ στο κουμπί του ποντικιού. Σίγουρα όμως δείχνει πιο μοδάτος:

    Apache Airflow: Κάνοντας το ETL ευκολότερο

Σίγουρα ψάξαμε για διέξοδο. Υπόθεση ακόμη σχεδόν ήρθε σε μια αυτογραμμένη γεννήτρια πακέτων SSIS ...

…και μετά με βρήκε μια νέα δουλειά. Και το Apache Airflow με προσπέρασε πάνω του.

Όταν ανακάλυψα ότι οι περιγραφές διεργασιών ETL είναι απλοί κώδικας Python, απλά δεν χόρεψα από τη χαρά μου. Αυτός είναι ο τρόπος με τον οποίο οι ροές δεδομένων εκδόθηκαν και διαφοροποιήθηκαν, και η απόχυση πινάκων με μια ενιαία δομή από εκατοντάδες βάσεις δεδομένων σε έναν στόχο έγινε θέμα κώδικα Python σε μιάμιση ή δύο οθόνες 13 ιντσών.

Συναρμολόγηση του συμπλέγματος

Ας μην κανονίσουμε ένα εντελώς νηπιαγωγείο και ας μην μιλήσουμε για εντελώς προφανή πράγματα εδώ, όπως η εγκατάσταση του Airflow, της επιλεγμένης βάσης δεδομένων σας, του Celery και άλλων περιπτώσεων που περιγράφονται στις αποβάθρες.

Για να μπορέσουμε να ξεκινήσουμε αμέσως τα πειράματα, σκιαγράφησα docker-compose.yml στο οποίο:

  • Ας αυξήσουμε ουσιαστικά Ροής αέρα: Scheduler, Webserver. Το Flower θα περιστρέφεται επίσης εκεί για να παρακολουθεί τις εργασίες του Celery (επειδή έχει ήδη ωθηθεί apache/airflow:1.10.10-python3.7, αλλά δεν μας πειράζει)
  • PostgreSQL, στο οποίο το Airflow θα γράψει τις πληροφορίες σέρβις του (δεδομένα προγραμματιστή, στατιστικά στοιχεία εκτέλεσης κ.λπ.) και το Celery θα επισημάνει τις ολοκληρωμένες εργασίες.
  • Ρέντη, το οποίο θα λειτουργεί ως μεσίτης εργασιών για το Celery.
  • Σέλινο εργάτη, η οποία θα ασχολείται με την άμεση εκτέλεση εργασιών.
  • Σε φάκελο ./dags θα προσθέσουμε τα αρχεία μας με την περιγραφή των dags. Θα μαζευτούν εν κινήσει, επομένως δεν χρειάζεται να κάνετε ταχυδακτυλουργικά όλη τη στοίβα μετά από κάθε φτάρνισμα.

Σε ορισμένα σημεία, ο κώδικας στα παραδείγματα δεν εμφανίζεται πλήρως (για να μην μπερδεύεται το κείμενο), αλλά κάπου τροποποιείται στη διαδικασία. Μπορείτε να βρείτε πλήρη παραδείγματα κώδικα εργασίας στο αποθετήριο https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Σημειώσεις:

  • Στη συναρμολόγηση της σύνθεσης βασίστηκα σε μεγάλο βαθμό στη γνωστή εικόνα puckel/docker-flow air - να το τσεκαρεις σιγουρα. Ίσως δεν χρειάζεστε τίποτα άλλο στη ζωή σας.
  • Όλες οι ρυθμίσεις ροής αέρα είναι διαθέσιμες όχι μόνο μέσω airflow.cfg, αλλά και μέσω μεταβλητών περιβάλλοντος (ευχαριστώ τους προγραμματιστές), τις οποίες εκμεταλλεύτηκα κακόβουλα.
  • Φυσικά, δεν είναι έτοιμο για παραγωγή: σκόπιμα δεν έβαλα καρδιακούς παλμούς σε δοχεία, δεν ασχολήθηκα με την ασφάλεια. Αλλά έκανα το ελάχιστο κατάλληλο για τους πειραματιστές μας.
  • Σημειώστε ότι:
    • Ο φάκελος dag πρέπει να είναι προσβάσιμος τόσο στον προγραμματιστή όσο και στους εργαζόμενους.
    • Το ίδιο ισχύει για όλες τις βιβλιοθήκες τρίτων - πρέπει όλες να είναι εγκατεστημένες σε μηχανήματα με προγραμματιστή και εργάτες.

Λοιπόν, τώρα είναι απλό:

$ docker-compose up --scale worker=3

Αφού όλα ανέβουν, μπορείτε να δείτε τις διεπαφές ιστού:

Βασικές έννοιες

Εάν δεν καταλάβατε τίποτα σε όλα αυτά τα «ντάγια», τότε εδώ είναι ένα σύντομο λεξικό:

  • Scheduler - ο πιο σημαντικός θείος στο Airflow, ο οποίος ελέγχει ότι τα ρομπότ δουλεύουν σκληρά και όχι ένα άτομο: παρακολουθεί το πρόγραμμα, ενημερώνει τα στοιχεία, εκκινεί εργασίες.

    Γενικά, σε παλιότερες εκδόσεις, είχε προβλήματα με τη μνήμη (όχι, όχι αμνησία, αλλά διαρροές) και η παράμετρος legacy παρέμενε ακόμη και στις ρυθμίσεις run_duration — το διάστημα επανεκκίνησης του. Τώρα όμως όλα είναι καλά.

  • DAG (γνωστός και ως "dag") - "κατευθυνόμενο άκυκλο γράφημα", αλλά ένας τέτοιος ορισμός θα πει σε λίγους ανθρώπους, αλλά στην πραγματικότητα είναι ένα δοχείο για εργασίες που αλληλεπιδρούν μεταξύ τους (βλ. παρακάτω) ή ένα ανάλογο του πακέτου στο SSIS και της ροής εργασίας στο Informatica .

    Εκτός από τα dags, μπορεί να υπάρχουν ακόμα υποδαγκώματα, αλλά πιθανότατα δεν θα φτάσουμε σε αυτά.

  • DAG Run - αρχικοποιημένο dag, στο οποίο εκχωρείται το δικό του execution_date. Τα Dagran του ίδιου dag μπορούν να λειτουργήσουν παράλληλα (αν κάνατε τις εργασίες σας ανίκανες, φυσικά).
  • Χειριστής είναι κομμάτια κώδικα που είναι υπεύθυνα για την εκτέλεση μιας συγκεκριμένης ενέργειας. Υπάρχουν τρεις τύποι χειριστών:
    • δράσησαν το αγαπημένο μας PythonOperator, το οποίο μπορεί να εκτελέσει οποιονδήποτε (έγκυρο) κώδικα Python.
    • μεταφορά, που μεταφέρει δεδομένα από μέρος σε μέρος, ας πούμε, MsSqlToHiveTransfer;
    • αισθητήρα Από την άλλη πλευρά, θα σας επιτρέψει να αντιδράσετε ή να επιβραδύνετε την περαιτέρω εκτέλεση του dag μέχρι να συμβεί ένα συμβάν. HttpSensor μπορεί να τραβήξει το καθορισμένο τελικό σημείο και όταν περιμένει η επιθυμητή απόκριση, ξεκινήστε τη μεταφορά GoogleCloudStorageToS3Operator. Ένας περίεργος νους θα ρωτήσει: «γιατί; Άλλωστε, μπορείτε να κάνετε επαναλήψεις απευθείας στον χειριστή!». Και μετά, για να μην φράξουμε τη δεξαμενή εργασιών με ανασταλμένους χειριστές. Ο αισθητήρας ξεκινά, ελέγχει και πεθαίνει πριν από την επόμενη προσπάθεια.
  • Έργο - οι δηλωμένοι χειριστές, ανεξαρτήτως τύπου, και προσαρτημένοι στο dag προάγονται στον βαθμό της εργασίας.
  • παράδειγμα εργασίας - όταν ο γενικός σχεδιαστής αποφάσισε ότι ήταν καιρός να στείλει καθήκοντα στη μάχη σε ερμηνευτές-εργάτες (ακριβώς επί τόπου, αν χρησιμοποιήσουμε LocalExecutor ή σε έναν απομακρυσμένο κόμβο στην περίπτωση του CeleryExecutor), τους εκχωρεί ένα πλαίσιο (δηλαδή, ένα σύνολο μεταβλητών - παραμέτρων εκτέλεσης), επεκτείνει τα πρότυπα εντολών ή ερωτημάτων και τα συγκεντρώνει.

Δημιουργούμε εργασίες

Αρχικά, ας περιγράψουμε το γενικό σχήμα του ζυμαριού μας και στη συνέχεια θα βουτήξουμε στις λεπτομέρειες όλο και περισσότερο, επειδή εφαρμόζουμε μερικές μη τετριμμένες λύσεις.

Έτσι, στην απλούστερη μορφή του, ένα τέτοιο dag θα μοιάζει με αυτό:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Ας το καταλάβουμε:

  • Αρχικά, εισάγουμε τα απαραίτητα lib και κάτι άλλο;
  • sql_server_ds - Είναι List[namedtuple[str, str]] με τα ονόματα των συνδέσεων από το Airflow Connections και τις βάσεις δεδομένων από τις οποίες θα πάρουμε το πιάτο μας.
  • dag - η ανακοίνωση του dag μας, που πρέπει απαραίτητα να είναι μέσα globals(), διαφορετικά το Airflow δεν θα το βρει. Ο Νταγκ πρέπει επίσης να πει:
    • ποιο είναι το όνομα του orders - αυτό το όνομα θα εμφανιστεί στη συνέχεια στη διεπαφή ιστού,
    • ότι θα εργάζεται από τα μεσάνυχτα της XNUMXης Ιουλίου,
    • και θα πρέπει να τρέχει, περίπου κάθε 6 ώρες (για σκληρούς τύπους εδώ αντί για timedelta() αποδεκτός cron-γραμμή 0 0 0/6 ? * * *, για τους λιγότερο κουλ - μια έκφραση όπως @daily);
  • workflow() θα κάνει την κύρια δουλειά, αλλά όχι τώρα. Προς το παρόν, απλώς θα απορρίψουμε το περιεχόμενό μας στο αρχείο καταγραφής.
  • Και τώρα η απλή μαγεία της δημιουργίας εργασιών:
    • τρέχουμε μέσα από τις πηγές μας.
    • αρχικοποίηση PythonOperator, που θα εκτελέσει το ομοίωμά μας workflow(). Μην ξεχάσετε να καθορίσετε ένα μοναδικό (μέσα στο dag) όνομα της εργασίας και να συνδέσετε το ίδιο το dag. Σημαία provide_context με τη σειρά του, θα ρίξει επιπλέον ορίσματα στη συνάρτηση, τα οποία θα συλλέξουμε προσεκτικά χρησιμοποιώντας **context.

Προς το παρόν, αυτό είναι όλο. Τι πήραμε:

  • νέο dag στη διεπαφή ιστού,
  • μιάμιση εκατό εργασίες που θα εκτελούνται παράλληλα (αν το επιτρέπουν οι ρυθμίσεις Airflow, Celery και η χωρητικότητα του διακομιστή).

Λοιπόν, σχεδόν το κατάλαβα.

Apache Airflow: Κάνοντας το ETL ευκολότερο
Ποιος θα εγκαταστήσει τις εξαρτήσεις;

Για να το απλοποιήσω όλο αυτό το πράγμα, έσφιξα docker-compose.yml επεξεργασία requirements.txt σε όλους τους κόμβους.

Τώρα έφυγε:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Τα γκρι τετράγωνα είναι στιγμιότυπα εργασιών που υποβάλλονται σε επεξεργασία από τον προγραμματιστή.

Περιμένουμε λίγο, τα καθήκοντα λύνονται από τους εργαζόμενους:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Τα πράσινα βέβαια έχουν δουλέψει με επιτυχία. Οι κόκκινοι δεν έχουν μεγάλη επιτυχία.

Παρεμπιπτόντως, δεν υπάρχει φάκελος στο προϊόν μας ./dags, δεν υπάρχει συγχρονισμός μεταξύ των μηχανών - όλες οι κρίσεις βρίσκονται μέσα git στο Gitlab μας και το Gitlab CI διανέμει ενημερώσεις σε μηχανήματα κατά τη συγχώνευση master.

Λίγα λόγια για το Flower

Όσο οι εργάτριες αλωνίζουν τις πιπίλες μας, ας θυμηθούμε ένα άλλο εργαλείο που μπορεί να μας δείξει κάτι - Λουλούδι.

Η πρώτη σελίδα με συνοπτικές πληροφορίες για τους κόμβους εργαζομένων:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Η πιο έντονη σελίδα με εργασίες που λειτούργησαν:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Η πιο βαρετή σελίδα με την κατάσταση του μεσίτη μας:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Η πιο φωτεινή σελίδα είναι με γραφήματα κατάστασης εργασιών και τον χρόνο εκτέλεσής τους:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Φορτώνουμε τα υποφορτισμένα

Έτσι, όλα τα καθήκοντα έχουν επιλυθεί, μπορείτε να παρασύρετε τους τραυματίες.

Apache Airflow: Κάνοντας το ETL ευκολότερο

Και υπήρχαν πολλοί τραυματίες - για τον έναν ή τον άλλο λόγο. Στην περίπτωση της σωστής χρήσης του Airflow, αυτά τα τετράγωνα υποδεικνύουν ότι τα δεδομένα σίγουρα δεν έφθασαν.

Πρέπει να παρακολουθήσετε το αρχείο καταγραφής και να επανεκκινήσετε τις εμφανίσεις εργασιών.

Κάνοντας κλικ σε οποιοδήποτε τετράγωνο, θα δούμε τις ενέργειες που είναι διαθέσιμες σε εμάς:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Μπορείτε να πάρετε και να κάνετε Clear the fallen. Δηλαδή, ξεχνάμε ότι κάτι απέτυχε εκεί και η ίδια εργασία παρουσίας θα πάει στον προγραμματιστή.

Apache Airflow: Κάνοντας το ETL ευκολότερο

Είναι σαφές ότι το να το κάνεις αυτό με το ποντίκι με όλα τα κόκκινα τετράγωνα δεν είναι πολύ ανθρώπινο - αυτό δεν περιμένουμε από το Airflow. Φυσικά, έχουμε όπλα μαζικής καταστροφής: Browse/Task Instances

Apache Airflow: Κάνοντας το ETL ευκολότερο

Ας επιλέξουμε τα πάντα ταυτόχρονα και ας μηδενίσουμε, κάντε κλικ στο σωστό στοιχείο:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Μετά τον καθαρισμό, τα ταξί μας μοιάζουν με αυτό (περιμένουν ήδη τον προγραμματιστή να τα προγραμματίσει):

Apache Airflow: Κάνοντας το ETL ευκολότερο

Συνδέσεις, άγκιστρα και άλλες μεταβλητές

Ήρθε η ώρα να δούμε την επόμενη DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Έκανε ποτέ όλοι μια ενημέρωση αναφοράς; Αυτή είναι και πάλι: υπάρχει μια λίστα πηγών από όπου μπορείτε να λάβετε τα δεδομένα. υπαρχει λιστα που να βαλω? μην ξεχάσετε να κορνάρετε όταν όλα συνέβησαν ή έσπασαν (καλά, αυτό δεν αφορά εμάς, όχι).

Ας δούμε ξανά το αρχείο και ας δούμε τα νέα σκοτεινά πράγματα:

  • from commons.operators import TelegramBotSendMessage - τίποτα δεν μας εμποδίζει να φτιάξουμε τους δικούς μας χειριστές, τους οποίους εκμεταλλευτήκαμε φτιάχνοντας ένα μικρό περιτύλιγμα για την αποστολή μηνυμάτων στο Unblocked. (Θα μιλήσουμε περισσότερα για αυτόν τον τελεστή παρακάτω).
  • default_args={} - Το dag μπορεί να διανείμει τα ίδια ορίσματα σε όλους τους τελεστές του.
  • to='{{ var.value.all_the_kings_men }}' - χωράφι to δεν θα έχουμε σκληρό κώδικα, αλλά θα δημιουργηθεί δυναμικά χρησιμοποιώντας Jinja και μια μεταβλητή με μια λίστα email, την οποία έβαλα προσεκτικά Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — προϋπόθεση για την εκκίνηση του χειριστή. Στην περίπτωσή μας, το γράμμα θα πετάξει στα αφεντικά μόνο εάν έχουν επιλυθεί όλες οι εξαρτήσεις επιτυχώς;
  • tg_bot_conn_id='tg_main' - επιχειρήματα conn_id αποδεχόμαστε τα αναγνωριστικά σύνδεσης που δημιουργούμε Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - τα μηνύματα στο Telegram θα πετάξουν μακριά μόνο εάν υπάρχουν πεσμένες εργασίες.
  • task_concurrency=1 - απαγορεύουμε την ταυτόχρονη εκκίνηση πολλών στιγμιότυπων εργασιών μιας εργασίας. Διαφορετικά, θα έχουμε την ταυτόχρονη εκκίνηση πολλών VerticaOperator (κοιτάζοντας σε ένα τραπέζι).
  • report_update >> [email, tg] - όλα VerticaOperator συγκλίνουν στην αποστολή επιστολών και μηνυμάτων, όπως αυτό:
    Apache Airflow: Κάνοντας το ETL ευκολότερο

    Αλλά επειδή οι χειριστές ειδοποιήσεων έχουν διαφορετικές συνθήκες εκκίνησης, μόνο ένας θα λειτουργήσει. Στην προβολή δέντρου, όλα φαίνονται λιγότερο οπτικά:
    Apache Airflow: Κάνοντας το ETL ευκολότερο

Θα πω λίγα λόγια για μακροεντολές και οι φίλοι τους - μεταβλητές.

Οι μακροεντολές είναι σύμβολα θέσης Jinja που μπορούν να αντικαταστήσουν διάφορες χρήσιμες πληροφορίες σε ορίσματα χειριστή. Για παράδειγμα, όπως αυτό:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} θα επεκταθεί στα περιεχόμενα της μεταβλητής περιβάλλοντος execution_date σε μορφή YYYY-MM-DD: 2020-07-14. Το καλύτερο μέρος είναι ότι οι μεταβλητές περιβάλλοντος καρφώνονται σε μια συγκεκριμένη παρουσία εργασίας (ένα τετράγωνο στην προβολή δέντρου) και όταν επανεκκινηθούν, τα σύμβολα κράτησης θέσης θα επεκταθούν στις ίδιες τιμές.

Οι εκχωρημένες τιμές μπορούν να προβληθούν χρησιμοποιώντας το κουμπί Rendered σε κάθε στιγμιότυπο εργασίας. Αυτός είναι ο τρόπος με τον οποίο γίνεται η αποστολή μιας επιστολής:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Και έτσι στην αποστολή με την αποστολή μηνύματος:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Μια πλήρης λίστα με ενσωματωμένες μακροεντολές για την πιο πρόσφατη διαθέσιμη έκδοση είναι διαθέσιμη εδώ: αναφορά μακροεντολών

Επιπλέον, με τη βοήθεια πρόσθετων, μπορούμε να δηλώσουμε τις δικές μας μακροεντολές, αλλά αυτό είναι μια άλλη ιστορία.

Εκτός από τα προκαθορισμένα πράγματα, μπορούμε να αντικαταστήσουμε τις τιμές των μεταβλητών μας (το χρησιμοποίησα ήδη στον παραπάνω κώδικα). Ας δημιουργήσουμε μέσα Admin/Variables δυο πραγματα:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Όλα όσα μπορείτε να χρησιμοποιήσετε:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Η τιμή μπορεί να είναι βαθμωτή ή μπορεί επίσης να είναι JSON. Σε περίπτωση JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

απλά χρησιμοποιήστε τη διαδρομή προς το επιθυμητό κλειδί: {{ var.json.bot_config.bot.token }}.

Θα πω κυριολεκτικά μια λέξη και θα δείξω ένα στιγμιότυπο οθόνης για συνδέσεις. Όλα είναι στοιχειώδη εδώ: στη σελίδα Admin/Connections δημιουργούμε μια σύνδεση, προσθέτουμε τα στοιχεία σύνδεσης / κωδικούς πρόσβασης και πιο συγκεκριμένες παραμέτρους εκεί. Σαν αυτό:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Οι κωδικοί πρόσβασης μπορούν να κρυπτογραφηθούν (πιο διεξοδικά από τον προεπιλεγμένο) ή μπορείτε να παραλείψετε τον τύπο σύνδεσης (όπως έκανα για tg_main) - το γεγονός είναι ότι η λίστα των τύπων είναι ενσωματωμένη στα μοντέλα Airflow και δεν μπορεί να επεκταθεί χωρίς να μπει στους πηγαίους κώδικες (αν ξαφνικά δεν έψαξα κάτι στο google, διορθώστε με), αλλά τίποτα δεν θα μας εμποδίσει να λάβουμε πιστώσεις μόλις όνομα.

Μπορείτε επίσης να κάνετε πολλές συνδέσεις με το ίδιο όνομα: σε αυτήν την περίπτωση, τη μέθοδο BaseHook.get_connection(), που μας δίνει συνδέσεις ονομαστικά, θα δώσει τυχαίος από διάφορους συνονόματους (θα ήταν πιο λογικό να φτιάξουμε το Round Robin, αλλά ας το αφήσουμε στη συνείδηση ​​των προγραμματιστών του Airflow).

Οι μεταβλητές και οι συνδέσεις είναι σίγουρα υπέροχα εργαλεία, αλλά είναι σημαντικό να μην χάσετε την ισορροπία: ποια μέρη των ροών σας αποθηκεύετε στον ίδιο τον κώδικα και ποια μέρη δίνετε στο Airflow για αποθήκευση. Από τη μία πλευρά, η γρήγορη αλλαγή της τιμής, για παράδειγμα, μια ταχυδρομική θυρίδα, μπορεί να είναι βολική μέσω της διεπαφής χρήστη. Από την άλλη, αυτό είναι ακόμα μια επιστροφή στο κλικ του ποντικιού, από το οποίο (εγώ) θέλαμε να απαλλαγούμε.

Η εργασία με συνδέσεις είναι μία από τις εργασίες αγκίστρια. Γενικά, τα άγκιστρα ροής αέρα είναι σημεία για τη σύνδεσή του με υπηρεσίες και βιβλιοθήκες τρίτων. Π.χ, JiraHook θα ανοίξει έναν πελάτη για να αλληλεπιδράσουμε με τον Jira (μπορείτε να μετακινήσετε εργασίες εμπρός και πίσω) και με τη βοήθεια SambaHook μπορείτε να προωθήσετε ένα τοπικό αρχείο σε smb-σημείο.

Ανάλυση του προσαρμοσμένου τελεστή

Και πλησιάσαμε να δούμε πώς φτιάχνεται TelegramBotSendMessage

Κώδικας commons/operators.py με τον πραγματικό χειριστή:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Εδώ, όπως όλα τα άλλα στο Airflow, όλα είναι πολύ απλά:

  • Κληρονόμησε από BaseOperator, το οποίο εφαρμόζει αρκετά πράγματα ειδικά για τη ροή αέρα (κοιτάξτε τον ελεύθερο χρόνο σας)
  • Δηλωμένα πεδία template_fields, στο οποίο ο Jinja θα αναζητήσει μακροεντολές για επεξεργασία.
  • Τακτοποίησε τα σωστά επιχειρήματα για __init__(), ορίστε τις προεπιλογές όπου χρειάζεται.
  • Δεν ξεχάσαμε ούτε την αρχικοποίηση του προγόνου.
  • Άνοιξε το αντίστοιχο άγκιστρο TelegramBotHookέλαβε ένα αντικείμενο πελάτη από αυτό.
  • Παράκαμψη (επανακαθορισμένη) μέθοδος BaseOperator.execute(), το οποίο το Airfow θα συσπάσει όταν έρθει η ώρα να ξεκινήσει ο χειριστής - σε αυτό θα εφαρμόσουμε την κύρια ενέργεια, ξεχνώντας να συνδεθείτε. (Συνδεόμαστε, παρεμπιπτόντως, απευθείας stdout и stderr - Η ροή αέρα θα παρεμποδίσει τα πάντα, θα τα τυλίξει όμορφα, θα τα αποσυντεθεί όπου χρειάζεται.)

Ας δούμε τι έχουμε commons/hooks.py. Το πρώτο μέρος του αρχείου, με το ίδιο το άγκιστρο:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Δεν ξέρω καν τι να εξηγήσω εδώ, θα σημειώσω μόνο τα σημαντικά σημεία:

  • Κληρονομούμε, σκεφτόμαστε τα επιχειρήματα - στις περισσότερες περιπτώσεις θα είναι ένα: conn_id;
  • Υπερισχύουσες τυπικές μεθόδους: Περιορίστηκα get_conn(), στο οποίο παίρνω τις παραμέτρους σύνδεσης ονομαστικά και απλώς παίρνω την ενότητα extra (αυτό είναι ένα πεδίο JSON), στο οποίο (σύμφωνα με τις δικές μου οδηγίες!) έβαλα το διακριτικό bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Δημιουργώ ένα παράδειγμα μας TelegramBot, δίνοντάς του ένα συγκεκριμένο διακριτικό.

Αυτό είναι όλο. Μπορείτε να πάρετε έναν πελάτη από ένα γάντζο χρησιμοποιώντας TelegramBotHook().clent ή TelegramBotHook().get_conn().

Και το δεύτερο μέρος του αρχείου, στο οποίο φτιάχνω ένα microwrapper για το Telegram REST API, για να μην σύρω το ίδιο python-telegram-bot για μια μέθοδο sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Ο σωστός τρόπος είναι να τα αθροίσεις όλα: TelegramBotSendMessage, TelegramBotHook, TelegramBot - στο πρόσθετο, τοποθετήστε το σε ένα δημόσιο αποθετήριο και δώστε το στο Open Source.

Ενώ μελετούσαμε όλα αυτά, οι ενημερώσεις των αναφορών μας κατάφεραν να αποτύχουν με επιτυχία και να μου στείλουν ένα μήνυμα σφάλματος στο κανάλι. Θα το τσεκάρω να δω αν είναι λάθος...

Apache Airflow: Κάνοντας το ETL ευκολότερο
Κάτι έσπασε στο δόγμα μας! Αυτό δεν περιμέναμε; Ακριβώς!

Θα χύσεις;

Νιώθεις ότι έχασα κάτι; Φαίνεται ότι υποσχέθηκε να μεταφέρει δεδομένα από τον SQL Server στην Vertica, και μετά τα πήρε και απομακρύνθηκε από το θέμα, το σκάρτο!

Αυτή η θηριωδία ήταν σκόπιμη, απλά έπρεπε να αποκρυπτογραφήσω κάποια ορολογία για εσάς. Τώρα μπορείτε να προχωρήσετε παρακάτω.

Το σχέδιό μας ήταν το εξής:

  1. Κάνετε το χάλι
  2. Δημιουργήστε εργασίες
  3. Δείτε πόσο όμορφα είναι όλα
  4. Εκχωρήστε αριθμούς συνεδρίας σε γεμίσματα
  5. Λάβετε δεδομένα από τον SQL Server
  6. Τοποθετήστε δεδομένα στο Vertica
  7. Συλλέξτε στατιστικά στοιχεία

Έτσι, για να τα βάλω όλα σε λειτουργία, έκανα μια μικρή προσθήκη στο δικό μας docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Εκεί σηκώνουμε:

  • Vertica ως οικοδεσπότης dwh με τις πιο προεπιλεγμένες ρυθμίσεις,
  • τρεις περιπτώσεις SQL Server,
  • γεμίζουμε τις βάσεις δεδομένων της τελευταίας με κάποια δεδομένα (σε καμία περίπτωση μην εξετάσετε mssql_init.py!)

Ξεκινάμε όλα τα καλά με τη βοήθεια μιας ελαφρώς πιο περίπλοκης εντολής από την προηγούμενη φορά:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Αυτό που δημιούργησε ο θαυματουργός τυχαιοποιητής μας, μπορείτε να χρησιμοποιήσετε το αντικείμενο Data Profiling/Ad Hoc Query:

Apache Airflow: Κάνοντας το ETL ευκολότερο
Το κύριο πράγμα είναι να μην το δείξουμε στους αναλυτές

επεξεργαστείτε συνεδρίες ETL Δεν θα το κάνω, όλα είναι ασήμαντα εκεί: φτιάχνουμε μια βάση, υπάρχει μια πινακίδα σε αυτήν, τυλίγουμε τα πάντα με έναν διαχειριστή περιβάλλοντος και τώρα κάνουμε αυτό:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Ήρθε η ώρα συλλέγουμε τα δεδομένα μας από τα μιάμιση τραπέζια μας. Ας το κάνουμε αυτό με τη βοήθεια πολύ ανεπιτήδευτων γραμμών:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Με τη βοήθεια ενός γάντζου παίρνουμε από το Airflow pymssql-συνδέω-συωδεομαι
  2. Ας αντικαταστήσουμε έναν περιορισμό με τη μορφή ημερομηνίας στο αίτημα - θα μεταφερθεί στη λειτουργία από τη μηχανή προτύπου.
  3. Τροφοδοτώντας το αίτημά μας pandasποιος θα μας πάρει DataFrame - θα μας είναι χρήσιμο στο μέλλον.

Χρησιμοποιώ αντικατάσταση {dt} αντί για παράμετρο αιτήματος %s όχι επειδή είμαι ένας κακός Πινόκιο, αλλά επειδή pandas δεν αντέχει pymssql και γλιστράει το τελευταίο params: Listαν και θέλει πολύ tuple.
Σημειώστε επίσης ότι ο προγραμματιστής pymssql αποφάσισε να μην τον στηρίξει πια, και ήρθε η ώρα να φύγει pyodbc.

Ας δούμε με τι γέμισε το Airflow τα ορίσματα των συναρτήσεών μας:

Apache Airflow: Κάνοντας το ETL ευκολότερο

Αν δεν υπάρχουν δεδομένα, τότε δεν έχει νόημα να συνεχίσουμε. Είναι όμως και περίεργο να θεωρούμε επιτυχημένο το γέμισμα. Αυτό όμως δεν είναι λάθος. Α-αχ-αχ, τι να κάνω;! Και να τι:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException θα πει στο Airflow ότι δεν υπάρχουν σφάλματα, αλλά παρακάμπτουμε την εργασία. Η διεπαφή δεν θα έχει πράσινο ή κόκκινο τετράγωνο, αλλά ροζ.

Ας πετάξουμε τα δεδομένα μας πολλαπλές στήλες:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

δηλαδή

  • Η βάση δεδομένων από την οποία πήραμε τις παραγγελίες,
  • Το αναγνωριστικό της περιόδου πλημμύρας (θα είναι διαφορετικό για κάθε εργασία),
  • Ένας κατακερματισμός από την πηγή και το αναγνωριστικό παραγγελίας - έτσι ώστε στην τελική βάση δεδομένων (όπου όλα χύνονται σε έναν πίνακα) έχουμε ένα μοναδικό αναγνωριστικό παραγγελίας.

Το προτελευταίο βήμα παραμένει: ρίξτε τα πάντα στη Vertica. Και, παραδόξως, ένας από τους πιο θεαματικούς και αποτελεσματικούς τρόπους για να γίνει αυτό είναι μέσω του CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Φτιάχνουμε έναν ειδικό δέκτη StringIO.
  2. pandas θα βάλει ευγενικά μας DataFrame με τη μορφή CSV- γραμμές.
  3. Ας ανοίξουμε μια σύνδεση με το αγαπημένο μας Vertica με ένα γάντζο.
  4. Και τώρα με τη βοήθεια copy() στείλτε τα δεδομένα μας απευθείας στη Vertika!

Θα πάρουμε από το πρόγραμμα οδήγησης πόσες γραμμές γεμίστηκαν και θα πούμε στον διαχειριστή συνεδρίας ότι όλα είναι εντάξει:

session.loaded_rows = cursor.rowcount
session.successful = True

Αυτό είναι όλο.

Στην πώληση, δημιουργούμε την πλάκα-στόχο χειροκίνητα. Εδώ επέτρεψα στον εαυτό μου ένα μικρό μηχάνημα:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

χρησιμοποιώ VerticaOperator() Δημιουργώ ένα σχήμα βάσης δεδομένων και έναν πίνακα (αν δεν υπάρχουν ήδη, φυσικά). Το κύριο πράγμα είναι να τακτοποιήσετε σωστά τις εξαρτήσεις:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Ανακεφαλαίωση

- Λοιπόν, - είπε το ποντικάκι, - δεν είναι, τώρα
Είσαι πεπεισμένος ότι είμαι το πιο τρομερό ζώο στο δάσος;

Τζούλια Ντόναλντσον, Ο Γκρούφαλο

Νομίζω ότι αν οι συνάδελφοί μου και εγώ είχαμε έναν ανταγωνισμό: ποιος θα δημιουργήσει και θα ξεκινήσει γρήγορα μια διαδικασία ETL από την αρχή: αυτοί με το SSIS και ένα ποντίκι και εγώ με το Airflow... Και τότε θα συγκρίναμε επίσης την ευκολία συντήρησης... Πω πω, θα συμφωνήσετε ότι θα τους νικήσω σε όλα τα μέτωπα!

Αν είναι λίγο πιο σοβαρά, τότε το Apache Airflow -περιγράφοντας διαδικασίες με τη μορφή κώδικα προγράμματος- έκανε τη δουλειά μου πολύ πιο άνετα και ευχάριστα.

Η απεριόριστη επεκτασιμότητα του, τόσο ως προς τα plug-ins όσο και ως προς την προδιάθεση για επεκτασιμότητα, σας δίνει την ευκαιρία να χρησιμοποιήσετε το Airflow σχεδόν σε οποιοδήποτε τομέα: ακόμη και σε ολόκληρο τον κύκλο συλλογής, προετοιμασίας και επεξεργασίας δεδομένων, ακόμη και στην εκτόξευση πυραύλων (προς Άρη, σειρά μαθημάτων).

Τελικό μέρος, αναφορά και πληροφορίες

Η γκανιότα που έχουμε συγκεντρώσει για εσάς

  • start_date. Ναι, αυτό είναι ήδη ένα τοπικό μιμίδιο. Μέσω του κύριου επιχειρήματος του Νταγκ start_date όλα περνούν. Εν συντομία, αν προσδιορίσετε σε start_date τρέχουσα ημερομηνία και schedule_interval - μια μέρα, τότε η DAG θα ξεκινήσει αύριο όχι νωρίτερα.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Και όχι άλλα προβλήματα.

    Υπάρχει ένα άλλο σφάλμα χρόνου εκτέλεσης που σχετίζεται με αυτό: Task is missing the start_date parameter, που τις περισσότερες φορές υποδεικνύει ότι ξεχάσατε να δεσμευτείτε με τον χειριστή dag.

  • Όλα σε ένα μηχάνημα. Ναι, και βάσεις (το ίδιο το Airflow και η επίστρωσή μας), και διακομιστής ιστού, προγραμματιστής και εργαζόμενοι. Και μάλιστα λειτούργησε. Αλλά με την πάροδο του χρόνου, ο αριθμός των εργασιών για τις υπηρεσίες αυξήθηκε και όταν η PostgreSQL άρχισε να ανταποκρίνεται στο ευρετήριο σε 20 δευτερόλεπτα αντί για 5 ms, το πήραμε και το μεταφέραμε.
  • LocalExecutor. Ναι, ακόμα καθόμαστε πάνω του, και έχουμε ήδη φτάσει στην άκρη της αβύσσου. Το LocalExecutor μας ήταν αρκετό μέχρι στιγμής, αλλά τώρα ήρθε η ώρα να επεκταθούμε με τουλάχιστον έναν εργαζόμενο και θα πρέπει να εργαστούμε σκληρά για να μετακομίσουμε στο CeleryExecutor. Και λόγω του γεγονότος ότι μπορείτε να εργαστείτε με αυτό σε ένα μηχάνημα, τίποτα δεν σας εμποδίζει να χρησιμοποιήσετε το Celery ακόμη και σε διακομιστή, ο οποίος "φυσικά, δεν θα βγει ποτέ στην παραγωγή, ειλικρινά!"
  • Μη χρήση ενσωματωμένα εργαλεία:
    • Διασυνδέσεις για την αποθήκευση των διαπιστευτηρίων υπηρεσίας,
    • Δεσποινίς SLA να ανταποκρίνεται σε εργασίες που δεν λειτούργησαν εγκαίρως,
    • xcom για ανταλλαγή μεταδεδομένων (είπα μεταδεδομένα!) μεταξύ των εργασιών dag.
  • Κατάχρηση αλληλογραφίας. Λοιπόν, τι να πω; Δημιουργήθηκαν ειδοποιήσεις για όλες τις επαναλήψεις πεσμένων εργασιών. Τώρα το Gmail της δουλειάς μου έχει >90 μηνύματα ηλεκτρονικού ταχυδρομείου από το Airflow και το ρύγχος αλληλογραφίας ιστού αρνείται να παραλάβει και να διαγράψει περισσότερα από 100 ταυτόχρονα.

Περισσότερες παγίδες: Apache Airflow Pitfails

Περισσότερα εργαλεία αυτοματισμού

Για να δουλεύουμε ακόμα περισσότερο με το κεφάλι και όχι με τα χέρια μας, το Airflow ετοίμασε για εμάς αυτό:

  • REST API - εξακολουθεί να έχει την ιδιότητα του Πειραματικού, κάτι που δεν τον εμποδίζει να εργαστεί. Με αυτό, μπορείτε όχι μόνο να λάβετε πληροφορίες σχετικά με τα dags και τις εργασίες, αλλά και να σταματήσετε/ξεκινήσετε ένα dag, να δημιουργήσετε ένα DAG Run ή ένα pool.
  • CLI - πολλά εργαλεία είναι διαθέσιμα μέσω της γραμμής εντολών που δεν είναι απλώς άβολα στη χρήση μέσω του WebUI, αλλά γενικά απουσιάζουν. Για παράδειγμα:
    • backfill απαιτείται για επανεκκίνηση των παρουσιών εργασιών.
      Για παράδειγμα, ήρθαν αναλυτές και είπαν: «Κι εσύ σύντροφε, έχεις βλακείες στα στοιχεία από 1 έως 13 Ιανουαρίου! Διορθώστε το, φτιάξτε το, φτιάξτε το, φτιάξτε το!». Και είσαι μια τέτοια εστία:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Βασική υπηρεσία: initdb, resetdb, upgradedb, checkdb.
    • run, το οποίο σας επιτρέπει να εκτελέσετε μια εργασία παρουσίας και ακόμη και να σκοράρετε σε όλες τις εξαρτήσεις. Επιπλέον, μπορείτε να το εκτελέσετε μέσω LocalExecutor, ακόμα κι αν έχετε ένα σύμπλεγμα σέλινου.
    • Κάνει σχεδόν το ίδιο πράγμα test, μόνο και στις βάσεις δεν γράφει τίποτα.
    • connections επιτρέπει τη μαζική δημιουργία συνδέσεων από το κέλυφος.
  • API Python - ένας μάλλον σκληροπυρηνικός τρόπος αλληλεπίδρασης, ο οποίος προορίζεται για πρόσθετα, και όχι για σωρεία με μικρά χέρια. Αλλά ποιος θα μας εμποδίσει να πάμε /home/airflow/dags, τρέξιμο ipython και να αρχίσουμε να τα μπερδεύουμε; Μπορείτε, για παράδειγμα, να εξάγετε όλες τις συνδέσεις με τον ακόλουθο κώδικα:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Σύνδεση στη βάση μεταδεδομένων Airflow. Δεν συνιστώ να γράψετε σε αυτό, αλλά η λήψη καταστάσεων εργασιών για διάφορες συγκεκριμένες μετρήσεις μπορεί να είναι πολύ πιο γρήγορη και ευκολότερη από τη χρήση οποιουδήποτε από τα API.

    Ας πούμε ότι δεν είναι όλες οι εργασίες μας ανίκανες, αλλά μερικές φορές μπορεί να πέσουν και αυτό είναι φυσιολογικό. Αλλά μερικά μπλοκαρίσματα είναι ήδη ύποπτα και θα ήταν απαραίτητο να ελέγξετε.

    Προσοχή SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

παραπομπές

Και φυσικά, οι δέκα πρώτοι σύνδεσμοι από την έκδοση της Google είναι τα περιεχόμενα του φακέλου Airflow από τους σελιδοδείκτες μου.

Και οι σύνδεσμοι που χρησιμοποιούνται στο άρθρο:

Πηγή: www.habr.com

Αγοράστε αξιόπιστη φιλοξενία για ιστότοπους με προστασία DDoS, διακομιστές VPS VDS 🔥 Αγοράστε αξιόπιστη φιλοξενία ιστοσελίδων με προστασία DDoS, διακομιστές VPS VDS | ProHoster