Γεια, είμαι ο Dmitry Logvinenko - Μηχανικός Δεδομένων του Τμήματος Analytics του ομίλου εταιρειών Vezet.
Θα σας πω για ένα υπέροχο εργαλείο για την ανάπτυξη διαδικασιών ETL - το Apache Airflow. Αλλά το Airflow είναι τόσο ευέλικτο και πολύπλευρο που θα πρέπει να το κοιτάξετε πιο προσεκτικά ακόμα κι αν δεν συμμετέχετε σε ροές δεδομένων, αλλά έχετε την ανάγκη να εκκινείτε περιοδικά οποιεσδήποτε διαδικασίες και να παρακολουθείτε την εκτέλεσή τους.
Και ναι, όχι μόνο θα πω, αλλά και θα δείξω: το πρόγραμμα έχει πολύ κώδικα, στιγμιότυπα οθόνης και συστάσεις.

Τι βλέπετε συνήθως όταν ψάχνετε στο google τη λέξη Airflow / Wikimedia Commons
πίνακας περιεχομένων
Εισαγωγή
Το Apache Airflow είναι ακριβώς όπως το Django:
- γραμμένο σε python
- υπάρχει ένας υπέροχος πίνακας διαχείρισης,
- επεκτείνεται επ' αόριστον
- μόνο καλύτερα, και έγινε για εντελώς διαφορετικούς σκοπούς, δηλαδή (όπως είναι γραμμένο πριν από το kat):
- εκτέλεση και παρακολούθηση εργασιών σε απεριόριστο αριθμό μηχανών (όσες Celery / Kubernetes και η συνείδησή σας θα σας επιτρέψουν)
- με δημιουργία δυναμικής ροής εργασιών από πολύ εύκολο στην εγγραφή και κατανόηση κώδικα Python
- και τη δυνατότητα σύνδεσης οποιωνδήποτε βάσεων δεδομένων και API μεταξύ τους χρησιμοποιώντας έτοιμα στοιχεία και οικιακά πρόσθετα (που είναι εξαιρετικά απλό).
Χρησιμοποιούμε το Apache Airflow ως εξής:
- συλλέγουμε δεδομένα από διάφορες πηγές (πολλές παρουσίες SQL Server και PostgreSQL, διάφορα API με μετρήσεις εφαρμογών, ακόμη και 1C) σε DWH και ODS (έχουμε Vertica και Clickhouse).
- πόσο προχωρημένο
cron, το οποίο ξεκινά τις διαδικασίες ενοποίησης δεδομένων στο ODS και παρακολουθεί επίσης τη συντήρησή τους.
Μέχρι πρόσφατα, οι ανάγκες μας καλύπτονταν από έναν μικρό διακομιστή με 32 πυρήνες και 50 GB μνήμης RAM. Στο Airflow, αυτό λειτουργεί:
- περισσότερο 200 dags (στην πραγματικότητα ροές εργασίας, στις οποίες γεμίσαμε εργασίες),
- σε καθεμία κατά μέσο όρο 70 εργασίες,
- αυτή η καλοσύνη ξεκινά (επίσης κατά μέσο όρο) μια φορά την ώρα.
Και για το πώς επεκταθήκαμε, θα γράψω παρακάτω, αλλά τώρα ας ορίσουμε το über-πρόβλημα που θα λύσουμε:
Υπάρχουν τρεις αρχικοί SQL Servers, ο καθένας με 50 βάσεις δεδομένων - περιπτώσεις ενός έργου, αντίστοιχα, έχουν την ίδια δομή (σχεδόν παντού, mua-ha-ha), που σημαίνει ότι ο καθένας έχει έναν πίνακα παραγγελιών (ευτυχώς, ένας πίνακας με αυτό το όνομα μπορεί να προωθηθεί σε οποιαδήποτε επιχείρηση). Παίρνουμε τα δεδομένα προσθέτοντας πεδία υπηρεσιών (διακομιστής πηγής, βάση δεδομένων πηγής, αναγνωριστικό εργασιών ETL) και τα ρίχνουμε αφελώς, ας πούμε, στο Vertica.
Πάμε!
Το κύριο μέρος, πρακτικό (και λίγο θεωρητικό)
Γιατί εμείς (και εσείς)
Όταν τα δέντρα ήταν μεγάλα και εγώ ήμουν απλός SQLΣε ένα ρωσικό κατάστημα λιανικής, εξαπατήσαμε διαδικασίες ETL που ονομάζονται ροές δεδομένων χρησιμοποιώντας δύο εργαλεία που έχουμε στη διάθεσή μας:
- Κέντρο Ενέργειας Informatica - ένα εξαιρετικά διαδεδομένο σύστημα, εξαιρετικά παραγωγικό, με το δικό του υλικό, τη δική του έκδοση. Χρησιμοποίησα θεός να το κάνει το 1% των δυνατοτήτων του. Γιατί; Λοιπόν, πρώτα απ 'όλα, αυτή η διεπαφή, κάπου από τη δεκαετία του 380, μας άσκησε ψυχική πίεση. Δεύτερον, αυτό το μηχάνημα έχει σχεδιαστεί για εξαιρετικά φανταχτερές διεργασίες, μανιώδη επαναχρησιμοποίηση εξαρτημάτων και άλλα πολύ σημαντικά επιχειρηματικά κόλπα. Σχετικά με το κόστος, όπως το φτερό του Airbus AXNUMX / έτος, δεν θα πούμε τίποτα.
Προσοχή, ένα στιγμιότυπο οθόνης μπορεί να βλάψει λίγο τα άτομα κάτω των 30 ετών

- SQL Server Integration Server - χρησιμοποιήσαμε αυτόν τον σύντροφο στις ροές εντός του έργου μας. Λοιπόν, στην πραγματικότητα: χρησιμοποιούμε ήδη τον SQL Server και θα ήταν κατά κάποιο τρόπο παράλογο να μην χρησιμοποιήσουμε τα εργαλεία ETL του. Όλα σε αυτό είναι καλά: τόσο η διεπαφή είναι όμορφη, όσο και οι αναφορές προόδου... Αλλά δεν είναι αυτός ο λόγος που αγαπάμε τα προϊόντα λογισμικού, ω, όχι για αυτό. Έκδοση του
dtsx(που είναι XML με κόμβους ανακατεμένους κατά την αποθήκευση) μπορούμε, αλλά ποιο είναι το νόημα; Τι θα λέγατε να φτιάξετε ένα πακέτο εργασιών που θα σύρει εκατοντάδες πίνακες από τον έναν διακομιστή στον άλλο; Ναι, τι εκατό, ο δείκτης σας θα πέσει από είκοσι κομμάτια, κάνοντας κλικ στο κουμπί του ποντικιού. Σίγουρα όμως δείχνει πιο μοδάτος:
Σίγουρα ψάξαμε για διέξοδο. Υπόθεση ακόμη σχεδόν ήρθε σε μια αυτογραμμένη γεννήτρια πακέτων SSIS ...
…και μετά με βρήκε μια νέα δουλειά. Και το Apache Airflow με προσπέρασε πάνω του.
Όταν ανακάλυψα ότι οι περιγραφές διεργασιών ETL είναι απλοί κώδικας Python, απλά δεν χόρεψα από τη χαρά μου. Αυτός είναι ο τρόπος με τον οποίο οι ροές δεδομένων εκδόθηκαν και διαφοροποιήθηκαν, και η απόχυση πινάκων με μια ενιαία δομή από εκατοντάδες βάσεις δεδομένων σε έναν στόχο έγινε θέμα κώδικα Python σε μιάμιση ή δύο οθόνες 13 ιντσών.
Συναρμολόγηση του συμπλέγματος
Ας μην κανονίσουμε ένα εντελώς νηπιαγωγείο και ας μην μιλήσουμε για εντελώς προφανή πράγματα εδώ, όπως η εγκατάσταση του Airflow, της επιλεγμένης βάσης δεδομένων σας, του Celery και άλλων περιπτώσεων που περιγράφονται στις αποβάθρες.
Για να μπορέσουμε να ξεκινήσουμε αμέσως τα πειράματα, σκιαγράφησα docker-compose.yml στο οποίο:
- Ας αυξήσουμε ουσιαστικά Ροής αέρα: Scheduler, Webserver. Το Flower θα περιστρέφεται επίσης εκεί για να παρακολουθεί τις εργασίες του Celery (επειδή έχει ήδη ωθηθεί
apache/airflow:1.10.10-python3.7, αλλά δεν μας πειράζει) - PostgreSQL, στο οποίο το Airflow θα γράψει τις πληροφορίες σέρβις του (δεδομένα προγραμματιστή, στατιστικά στοιχεία εκτέλεσης κ.λπ.) και το Celery θα επισημάνει τις ολοκληρωμένες εργασίες.
- Ρέντη, το οποίο θα λειτουργεί ως μεσίτης εργασιών για το Celery.
- Σέλινο εργάτη, η οποία θα ασχολείται με την άμεση εκτέλεση εργασιών.
- Σε φάκελο
./dagsθα προσθέσουμε τα αρχεία μας με την περιγραφή των dags. Θα μαζευτούν εν κινήσει, επομένως δεν χρειάζεται να κάνετε ταχυδακτυλουργικά όλη τη στοίβα μετά από κάθε φτάρνισμα.
Σε ορισμένα σημεία, ο κώδικας στα παραδείγματα δεν εμφανίζεται πλήρως (για να μην μπερδεύεται το κείμενο), αλλά κάπου τροποποιείται στη διαδικασία. Μπορείτε να βρείτε πλήρη παραδείγματα κώδικα εργασίας στο αποθετήριο .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerΣημειώσεις:
- Στη συναρμολόγηση της σύνθεσης βασίστηκα σε μεγάλο βαθμό στη γνωστή εικόνα - να το τσεκαρεις σιγουρα. Ίσως δεν χρειάζεστε τίποτα άλλο στη ζωή σας.
- Όλες οι ρυθμίσεις ροής αέρα είναι διαθέσιμες όχι μόνο μέσω
airflow.cfg, αλλά και μέσω μεταβλητών περιβάλλοντος (ευχαριστώ τους προγραμματιστές), τις οποίες εκμεταλλεύτηκα κακόβουλα. - Φυσικά, δεν είναι έτοιμο για παραγωγή: σκόπιμα δεν έβαλα καρδιακούς παλμούς σε δοχεία, δεν ασχολήθηκα με την ασφάλεια. Αλλά έκανα το ελάχιστο κατάλληλο για τους πειραματιστές μας.
- Σημειώστε ότι:
- Ο φάκελος dag πρέπει να είναι προσβάσιμος τόσο στον προγραμματιστή όσο και στους εργαζόμενους.
- Το ίδιο ισχύει για όλες τις βιβλιοθήκες τρίτων - πρέπει όλες να είναι εγκατεστημένες σε μηχανήματα με προγραμματιστή και εργάτες.
Λοιπόν, τώρα είναι απλό:
$ docker-compose up --scale worker=3Αφού όλα ανέβουν, μπορείτε να δείτε τις διεπαφές ιστού:
- Ροή αέρα:
- Λουλούδι:
Βασικές έννοιες
Εάν δεν καταλάβατε τίποτα σε όλα αυτά τα «ντάγια», τότε εδώ είναι ένα σύντομο λεξικό:
- Scheduler - ο πιο σημαντικός θείος στο Airflow, ο οποίος ελέγχει ότι τα ρομπότ δουλεύουν σκληρά και όχι ένα άτομο: παρακολουθεί το πρόγραμμα, ενημερώνει τα στοιχεία, εκκινεί εργασίες.
Γενικά, σε παλιότερες εκδόσεις, είχε προβλήματα με τη μνήμη (όχι, όχι αμνησία, αλλά διαρροές) και η παράμετρος legacy παρέμενε ακόμη και στις ρυθμίσεις
run_duration— το διάστημα επανεκκίνησης του. Τώρα όμως όλα είναι καλά. - DAG (γνωστός και ως "dag") - "κατευθυνόμενο άκυκλο γράφημα", αλλά ένας τέτοιος ορισμός θα πει σε λίγους ανθρώπους, αλλά στην πραγματικότητα είναι ένα δοχείο για εργασίες που αλληλεπιδρούν μεταξύ τους (βλ. παρακάτω) ή ένα ανάλογο του πακέτου στο SSIS και της ροής εργασίας στο Informatica .
Εκτός από τα dags, μπορεί να υπάρχουν ακόμα υποδαγκώματα, αλλά πιθανότατα δεν θα φτάσουμε σε αυτά.
- DAG Run - αρχικοποιημένο dag, στο οποίο εκχωρείται το δικό του
execution_date. Τα Dagran του ίδιου dag μπορούν να λειτουργήσουν παράλληλα (αν κάνατε τις εργασίες σας ανίκανες, φυσικά). - Χειριστής είναι κομμάτια κώδικα που είναι υπεύθυνα για την εκτέλεση μιας συγκεκριμένης ενέργειας. Υπάρχουν τρεις τύποι χειριστών:
- δράσησαν το αγαπημένο μας
PythonOperator, το οποίο μπορεί να εκτελέσει οποιονδήποτε (έγκυρο) κώδικα Python. - μεταφορά, που μεταφέρει δεδομένα από μέρος σε μέρος, ας πούμε,
MsSqlToHiveTransfer; - αισθητήρα Από την άλλη πλευρά, θα σας επιτρέψει να αντιδράσετε ή να επιβραδύνετε την περαιτέρω εκτέλεση του dag μέχρι να συμβεί ένα συμβάν.
HttpSensorμπορεί να τραβήξει το καθορισμένο τελικό σημείο και όταν περιμένει η επιθυμητή απόκριση, ξεκινήστε τη μεταφοράGoogleCloudStorageToS3Operator. Ένας περίεργος νους θα ρωτήσει: «γιατί; Άλλωστε, μπορείτε να κάνετε επαναλήψεις απευθείας στον χειριστή!». Και μετά, για να μην φράξουμε τη δεξαμενή εργασιών με ανασταλμένους χειριστές. Ο αισθητήρας ξεκινά, ελέγχει και πεθαίνει πριν από την επόμενη προσπάθεια.
- δράσησαν το αγαπημένο μας
- Έργο - οι δηλωμένοι χειριστές, ανεξαρτήτως τύπου, και προσαρτημένοι στο dag προάγονται στον βαθμό της εργασίας.
- παράδειγμα εργασίας - όταν ο γενικός σχεδιαστής αποφάσισε ότι ήταν καιρός να στείλει καθήκοντα στη μάχη σε ερμηνευτές-εργάτες (ακριβώς επί τόπου, αν χρησιμοποιήσουμε
LocalExecutorή σε έναν απομακρυσμένο κόμβο στην περίπτωση τουCeleryExecutor), τους εκχωρεί ένα πλαίσιο (δηλαδή, ένα σύνολο μεταβλητών - παραμέτρων εκτέλεσης), επεκτείνει τα πρότυπα εντολών ή ερωτημάτων και τα συγκεντρώνει.
Δημιουργούμε εργασίες
Αρχικά, ας περιγράψουμε το γενικό σχήμα του ζυμαριού μας και στη συνέχεια θα βουτήξουμε στις λεπτομέρειες όλο και περισσότερο, επειδή εφαρμόζουμε μερικές μη τετριμμένες λύσεις.
Έτσι, στην απλούστερη μορφή του, ένα τέτοιο dag θα μοιάζει με αυτό:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Ας το καταλάβουμε:
- Αρχικά, εισάγουμε τα απαραίτητα lib και κάτι άλλο;
sql_server_ds- ΕίναιList[namedtuple[str, str]]με τα ονόματα των συνδέσεων από το Airflow Connections και τις βάσεις δεδομένων από τις οποίες θα πάρουμε το πιάτο μας.dag- η ανακοίνωση του dag μας, που πρέπει απαραίτητα να είναι μέσαglobals(), διαφορετικά το Airflow δεν θα το βρει. Ο Νταγκ πρέπει επίσης να πει:- ποιο είναι το όνομα του
orders- αυτό το όνομα θα εμφανιστεί στη συνέχεια στη διεπαφή ιστού, - ότι θα εργάζεται από τα μεσάνυχτα της XNUMXης Ιουλίου,
- και θα πρέπει να τρέχει, περίπου κάθε 6 ώρες (για σκληρούς τύπους εδώ αντί για
timedelta()αποδεκτόςcron-γραμμή0 0 0/6 ? * * *, για τους λιγότερο κουλ - μια έκφραση όπως@daily);
- ποιο είναι το όνομα του
workflow()θα κάνει την κύρια δουλειά, αλλά όχι τώρα. Προς το παρόν, απλώς θα απορρίψουμε το περιεχόμενό μας στο αρχείο καταγραφής.- Και τώρα η απλή μαγεία της δημιουργίας εργασιών:
- τρέχουμε μέσα από τις πηγές μας.
- αρχικοποίηση
PythonOperator, που θα εκτελέσει το ομοίωμά μαςworkflow(). Μην ξεχάσετε να καθορίσετε ένα μοναδικό (μέσα στο dag) όνομα της εργασίας και να συνδέσετε το ίδιο το dag. Σημαίαprovide_contextμε τη σειρά του, θα ρίξει επιπλέον ορίσματα στη συνάρτηση, τα οποία θα συλλέξουμε προσεκτικά χρησιμοποιώντας**context.
Προς το παρόν, αυτό είναι όλο. Τι πήραμε:
- νέο dag στη διεπαφή ιστού,
- μιάμιση εκατό εργασίες που θα εκτελούνται παράλληλα (αν το επιτρέπουν οι ρυθμίσεις Airflow, Celery και η χωρητικότητα του διακομιστή).
Λοιπόν, σχεδόν το κατάλαβα.

Ποιος θα εγκαταστήσει τις εξαρτήσεις;
Για να το απλοποιήσω όλο αυτό το πράγμα, έσφιξα docker-compose.yml επεξεργασία requirements.txt σε όλους τους κόμβους.
Τώρα έφυγε:

Τα γκρι τετράγωνα είναι στιγμιότυπα εργασιών που υποβάλλονται σε επεξεργασία από τον προγραμματιστή.
Περιμένουμε λίγο, τα καθήκοντα λύνονται από τους εργαζόμενους:

Τα πράσινα βέβαια έχουν δουλέψει με επιτυχία. Οι κόκκινοι δεν έχουν μεγάλη επιτυχία.
Παρεμπιπτόντως, δεν υπάρχει φάκελος στο προϊόν μας
./dags, δεν υπάρχει συγχρονισμός μεταξύ των μηχανών - όλες οι κρίσεις βρίσκονται μέσαgitστο Gitlab μας και το Gitlab CI διανέμει ενημερώσεις σε μηχανήματα κατά τη συγχώνευσηmaster.
Λίγα λόγια για το Flower
Όσο οι εργάτριες αλωνίζουν τις πιπίλες μας, ας θυμηθούμε ένα άλλο εργαλείο που μπορεί να μας δείξει κάτι - Λουλούδι.
Η πρώτη σελίδα με συνοπτικές πληροφορίες για τους κόμβους εργαζομένων:

Η πιο έντονη σελίδα με εργασίες που λειτούργησαν:

Η πιο βαρετή σελίδα με την κατάσταση του μεσίτη μας:

Η πιο φωτεινή σελίδα είναι με γραφήματα κατάστασης εργασιών και τον χρόνο εκτέλεσής τους:

Φορτώνουμε τα υποφορτισμένα
Έτσι, όλα τα καθήκοντα έχουν επιλυθεί, μπορείτε να παρασύρετε τους τραυματίες.

Και υπήρχαν πολλοί τραυματίες - για τον έναν ή τον άλλο λόγο. Στην περίπτωση της σωστής χρήσης του Airflow, αυτά τα τετράγωνα υποδεικνύουν ότι τα δεδομένα σίγουρα δεν έφθασαν.
Πρέπει να παρακολουθήσετε το αρχείο καταγραφής και να επανεκκινήσετε τις εμφανίσεις εργασιών.
Κάνοντας κλικ σε οποιοδήποτε τετράγωνο, θα δούμε τις ενέργειες που είναι διαθέσιμες σε εμάς:

Μπορείτε να πάρετε και να κάνετε Clear the fallen. Δηλαδή, ξεχνάμε ότι κάτι απέτυχε εκεί και η ίδια εργασία παρουσίας θα πάει στον προγραμματιστή.

Είναι σαφές ότι το να το κάνεις αυτό με το ποντίκι με όλα τα κόκκινα τετράγωνα δεν είναι πολύ ανθρώπινο - αυτό δεν περιμένουμε από το Airflow. Φυσικά, έχουμε όπλα μαζικής καταστροφής: Browse/Task Instances

Ας επιλέξουμε τα πάντα ταυτόχρονα και ας μηδενίσουμε, κάντε κλικ στο σωστό στοιχείο:

Μετά τον καθαρισμό, τα ταξί μας μοιάζουν με αυτό (περιμένουν ήδη τον προγραμματιστή να τα προγραμματίσει):

Συνδέσεις, άγκιστρα και άλλες μεταβλητές
Ήρθε η ώρα να δούμε την επόμενη DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Έκανε ποτέ όλοι μια ενημέρωση αναφοράς; Αυτή είναι και πάλι: υπάρχει μια λίστα πηγών από όπου μπορείτε να λάβετε τα δεδομένα. υπαρχει λιστα που να βαλω? μην ξεχάσετε να κορνάρετε όταν όλα συνέβησαν ή έσπασαν (καλά, αυτό δεν αφορά εμάς, όχι).
Ας δούμε ξανά το αρχείο και ας δούμε τα νέα σκοτεινά πράγματα:
from commons.operators import TelegramBotSendMessage- τίποτα δεν μας εμποδίζει να φτιάξουμε τους δικούς μας χειριστές, τους οποίους εκμεταλλευτήκαμε φτιάχνοντας ένα μικρό περιτύλιγμα για την αποστολή μηνυμάτων στο Unblocked. (Θα μιλήσουμε περισσότερα για αυτόν τον τελεστή παρακάτω).default_args={}- Το dag μπορεί να διανείμει τα ίδια ορίσματα σε όλους τους τελεστές του.to='{{ var.value.all_the_kings_men }}'- χωράφιtoδεν θα έχουμε σκληρό κώδικα, αλλά θα δημιουργηθεί δυναμικά χρησιμοποιώντας Jinja και μια μεταβλητή με μια λίστα email, την οποία έβαλα προσεκτικάAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— προϋπόθεση για την εκκίνηση του χειριστή. Στην περίπτωσή μας, το γράμμα θα πετάξει στα αφεντικά μόνο εάν έχουν επιλυθεί όλες οι εξαρτήσεις επιτυχώς;tg_bot_conn_id='tg_main'- επιχειρήματαconn_idαποδεχόμαστε τα αναγνωριστικά σύνδεσης που δημιουργούμεAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- τα μηνύματα στο Telegram θα πετάξουν μακριά μόνο εάν υπάρχουν πεσμένες εργασίες.task_concurrency=1- απαγορεύουμε την ταυτόχρονη εκκίνηση πολλών στιγμιότυπων εργασιών μιας εργασίας. Διαφορετικά, θα έχουμε την ταυτόχρονη εκκίνηση πολλώνVerticaOperator(κοιτάζοντας σε ένα τραπέζι).report_update >> [email, tg]- όλαVerticaOperatorσυγκλίνουν στην αποστολή επιστολών και μηνυμάτων, όπως αυτό:

Αλλά επειδή οι χειριστές ειδοποιήσεων έχουν διαφορετικές συνθήκες εκκίνησης, μόνο ένας θα λειτουργήσει. Στην προβολή δέντρου, όλα φαίνονται λιγότερο οπτικά:

Θα πω λίγα λόγια για μακροεντολές και οι φίλοι τους - μεταβλητές.
Οι μακροεντολές είναι σύμβολα θέσης Jinja που μπορούν να αντικαταστήσουν διάφορες χρήσιμες πληροφορίες σε ορίσματα χειριστή. Για παράδειγμα, όπως αυτό:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} θα επεκταθεί στα περιεχόμενα της μεταβλητής περιβάλλοντος execution_date σε μορφή YYYY-MM-DD: 2020-07-14. Το καλύτερο μέρος είναι ότι οι μεταβλητές περιβάλλοντος καρφώνονται σε μια συγκεκριμένη παρουσία εργασίας (ένα τετράγωνο στην προβολή δέντρου) και όταν επανεκκινηθούν, τα σύμβολα κράτησης θέσης θα επεκταθούν στις ίδιες τιμές.
Οι εκχωρημένες τιμές μπορούν να προβληθούν χρησιμοποιώντας το κουμπί Rendered σε κάθε στιγμιότυπο εργασίας. Αυτός είναι ο τρόπος με τον οποίο γίνεται η αποστολή μιας επιστολής:

Και έτσι στην αποστολή με την αποστολή μηνύματος:

Μια πλήρης λίστα με ενσωματωμένες μακροεντολές για την πιο πρόσφατη διαθέσιμη έκδοση είναι διαθέσιμη εδώ:
Επιπλέον, με τη βοήθεια πρόσθετων, μπορούμε να δηλώσουμε τις δικές μας μακροεντολές, αλλά αυτό είναι μια άλλη ιστορία.
Εκτός από τα προκαθορισμένα πράγματα, μπορούμε να αντικαταστήσουμε τις τιμές των μεταβλητών μας (το χρησιμοποίησα ήδη στον παραπάνω κώδικα). Ας δημιουργήσουμε μέσα Admin/Variables δυο πραγματα:

Όλα όσα μπορείτε να χρησιμοποιήσετε:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Η τιμή μπορεί να είναι βαθμωτή ή μπορεί επίσης να είναι JSON. Σε περίπτωση JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}απλά χρησιμοποιήστε τη διαδρομή προς το επιθυμητό κλειδί: {{ var.json.bot_config.bot.token }}.
Θα πω κυριολεκτικά μια λέξη και θα δείξω ένα στιγμιότυπο οθόνης για συνδέσεις. Όλα είναι στοιχειώδη εδώ: στη σελίδα Admin/Connections δημιουργούμε μια σύνδεση, προσθέτουμε τα στοιχεία σύνδεσης / κωδικούς πρόσβασης και πιο συγκεκριμένες παραμέτρους εκεί. Σαν αυτό:

Οι κωδικοί πρόσβασης μπορούν να κρυπτογραφηθούν (πιο διεξοδικά από τον προεπιλεγμένο) ή μπορείτε να παραλείψετε τον τύπο σύνδεσης (όπως έκανα για tg_main) - το γεγονός είναι ότι η λίστα των τύπων είναι ενσωματωμένη στα μοντέλα Airflow και δεν μπορεί να επεκταθεί χωρίς να μπει στους πηγαίους κώδικες (αν ξαφνικά δεν έψαξα κάτι στο google, διορθώστε με), αλλά τίποτα δεν θα μας εμποδίσει να λάβουμε πιστώσεις μόλις όνομα.
Μπορείτε επίσης να κάνετε πολλές συνδέσεις με το ίδιο όνομα: σε αυτήν την περίπτωση, τη μέθοδο BaseHook.get_connection(), που μας δίνει συνδέσεις ονομαστικά, θα δώσει τυχαίος από διάφορους συνονόματους (θα ήταν πιο λογικό να φτιάξουμε το Round Robin, αλλά ας το αφήσουμε στη συνείδηση των προγραμματιστών του Airflow).
Οι μεταβλητές και οι συνδέσεις είναι σίγουρα υπέροχα εργαλεία, αλλά είναι σημαντικό να μην χάσετε την ισορροπία: ποια μέρη των ροών σας αποθηκεύετε στον ίδιο τον κώδικα και ποια μέρη δίνετε στο Airflow για αποθήκευση. Από τη μία πλευρά, η γρήγορη αλλαγή της τιμής, για παράδειγμα, μια ταχυδρομική θυρίδα, μπορεί να είναι βολική μέσω της διεπαφής χρήστη. Από την άλλη, αυτό είναι ακόμα μια επιστροφή στο κλικ του ποντικιού, από το οποίο (εγώ) θέλαμε να απαλλαγούμε.
Η εργασία με συνδέσεις είναι μία από τις εργασίες αγκίστρια. Γενικά, τα άγκιστρα ροής αέρα είναι σημεία για τη σύνδεσή του με υπηρεσίες και βιβλιοθήκες τρίτων. Π.χ, JiraHook θα ανοίξει έναν πελάτη για να αλληλεπιδράσουμε με τον Jira (μπορείτε να μετακινήσετε εργασίες εμπρός και πίσω) και με τη βοήθεια SambaHook μπορείτε να προωθήσετε ένα τοπικό αρχείο σε smb-σημείο.
Ανάλυση του προσαρμοσμένου τελεστή
Και πλησιάσαμε να δούμε πώς φτιάχνεται TelegramBotSendMessage
Κώδικας commons/operators.py με τον πραγματικό χειριστή:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Εδώ, όπως όλα τα άλλα στο Airflow, όλα είναι πολύ απλά:
- Κληρονόμησε από
BaseOperator, το οποίο εφαρμόζει αρκετά πράγματα ειδικά για τη ροή αέρα (κοιτάξτε τον ελεύθερο χρόνο σας) - Δηλωμένα πεδία
template_fields, στο οποίο ο Jinja θα αναζητήσει μακροεντολές για επεξεργασία. - Τακτοποίησε τα σωστά επιχειρήματα για
__init__(), ορίστε τις προεπιλογές όπου χρειάζεται. - Δεν ξεχάσαμε ούτε την αρχικοποίηση του προγόνου.
- Άνοιξε το αντίστοιχο άγκιστρο
TelegramBotHookέλαβε ένα αντικείμενο πελάτη από αυτό. - Παράκαμψη (επανακαθορισμένη) μέθοδος
BaseOperator.execute(), το οποίο το Airfow θα συσπάσει όταν έρθει η ώρα να ξεκινήσει ο χειριστής - σε αυτό θα εφαρμόσουμε την κύρια ενέργεια, ξεχνώντας να συνδεθείτε. (Συνδεόμαστε, παρεμπιπτόντως, απευθείαςstdoutиstderr- Η ροή αέρα θα παρεμποδίσει τα πάντα, θα τα τυλίξει όμορφα, θα τα αποσυντεθεί όπου χρειάζεται.)
Ας δούμε τι έχουμε commons/hooks.py. Το πρώτο μέρος του αρχείου, με το ίδιο το άγκιστρο:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientΔεν ξέρω καν τι να εξηγήσω εδώ, θα σημειώσω μόνο τα σημαντικά σημεία:
- Κληρονομούμε, σκεφτόμαστε τα επιχειρήματα - στις περισσότερες περιπτώσεις θα είναι ένα:
conn_id; - Υπερισχύουσες τυπικές μεθόδους: Περιορίστηκα
get_conn(), στο οποίο παίρνω τις παραμέτρους σύνδεσης ονομαστικά και απλώς παίρνω την ενότηταextra(αυτό είναι ένα πεδίο JSON), στο οποίο (σύμφωνα με τις δικές μου οδηγίες!) έβαλα το διακριτικό bot Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Δημιουργώ ένα παράδειγμα μας
TelegramBot, δίνοντάς του ένα συγκεκριμένο διακριτικό.
Αυτό είναι όλο. Μπορείτε να πάρετε έναν πελάτη από ένα γάντζο χρησιμοποιώντας TelegramBotHook().clent ή TelegramBotHook().get_conn().
Και το δεύτερο μέρος του αρχείου, στο οποίο φτιάχνω ένα microwrapper για το Telegram REST API, για να μην σύρω το ίδιο για μια μέθοδο sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Ο σωστός τρόπος είναι να τα αθροίσεις όλα:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- στο πρόσθετο, τοποθετήστε το σε ένα δημόσιο αποθετήριο και δώστε το στο Open Source.
Ενώ μελετούσαμε όλα αυτά, οι ενημερώσεις των αναφορών μας κατάφεραν να αποτύχουν με επιτυχία και να μου στείλουν ένα μήνυμα σφάλματος στο κανάλι. Θα το τσεκάρω να δω αν είναι λάθος...

Κάτι έσπασε στο δόγμα μας! Αυτό δεν περιμέναμε; Ακριβώς!
Θα χύσεις;
Νιώθεις ότι έχασα κάτι; Φαίνεται ότι υποσχέθηκε να μεταφέρει δεδομένα από τον SQL Server στην Vertica, και μετά τα πήρε και απομακρύνθηκε από το θέμα, το σκάρτο!
Αυτή η θηριωδία ήταν σκόπιμη, απλά έπρεπε να αποκρυπτογραφήσω κάποια ορολογία για εσάς. Τώρα μπορείτε να προχωρήσετε παρακάτω.
Το σχέδιό μας ήταν το εξής:
- Κάνετε το χάλι
- Δημιουργήστε εργασίες
- Δείτε πόσο όμορφα είναι όλα
- Εκχωρήστε αριθμούς συνεδρίας σε γεμίσματα
- Λάβετε δεδομένα από τον SQL Server
- Τοποθετήστε δεδομένα στο Vertica
- Συλλέξτε στατιστικά στοιχεία
Έτσι, για να τα βάλω όλα σε λειτουργία, έκανα μια μικρή προσθήκη στο δικό μας docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyΕκεί σηκώνουμε:
- Vertica ως οικοδεσπότης
dwhμε τις πιο προεπιλεγμένες ρυθμίσεις, - τρεις περιπτώσεις SQL Server,
- γεμίζουμε τις βάσεις δεδομένων της τελευταίας με κάποια δεδομένα (σε καμία περίπτωση μην εξετάσετε
mssql_init.py!)
Ξεκινάμε όλα τα καλά με τη βοήθεια μιας ελαφρώς πιο περίπλοκης εντολής από την προηγούμενη φορά:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Αυτό που δημιούργησε ο θαυματουργός τυχαιοποιητής μας, μπορείτε να χρησιμοποιήσετε το αντικείμενο Data Profiling/Ad Hoc Query:

Το κύριο πράγμα είναι να μην το δείξουμε στους αναλυτές
επεξεργαστείτε συνεδρίες ETL Δεν θα το κάνω, όλα είναι ασήμαντα εκεί: φτιάχνουμε μια βάση, υπάρχει μια πινακίδα σε αυτήν, τυλίγουμε τα πάντα με έναν διαχειριστή περιβάλλοντος και τώρα κάνουμε αυτό:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passΉρθε η ώρα συλλέγουμε τα δεδομένα μας από τα μιάμιση τραπέζια μας. Ας το κάνουμε αυτό με τη βοήθεια πολύ ανεπιτήδευτων γραμμών:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Με τη βοήθεια ενός γάντζου παίρνουμε από το Airflow
pymssql-συνδέω-συωδεομαι - Ας αντικαταστήσουμε έναν περιορισμό με τη μορφή ημερομηνίας στο αίτημα - θα μεταφερθεί στη λειτουργία από τη μηχανή προτύπου.
- Τροφοδοτώντας το αίτημά μας
pandasποιος θα μας πάρειDataFrame- θα μας είναι χρήσιμο στο μέλλον.
Χρησιμοποιώ αντικατάσταση
{dt}αντί για παράμετρο αιτήματος%sόχι επειδή είμαι ένας κακός Πινόκιο, αλλά επειδήpandasδεν αντέχειpymssqlκαι γλιστράει το τελευταίοparams: Listαν και θέλει πολύtuple.
Σημειώστε επίσης ότι ο προγραμματιστήςpymssqlαποφάσισε να μην τον στηρίξει πια, και ήρθε η ώρα να φύγειpyodbc.
Ας δούμε με τι γέμισε το Airflow τα ορίσματα των συναρτήσεών μας:

Αν δεν υπάρχουν δεδομένα, τότε δεν έχει νόημα να συνεχίσουμε. Είναι όμως και περίεργο να θεωρούμε επιτυχημένο το γέμισμα. Αυτό όμως δεν είναι λάθος. Α-αχ-αχ, τι να κάνω;! Και να τι:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException θα πει στο Airflow ότι δεν υπάρχουν σφάλματα, αλλά παρακάμπτουμε την εργασία. Η διεπαφή δεν θα έχει πράσινο ή κόκκινο τετράγωνο, αλλά ροζ.
Ας πετάξουμε τα δεδομένα μας πολλαπλές στήλες:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])δηλαδή
- Η βάση δεδομένων από την οποία πήραμε τις παραγγελίες,
- Το αναγνωριστικό της περιόδου πλημμύρας (θα είναι διαφορετικό για κάθε εργασία),
- Ένας κατακερματισμός από την πηγή και το αναγνωριστικό παραγγελίας - έτσι ώστε στην τελική βάση δεδομένων (όπου όλα χύνονται σε έναν πίνακα) έχουμε ένα μοναδικό αναγνωριστικό παραγγελίας.
Το προτελευταίο βήμα παραμένει: ρίξτε τα πάντα στη Vertica. Και, παραδόξως, ένας από τους πιο θεαματικούς και αποτελεσματικούς τρόπους για να γίνει αυτό είναι μέσω του CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Φτιάχνουμε έναν ειδικό δέκτη
StringIO. pandasθα βάλει ευγενικά μαςDataFrameμε τη μορφήCSV- γραμμές.- Ας ανοίξουμε μια σύνδεση με το αγαπημένο μας Vertica με ένα γάντζο.
- Και τώρα με τη βοήθεια
copy()στείλτε τα δεδομένα μας απευθείας στη Vertika!
Θα πάρουμε από το πρόγραμμα οδήγησης πόσες γραμμές γεμίστηκαν και θα πούμε στον διαχειριστή συνεδρίας ότι όλα είναι εντάξει:
session.loaded_rows = cursor.rowcount
session.successful = TrueΑυτό είναι όλο.
Στην πώληση, δημιουργούμε την πλάκα-στόχο χειροκίνητα. Εδώ επέτρεψα στον εαυτό μου ένα μικρό μηχάνημα:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)χρησιμοποιώ
VerticaOperator()Δημιουργώ ένα σχήμα βάσης δεδομένων και έναν πίνακα (αν δεν υπάρχουν ήδη, φυσικά). Το κύριο πράγμα είναι να τακτοποιήσετε σωστά τις εξαρτήσεις:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadΑνακεφαλαίωση
- Λοιπόν, - είπε το ποντικάκι, - δεν είναι, τώρα
Είσαι πεπεισμένος ότι είμαι το πιο τρομερό ζώο στο δάσος;
Τζούλια Ντόναλντσον, Ο Γκρούφαλο
Νομίζω ότι αν οι συνάδελφοί μου και εγώ είχαμε έναν ανταγωνισμό: ποιος θα δημιουργήσει και θα ξεκινήσει γρήγορα μια διαδικασία ETL από την αρχή: αυτοί με το SSIS και ένα ποντίκι και εγώ με το Airflow... Και τότε θα συγκρίναμε επίσης την ευκολία συντήρησης... Πω πω, θα συμφωνήσετε ότι θα τους νικήσω σε όλα τα μέτωπα!
Αν είναι λίγο πιο σοβαρά, τότε το Apache Airflow -περιγράφοντας διαδικασίες με τη μορφή κώδικα προγράμματος- έκανε τη δουλειά μου πολύ πιο άνετα και ευχάριστα.
Η απεριόριστη επεκτασιμότητα του, τόσο ως προς τα plug-ins όσο και ως προς την προδιάθεση για επεκτασιμότητα, σας δίνει την ευκαιρία να χρησιμοποιήσετε το Airflow σχεδόν σε οποιοδήποτε τομέα: ακόμη και σε ολόκληρο τον κύκλο συλλογής, προετοιμασίας και επεξεργασίας δεδομένων, ακόμη και στην εκτόξευση πυραύλων (προς Άρη, σειρά μαθημάτων).
Τελικό μέρος, αναφορά και πληροφορίες
Η γκανιότα που έχουμε συγκεντρώσει για εσάς
start_date. Ναι, αυτό είναι ήδη ένα τοπικό μιμίδιο. Μέσω του κύριου επιχειρήματος του Νταγκstart_dateόλα περνούν. Εν συντομία, αν προσδιορίσετε σεstart_dateτρέχουσα ημερομηνία καιschedule_interval- μια μέρα, τότε η DAG θα ξεκινήσει αύριο όχι νωρίτερα.start_date = datetime(2020, 7, 7, 0, 1, 2)Και όχι άλλα προβλήματα.
Υπάρχει ένα άλλο σφάλμα χρόνου εκτέλεσης που σχετίζεται με αυτό:
Task is missing the start_date parameter, που τις περισσότερες φορές υποδεικνύει ότι ξεχάσατε να δεσμευτείτε με τον χειριστή dag.- Όλα σε ένα μηχάνημα. Ναι, και βάσεις (το ίδιο το Airflow και η επίστρωσή μας), και διακομιστής ιστού, προγραμματιστής και εργαζόμενοι. Και μάλιστα λειτούργησε. Αλλά με την πάροδο του χρόνου, ο αριθμός των εργασιών για τις υπηρεσίες αυξήθηκε και όταν η PostgreSQL άρχισε να ανταποκρίνεται στο ευρετήριο σε 20 δευτερόλεπτα αντί για 5 ms, το πήραμε και το μεταφέραμε.
- LocalExecutor. Ναι, ακόμα καθόμαστε πάνω του, και έχουμε ήδη φτάσει στην άκρη της αβύσσου. Το LocalExecutor μας ήταν αρκετό μέχρι στιγμής, αλλά τώρα ήρθε η ώρα να επεκταθούμε με τουλάχιστον έναν εργαζόμενο και θα πρέπει να εργαστούμε σκληρά για να μετακομίσουμε στο CeleryExecutor. Και λόγω του γεγονότος ότι μπορείτε να εργαστείτε με αυτό σε ένα μηχάνημα, τίποτα δεν σας εμποδίζει να χρησιμοποιήσετε το Celery ακόμη και σε διακομιστή, ο οποίος "φυσικά, δεν θα βγει ποτέ στην παραγωγή, ειλικρινά!"
- Μη χρήση ενσωματωμένα εργαλεία:
- Διασυνδέσεις για την αποθήκευση των διαπιστευτηρίων υπηρεσίας,
- Δεσποινίς SLA να ανταποκρίνεται σε εργασίες που δεν λειτούργησαν εγκαίρως,
- xcom για ανταλλαγή μεταδεδομένων (είπα μεταδεδομένα!) μεταξύ των εργασιών dag.
- Κατάχρηση αλληλογραφίας. Λοιπόν, τι να πω; Δημιουργήθηκαν ειδοποιήσεις για όλες τις επαναλήψεις πεσμένων εργασιών. Τώρα το Gmail της δουλειάς μου έχει >90 μηνύματα ηλεκτρονικού ταχυδρομείου από το Airflow και το ρύγχος αλληλογραφίας ιστού αρνείται να παραλάβει και να διαγράψει περισσότερα από 100 ταυτόχρονα.
Περισσότερες παγίδες:
Περισσότερα εργαλεία αυτοματισμού
Για να δουλεύουμε ακόμα περισσότερο με το κεφάλι και όχι με τα χέρια μας, το Airflow ετοίμασε για εμάς αυτό:
- - εξακολουθεί να έχει την ιδιότητα του Πειραματικού, κάτι που δεν τον εμποδίζει να εργαστεί. Με αυτό, μπορείτε όχι μόνο να λάβετε πληροφορίες σχετικά με τα dags και τις εργασίες, αλλά και να σταματήσετε/ξεκινήσετε ένα dag, να δημιουργήσετε ένα DAG Run ή ένα pool.
- - πολλά εργαλεία είναι διαθέσιμα μέσω της γραμμής εντολών που δεν είναι απλώς άβολα στη χρήση μέσω του WebUI, αλλά γενικά απουσιάζουν. Για παράδειγμα:
backfillαπαιτείται για επανεκκίνηση των παρουσιών εργασιών.
Για παράδειγμα, ήρθαν αναλυτές και είπαν: «Κι εσύ σύντροφε, έχεις βλακείες στα στοιχεία από 1 έως 13 Ιανουαρίου! Διορθώστε το, φτιάξτε το, φτιάξτε το, φτιάξτε το!». Και είσαι μια τέτοια εστία:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Βασική υπηρεσία:
initdb,resetdb,upgradedb,checkdb. run, το οποίο σας επιτρέπει να εκτελέσετε μια εργασία παρουσίας και ακόμη και να σκοράρετε σε όλες τις εξαρτήσεις. Επιπλέον, μπορείτε να το εκτελέσετε μέσωLocalExecutor, ακόμα κι αν έχετε ένα σύμπλεγμα σέλινου.- Κάνει σχεδόν το ίδιο πράγμα
test, μόνο και στις βάσεις δεν γράφει τίποτα. connectionsεπιτρέπει τη μαζική δημιουργία συνδέσεων από το κέλυφος.
- - ένας μάλλον σκληροπυρηνικός τρόπος αλληλεπίδρασης, ο οποίος προορίζεται για πρόσθετα, και όχι για σωρεία με μικρά χέρια. Αλλά ποιος θα μας εμποδίσει να πάμε
/home/airflow/dags, τρέξιμοipythonκαι να αρχίσουμε να τα μπερδεύουμε; Μπορείτε, για παράδειγμα, να εξάγετε όλες τις συνδέσεις με τον ακόλουθο κώδικα:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Σύνδεση στη βάση μεταδεδομένων Airflow. Δεν συνιστώ να γράψετε σε αυτό, αλλά η λήψη καταστάσεων εργασιών για διάφορες συγκεκριμένες μετρήσεις μπορεί να είναι πολύ πιο γρήγορη και ευκολότερη από τη χρήση οποιουδήποτε από τα API.
Ας πούμε ότι δεν είναι όλες οι εργασίες μας ανίκανες, αλλά μερικές φορές μπορεί να πέσουν και αυτό είναι φυσιολογικό. Αλλά μερικά μπλοκαρίσματα είναι ήδη ύποπτα και θα ήταν απαραίτητο να ελέγξετε.
Προσοχή SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
παραπομπές
Και φυσικά, οι δέκα πρώτοι σύνδεσμοι από την έκδοση της Google είναι τα περιεχόμενα του φακέλου Airflow από τους σελιδοδείκτες μου.
- - Φυσικά, πρέπει να ξεκινήσουμε από το γραφείο. τεκμηρίωση, αλλά ποιος διαβάζει τις οδηγίες;
- - Λοιπόν, τουλάχιστον διαβάστε τις συστάσεις από τους δημιουργούς.
- - η αρχή: η διεπαφή χρήστη στις εικόνες
- - οι βασικές έννοιες περιγράφονται καλά, αν (ξαφνικά!) δεν καταλάβατε κάτι από εμένα.
- - ένας σύντομος οδηγός για τη ρύθμιση ενός συμπλέγματος ροής αέρα.
- - σχεδόν το ίδιο ενδιαφέρον άρθρο, εκτός ίσως από περισσότερο φορμαλισμό και λιγότερα παραδείγματα.
- — σχετικά με τη συνεργασία με το Celery.
- - σχετικά με την αδυναμία των εργασιών, τη φόρτωση με αναγνωριστικό αντί για την ημερομηνία, τη μετατροπή, τη δομή του αρχείου και άλλα ενδιαφέροντα πράγματα.
- - εξαρτήσεις εργασιών και κανόνας ενεργοποίησης, που ανέφερα μόνο εν παρόδω.
- - πώς να ξεπεράσετε ορισμένες "εργασίες όπως προβλέπεται" στον προγραμματιστή, να φορτώσετε χαμένα δεδομένα και να ιεραρχήσετε εργασίες.
- — χρήσιμα ερωτήματα SQL στα μεταδεδομένα Airflow.
- - υπάρχει μια χρήσιμη ενότητα σχετικά με τη δημιουργία ενός προσαρμοσμένου αισθητήρα.
- — μια ενδιαφέρουσα σύντομη σημείωση σχετικά με τη δημιουργία μιας υποδομής στο AWS για την Επιστήμη των Δεδομένων.
- - κοινά λάθη (όταν κάποιος εξακολουθεί να μην διαβάζει τις οδηγίες).
- - Χαμογελάστε με τον τρόπο που οι άνθρωποι αποθήκευσαν τους κωδικούς πρόσβασης, αν και μπορείτε απλώς να χρησιμοποιήσετε τις Συνδέσεις.
- - σιωπηρή προώθηση DAG, εισαγωγή συναρτήσεων, και πάλι για εξαρτήσεις, καθώς και για παράβλεψη εκκινήσεων εργασιών.
- - σχετικά με τη χρήση
default argumentsиparamsσε πρότυπα, καθώς και σε μεταβλητές και συνδέσεις. - - μια ιστορία για το πώς ο σχεδιαστής προετοιμάζεται για το Airflow 2.0.
- - ένα ελαφρώς ξεπερασμένο άρθρο σχετικά με την ανάπτυξη του συμπλέγματός μας
docker-compose. - - δυναμικές εργασίες με χρήση προτύπων και προώθησης περιβάλλοντος.
- — τυπικές και προσαρμοσμένες ειδοποιήσεις μέσω ταχυδρομείου και Slack.
- - Εργασίες διακλάδωσης, μακροεντολές και XCom.
Και οι σύνδεσμοι που χρησιμοποιούνται στο άρθρο:
- - σύμβολα κράτησης θέσης διαθέσιμα για χρήση σε πρότυπα.
- — Συνήθη λάθη κατά τη δημιουργία σκαλισμάτων.
- -
docker-composeγια πειραματισμό, εντοπισμό σφαλμάτων και άλλα. - — Περιτύλιγμα Python για το Telegram REST API.
Πηγή: www.habr.com




