Η ροή αέρα είναι ένα εργαλείο για εύκολη και γρήγορη ανάπτυξη και διατήρηση διαδικασιών επεξεργασίας δεδομένων παρτίδας

Η ροή αέρα είναι ένα εργαλείο για εύκολη και γρήγορη ανάπτυξη και διατήρηση διαδικασιών επεξεργασίας δεδομένων παρτίδας

Γεια σου Χαμπρ! Σε αυτό το άρθρο, θέλω να μιλήσω για ένα εξαιρετικό εργαλείο για την ανάπτυξη διαδικασιών επεξεργασίας δεδομένων παρτίδας, για παράδειγμα, στην υποδομή ενός εταιρικού DWH ή του DataLake σας. Θα μιλήσουμε για το Apache Airflow (στο εξής θα αναφέρεται ως Airflow). Του στερείται άδικα η προσοχή στο Habré και στο κύριο μέρος θα προσπαθήσω να σας πείσω ότι τουλάχιστον αξίζει να κοιτάξετε το Airflow όταν επιλέγετε έναν προγραμματιστή για τις διεργασίες σας ETL / ELT.

Προηγουμένως, είχα γράψει μια σειρά άρθρων με θέμα το DWH όταν εργαζόμουν στην Tinkoff Bank. Τώρα έχω γίνει μέλος της ομάδας Mail.Ru Group και αναπτύσσω μια πλατφόρμα για ανάλυση δεδομένων στον τομέα των τυχερών παιχνιδιών. Στην πραγματικότητα, καθώς εμφανίζονται νέα και ενδιαφέρουσες λύσεις, η ομάδα και εγώ θα μιλήσουμε εδώ για την πλατφόρμα μας για ανάλυση δεδομένων.

Πρόλογος

Λοιπόν, ας ξεκινήσουμε. Τι είναι η ροή αέρα; Αυτή είναι μια βιβλιοθήκη (ή σύνολο βιβλιοθηκών) να αναπτύσσει, να σχεδιάζει και να παρακολουθεί τις διαδικασίες εργασίας. Το κύριο χαρακτηριστικό του Airflow: Ο κώδικας Python χρησιμοποιείται για την περιγραφή (ανάπτυξη) διαδικασιών. Αυτό έχει πολλά πλεονεκτήματα για την οργάνωση του έργου και της ανάπτυξής σας: στην ουσία, το (για παράδειγμα) έργο σας ETL είναι απλώς ένα έργο Python και μπορείτε να το οργανώσετε όπως θέλετε, λαμβάνοντας υπόψη τις ιδιαιτερότητες της υποδομής, το μέγεθος της ομάδας και Λοιπές απαιτήσεις. Ενορχηστρικά όλα είναι απλά. Χρησιμοποιήστε για παράδειγμα PyCharm + Git. Είναι υπέροχο και πολύ βολικό!

Τώρα ας δούμε τις κύριες οντότητες του Airflow. Έχοντας κατανοήσει την ουσία και τον σκοπό τους, θα οργανώσετε βέλτιστα την αρχιτεκτονική της διαδικασίας. Ίσως η κύρια οντότητα είναι το Directed Acyclic Graph (στο εξής DAG).

DAG

Το DAG είναι κάποια σημασιολογική συσχέτιση των εργασιών σας που θέλετε να ολοκληρώσετε σε μια αυστηρά καθορισμένη σειρά σε ένα συγκεκριμένο χρονοδιάγραμμα. Το Airflow παρέχει μια βολική διεπαφή ιστού για εργασία με DAG και άλλες οντότητες:

Η ροή αέρα είναι ένα εργαλείο για εύκολη και γρήγορη ανάπτυξη και διατήρηση διαδικασιών επεξεργασίας δεδομένων παρτίδας

Το DAG μπορεί να μοιάζει με αυτό:

Η ροή αέρα είναι ένα εργαλείο για εύκολη και γρήγορη ανάπτυξη και διατήρηση διαδικασιών επεξεργασίας δεδομένων παρτίδας

Ο προγραμματιστής, όταν σχεδιάζει ένα DAG, καθορίζει ένα σύνολο χειριστών στους οποίους θα δημιουργηθούν εργασίες εντός του DAG. Εδώ ερχόμαστε σε μια άλλη σημαντική οντότητα: Διαχειριστής ροής αέρα.

Χειριστές

Ο χειριστής είναι μια οντότητα βάσει της οποίας δημιουργούνται περιπτώσεις εργασίας, η οποία περιγράφει τι θα συμβεί κατά την εκτέλεση μιας παρουσίας εργασίας. Η ροή αέρα απελευθερώνεται από το GitHub περιέχει ήδη ένα σύνολο εντολών έτοιμες για χρήση. Παραδείγματα:

  • BashOperator - τελεστής για την εκτέλεση εντολής bash.
  • PythonOperator - τελεστής για την κλήση κώδικα Python.
  • EmailOperator — τελεστής για την αποστολή email.
  • HTTPOoperator - ένας τελεστής για εργασία με αιτήματα http.
  • SqlOperator - τελεστής για την εκτέλεση κώδικα SQL.
  • Ο αισθητήρας είναι ένας τελεστής αναμονής για ένα συμβάν (την άφιξη της επιθυμητής ώρας, την εμφάνιση του απαιτούμενου αρχείου, μια σειρά στη βάση δεδομένων, μια απάντηση από το API, κ.λπ., κ.λπ.).

Υπάρχουν πιο συγκεκριμένοι χειριστές: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Μπορείτε επίσης να αναπτύξετε χειριστές που να ταιριάζουν στις ανάγκες σας και να τους χρησιμοποιήσετε στο έργο σας. Για παράδειγμα, δημιουργήσαμε το MongoDBToHiveViaHdfsTransfer, έναν τελεστή για την εξαγωγή εγγράφων από το MongoDB στο Hive, και αρκετούς τελεστές για εργασία με Κάντε κλικ στο σπίτι: CHLoadFromHiveOperator και CHTableLoaderOperator. Στην πραγματικότητα, μόλις ένα έργο χρησιμοποιεί συχνά κώδικα που βασίζεται σε βασικές δηλώσεις, μπορείτε να σκεφτείτε να τον μεταγλωττίσετε σε μια νέα δήλωση. Αυτό θα απλοποιήσει την περαιτέρω ανάπτυξη και θα προσθέσετε στη βιβλιοθήκη των χειριστών σας στο έργο.

Στη συνέχεια, όλες αυτές οι περιπτώσεις εργασιών πρέπει να εκτελεστούν και τώρα θα μιλήσουμε για τον προγραμματιστή.

Προγραμματιστής

Ο προγραμματιστής εργασιών της ροής αέρα είναι χτισμένος Σέλινο. Το Celery είναι μια βιβλιοθήκη Python που σας επιτρέπει να οργανώσετε μια ουρά καθώς και ασύγχρονη και κατανεμημένη εκτέλεση εργασιών. Από την πλευρά της ροής αέρα, όλες οι εργασίες χωρίζονται σε πισίνες. Οι πισίνες δημιουργούνται χειροκίνητα. Κατά κανόνα, σκοπός τους είναι να περιορίσουν το φόρτο εργασίας με την πηγή ή να πληκτρολογήσουν εργασίες μέσα στο DWH. Η διαχείριση των ομάδων μπορεί να γίνει μέσω της διεπαφής ιστού:

Η ροή αέρα είναι ένα εργαλείο για εύκολη και γρήγορη ανάπτυξη και διατήρηση διαδικασιών επεξεργασίας δεδομένων παρτίδας

Κάθε πισίνα έχει ένα όριο στον αριθμό των κουλοχέρηδων. Κατά τη δημιουργία ενός DAG, του δίνεται ένα pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Μια ομάδα που ορίζεται σε επίπεδο DAG μπορεί να παρακαμφθεί σε επίπεδο εργασίας.
Μια ξεχωριστή διαδικασία, το Scheduler, είναι υπεύθυνη για τον προγραμματισμό όλων των εργασιών στο Airflow. Στην πραγματικότητα, ο Scheduler ασχολείται με όλους τους μηχανισμούς του καθορισμού εργασιών για εκτέλεση. Η εργασία περνάει από διάφορα στάδια πριν εκτελεστεί:

  1. Οι προηγούμενες εργασίες έχουν ολοκληρωθεί στο DAG· μια νέα μπορεί να μπει στην ουρά.
  2. Η ουρά ταξινομείται ανάλογα με την προτεραιότητα των εργασιών (οι προτεραιότητες μπορούν επίσης να ελεγχθούν) και εάν υπάρχει δωρεάν υποδοχή στην πισίνα, η εργασία μπορεί να τεθεί σε λειτουργία.
  3. Εάν υπάρχει δωρεάν εργάτης, η εργασία αποστέλλεται σε αυτήν. η εργασία που προγραμματίσατε στο πρόβλημα ξεκινά, χρησιμοποιώντας έναν ή άλλο τελεστή.

Αρκετά απλό.

Το Scheduler εκτελείται στο σύνολο όλων των DAG και όλων των εργασιών εντός των DAG.

Για να ξεκινήσει ο Προγραμματιστής να εργάζεται με την DAG, η DAG πρέπει να ορίσει ένα χρονοδιάγραμμα:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Υπάρχει ένα σύνολο έτοιμων προεπιλογών: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Μπορείτε επίσης να χρησιμοποιήσετε εκφράσεις cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Ημερομηνία εκτέλεσης

Για να κατανοήσετε πώς λειτουργεί η ροή αέρα, είναι σημαντικό να κατανοήσετε τι είναι η ημερομηνία εκτέλεσης για μια DAG. Το Airflow DAG έχει την ιδιότητα Ημερομηνία εκτέλεσης, δηλαδή, ανάλογα με το πρόγραμμα εργασίας του DAG, δημιουργούνται περιπτώσεις εργασιών για κάθε Ημερομηνία Εκτέλεσης. Και για κάθε Ημερομηνία Εκτέλεσης, οι εργασίες μπορούν να εκτελούνται εκ νέου - ή, για παράδειγμα, ένα DAG μπορεί να λειτουργεί ταυτόχρονα σε πολλές Ημερομηνίες Εκτέλεσης. Αυτό φαίνεται ξεκάθαρα εδώ:

Η ροή αέρα είναι ένα εργαλείο για εύκολη και γρήγορη ανάπτυξη και διατήρηση διαδικασιών επεξεργασίας δεδομένων παρτίδας

Δυστυχώς (ή ίσως ευτυχώς: εξαρτάται από την κατάσταση), εάν η υλοποίηση της εργασίας στο DAG διορθωθεί, τότε η εκτέλεση στην προηγούμενη Ημερομηνία Εκτέλεσης θα προχωρήσει λαμβάνοντας υπόψη τις προσαρμογές. Αυτό είναι καλό εάν πρέπει να υπολογίσετε εκ νέου δεδομένα σε προηγούμενες περιόδους χρησιμοποιώντας έναν νέο αλγόριθμο, αλλά είναι κακό επειδή χάνεται η αναπαραγωγιμότητα του αποτελέσματος (φυσικά, κανείς δεν σας ενοχλεί να επιστρέψετε την απαιτούμενη έκδοση του πηγαίου κώδικα από το Git και να υπολογίσετε τι χρειάζεσαι μία φορά, όπως τη χρειάζεσαι).

Δημιουργία εργασιών

Η υλοποίηση του DAG είναι κώδικας Python, επομένως έχουμε έναν πολύ βολικό τρόπο να μειώσουμε την ποσότητα του κώδικα όταν εργαζόμαστε, για παράδειγμα, με κοινόχρηστες πηγές. Ας υποθέσουμε ότι έχετε τρία θραύσματα MySQL ως πηγή, πρέπει να αναρριχηθείτε σε κάθε ένα και να συλλέξετε κάποια δεδομένα. Επιπλέον, ανεξάρτητα και παράλληλα. Ο κώδικας Python στο DAG μπορεί να μοιάζει με αυτό:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Το DAG μοιάζει με αυτό:

Η ροή αέρα είναι ένα εργαλείο για εύκολη και γρήγορη ανάπτυξη και διατήρηση διαδικασιών επεξεργασίας δεδομένων παρτίδας

Ταυτόχρονα, μπορείτε να προσθέσετε ή να αφαιρέσετε ένα θραύσμα προσαρμόζοντας απλώς τη ρύθμιση και ενημερώνοντας το DAG. Ανετος!

Μπορείτε επίσης να χρησιμοποιήσετε πιο περίπλοκη δημιουργία κώδικα, για παράδειγμα, να εργαστείτε με πηγές με τη μορφή βάσης δεδομένων ή να περιγράψετε μια δομή πίνακα, έναν αλγόριθμο για την εργασία με έναν πίνακα και, λαμβάνοντας υπόψη τα χαρακτηριστικά της υποδομής DWH, να δημιουργήσετε τη διαδικασία της φόρτωσης N πινάκων στην αποθήκη σας. Ή, για παράδειγμα, όταν εργάζεστε με ένα API που δεν υποστηρίζει εργασία με μια παράμετρο με τη μορφή λίστας, μπορείτε να δημιουργήσετε N εργασίες σε ένα DAG χρησιμοποιώντας αυτήν τη λίστα, να περιορίσετε τον παραλληλισμό των αιτημάτων στο API σε μια ομάδα και να εξαγάγετε τα απαραίτητα δεδομένα από το API. Εύκαμπτος!

αποθήκη

Το Airflow έχει το δικό του αποθετήριο υποστήριξης, μια βάση δεδομένων (ίσως MySQL ή Postgres, έχουμε Postgres), που αποθηκεύει τις καταστάσεις εργασιών, DAG, ρυθμίσεις σύνδεσης, καθολικές μεταβλητές κ.λπ., κ.λπ. Εδώ θα ήθελα να πω ότι το αποθετήριο στο Airflow είναι πολύ απλό (περίπου 20 πίνακες) και βολικό αν θέλετε να δημιουργήσετε οποιαδήποτε από τις διαδικασίες σας σε αυτό. Θυμάμαι 100500 πίνακες στο αποθετήριο Informatica, οι οποίοι έπρεπε να καπνιστούν για μεγάλο χρονικό διάστημα πριν καταλάβουμε πώς να δημιουργήσουμε ένα ερώτημα.

Παρακολούθηση

Δεδομένης της απλότητας του αποθετηρίου, μπορείτε να δημιουργήσετε μια διαδικασία παρακολούθησης εργασιών που είναι βολική για εσάς. Χρησιμοποιούμε ένα σημειωματάριο στο Zeppelin, όπου εξετάζουμε την κατάσταση των εργασιών:

Η ροή αέρα είναι ένα εργαλείο για εύκολη και γρήγορη ανάπτυξη και διατήρηση διαδικασιών επεξεργασίας δεδομένων παρτίδας

Μπορεί επίσης να είναι η διεπαφή ιστού του ίδιου του Airflow:

Η ροή αέρα είναι ένα εργαλείο για εύκολη και γρήγορη ανάπτυξη και διατήρηση διαδικασιών επεξεργασίας δεδομένων παρτίδας

Ο κωδικός ροής αέρα είναι ανοιχτός, επομένως προσθέσαμε μια ειδοποίηση στο Telegram. Κάθε παρουσία εργασίας που εκτελείται, εάν παρουσιαστεί σφάλμα, στέλνει ανεπιθύμητα μηνύματα στην ομάδα Telegram, όπου αποτελείται ολόκληρη η ομάδα ανάπτυξης και υποστήριξης.

Λαμβάνουμε μια άμεση απάντηση μέσω του Telegram (εάν απαιτείται), μέσω του Zeppelin - μια συνολική εικόνα των εργασιών στο Airflow.

Σε συνολικά

Η ροή αέρα είναι πρώτα και κύρια ανοιχτού κώδικα και δεν πρέπει να περιμένετε θαύματα από αυτήν. Να είστε έτοιμοι να αφιερώσετε χρόνο και προσπάθεια για να δημιουργήσετε μια λειτουργική λύση. Ο στόχος είναι εφικτός, πιστέψτε με, αξίζει τον κόπο. Ταχύτητα ανάπτυξης, ευελιξία, ευκολία προσθήκης νέων διαδικασιών - θα το λατρέψετε. Φυσικά, πρέπει να δώσετε μεγάλη προσοχή στην οργάνωση του έργου, στη σταθερότητα του έργου του ίδιου του Airflow: δεν υπάρχουν θαύματα.

Τώρα έχουμε Airflow που λειτουργεί καθημερινά περίπου 6,5 χιλιάδες εργασίες. Είναι αρκετά διαφορετικοί ως προς τον χαρακτήρα. Υπάρχουν εργασίες για τη φόρτωση δεδομένων στο κύριο DWH από πολλές διαφορετικές και πολύ συγκεκριμένες πηγές, υπάρχουν εργασίες για τον υπολογισμό των βιτρινών μέσα στο κύριο DWH, υπάρχουν εργασίες για τη δημοσίευση δεδομένων σε ένα γρήγορο DWH, υπάρχουν πολλές, πολλές διαφορετικές εργασίες - και Airflow τα μασάει όλη μέρα με τη μέρα. Μιλώντας με αριθμούς, αυτό είναι 2,3 χιλιάδες Εργασίες ELT ποικίλης πολυπλοκότητας εντός DWH (Hadoop), περ. 2,5 εκατοντάδες βάσεις δεδομένων πηγές, αυτή είναι μια ομάδα από 4 προγραμματιστές ETL, τα οποία χωρίζονται σε επεξεργασία δεδομένων ETL στο DWH και ELT επεξεργασία δεδομένων εντός DWH και φυσικά σε άλλα ένας διαχειριστής, που ασχολείται με τις υποδομές της υπηρεσίας.

Σχέδια για το μέλλον

Ο αριθμός των διαδικασιών αναπόφευκτα αυξάνεται και το κύριο πράγμα που θα κάνουμε όσον αφορά την υποδομή Airflow είναι η κλιμάκωση. Θέλουμε να δημιουργήσουμε ένα σύμπλεγμα Airflow, να διαθέσουμε ένα ζευγάρι πόδια για τους εργάτες της Celery και να δημιουργήσουμε μια αυτοδιπλασιαζόμενη κεφαλή με διαδικασίες προγραμματισμού εργασιών και ένα αποθετήριο.

Επίλογος

Αυτό, φυσικά, απέχει πολύ από όλα όσα θα ήθελα να μιλήσω για το Airflow, αλλά προσπάθησα να επισημάνω τα κύρια σημεία. Η όρεξη έρχεται με το φαγητό, δοκιμάστε το και θα σας αρέσει 🙂

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο