Airflow to narzędzie umożliwiające wygodne i szybkie rozwijanie oraz utrzymywanie procesów wsadowego przetwarzania danych

Airflow to narzędzie umożliwiające wygodne i szybkie rozwijanie oraz utrzymywanie procesów wsadowego przetwarzania danych

Witaj, Habro! W tym artykule chcę porozmawiać o jednym świetnym narzędziu do rozwijania procesów wsadowego przetwarzania danych, na przykład w infrastrukturze korporacyjnego DWH lub Twojego DataLake. Porozmawiamy o Apache Airflow (zwanym dalej Airflow). Jest to niesprawiedliwie pozbawione uwagi na temat Habré, a w głównej części postaram się Was przekonać, że przy wyborze harmonogramu dla swoich procesów ETL/ELT warto zwrócić uwagę przynajmniej na Airflow.

Wcześniej, pracując w Tinkoff Bank, napisałem serię artykułów na temat DWH. Teraz stałem się częścią zespołu Mail.Ru Group i rozwijam platformę do analizy danych w obszarze gier. Właściwie, gdy pojawią się nowości i ciekawe rozwiązania, wraz z moim zespołem będziemy tutaj rozmawiać o naszej platformie do analityki danych.

Prolog

Zacznijmy więc. Co to jest przepływ powietrza? To jest biblioteka (lub zestaw bibliotek) opracowywać, planować i monitorować procesy pracy. Główną cechą Airflow: Kod Pythona służy do opisu (rozwoju) procesów. Ma to wiele zalet w zakresie organizacji projektu i rozwoju: w istocie Twój (na przykład) projekt ETL jest po prostu projektem w języku Python i możesz go zorganizować według własnego uznania, biorąc pod uwagę specyfikę infrastruktury, wielkość zespołu i inne wymagania. Instrumentalnie wszystko jest proste. Użyj na przykład PyCharm + Git. To wspaniałe i bardzo wygodne!

Przyjrzyjmy się teraz głównym elementom Airflow. Rozumiejąc ich istotę i cel, możesz optymalnie zorganizować architekturę procesów. Być może główną jednostką jest skierowany graf acykliczny (zwany dalej DAG).

DZIEŃ

DAG to jakieś sensowne powiązanie Twoich zadań, które chcesz wykonać w ściśle określonej kolejności, według określonego harmonogramu. Airflow zapewnia wygodny interfejs sieciowy do pracy z DAG-ami i innymi podmiotami:

Airflow to narzędzie umożliwiające wygodne i szybkie rozwijanie oraz utrzymywanie procesów wsadowego przetwarzania danych

DAG może wyglądać następująco:

Airflow to narzędzie umożliwiające wygodne i szybkie rozwijanie oraz utrzymywanie procesów wsadowego przetwarzania danych

Deweloper projektując DAG ustala zestaw operatorów, na których będą budowane zadania w ramach DAG. Tutaj dochodzimy do kolejnego ważnego podmiotu: Airflow Operator.

Operatorzy

Operator to jednostka, na podstawie której tworzone są instancje zadania, która opisuje, co będzie się działo podczas realizacji instancji pracy. Wydania Airflow z GitHub zawierają już zestaw operatorów gotowych do użycia. Przykłady:

  • BashOperator - operator wykonujący polecenie bash.
  • PythonOperator - operator wywołujący kod Pythona.
  • EmailOperator — operator wysyłania wiadomości e-mail.
  • HTTPOperator - operator do pracy z żądaniami http.
  • SqlOperator - operator wykonujący kod SQL.
  • Sensor to operator oczekiwania na zdarzenie (nadejście wymaganego czasu, pojawienie się wymaganego pliku, linia w bazie danych, odpowiedź z API itp., itp.).

Istnieją bardziej szczegółowe operatory: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Możesz także opracować operatory w oparciu o własne cechy i wykorzystać je w swoim projekcie. Na przykład stworzyliśmy MongoDBToHiveViaHdfsTransfer, operator do eksportowania dokumentów z MongoDB do Hive oraz kilka operatorów do pracy z Kliknij Dom: CHLoadFromHiveOperator i CHTableLoaderOperator. Zasadniczo, gdy tylko w projekcie często używany jest kod zbudowany na podstawowych instrukcjach, można pomyśleć o wbudowaniu go w nową instrukcję. Uprości to dalszy rozwój i poszerzy bibliotekę operatorów w projekcie.

Następnie wszystkie te wystąpienia zadań muszą zostać wykonane, a teraz porozmawiamy o harmonogramie.

Planista

Wbudowany jest harmonogram zadań Airflow Seler. Celery to biblioteka Pythona, która pozwala organizować kolejkę oraz asynchroniczne i rozproszone wykonywanie zadań. Po stronie Airflow wszystkie zadania są podzielone na pule. Pule są tworzone ręcznie. Zazwyczaj mają na celu ograniczenie pracochłonności pracy ze źródłem lub typizację zadań w ramach DWH. Pulami można zarządzać poprzez interfejs WWW:

Airflow to narzędzie umożliwiające wygodne i szybkie rozwijanie oraz utrzymywanie procesów wsadowego przetwarzania danych

Każda pula ma ograniczoną liczbę miejsc. Tworząc DAG, otrzymuje się pulę:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Pulę zdefiniowaną na poziomie DAG można zastąpić na poziomie zadania.
Za planowanie wszystkich zadań w Airflow odpowiada odrębny proces Harmonogram. Właściwie Harmonogram zajmuje się całą mechaniką ustawiania zadań do wykonania. Zadanie przed wykonaniem przechodzi przez kilka etapów:

  1. Poprzednie zadania zostały zrealizowane w DAG, można kolejkować nowe.
  2. Kolejka jest sortowana w zależności od priorytetu zadań (priorytety można również kontrolować), a jeśli w puli jest wolne miejsce, zadanie można przystąpić do realizacji.
  3. Jeśli jest wolny seler robotniczy, zadanie zostaje do niego wysłane; rozpoczyna się praca zaprogramowana w zadaniu, przy użyciu tego lub innego operatora.

Wystarczająco proste.

Harmonogram działa na zestawie wszystkich DAG i wszystkich zadań w obrębie DAG.

Aby Harmonogram mógł rozpocząć współpracę z DAG, DAG musi ustawić harmonogram:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Dostępny jest zestaw gotowych presetów: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Możesz także użyć wyrażeń cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Data wykonania

Aby zrozumieć, jak działa Airflow, ważne jest, aby zrozumieć, jaka jest data wykonania dla DAG. W Airflow DAG posiada wymiar Execution Date, czyli w zależności od harmonogramu pracy DAG, dla każdej Daty Realizacji tworzone są instancje zadań. A dla każdej Daty Wykonania zadania mogą zostać wykonane ponownie - lub np. DAG może pracować jednocześnie w kilku Datach Wykonania. Tutaj jest to wyraźnie pokazane:

Airflow to narzędzie umożliwiające wygodne i szybkie rozwijanie oraz utrzymywanie procesów wsadowego przetwarzania danych

Niestety (a może na szczęście: to zależy od sytuacji), jeśli realizacja zadania w DAG zostanie poprawiona, wówczas realizacja w poprzednim Dacie Realizacji będzie kontynuowana z uwzględnieniem korekt. Jest to dobre, jeśli trzeba przeliczyć dane z poprzednich okresów przy użyciu nowego algorytmu, ale jest złe, ponieważ traci się powtarzalność wyniku (oczywiście nikt nie przeszkadza, aby zwrócić wymaganą wersję kodu źródłowego z Git i obliczyć, co potrzebujesz raz, tak jak tego potrzebujesz).

Generowanie zadań

Implementacją DAG jest kod w Pythonie, dzięki czemu mamy bardzo wygodny sposób na zmniejszenie ilości kodu podczas pracy np. ze źródłami shardowanymi. Załóżmy, że masz trzy fragmenty MySQL jako źródło, musisz wejść do każdego z nich i pobrać trochę danych. Co więcej, niezależnie i równolegle. Kod Pythona w DAG może wyglądać następująco:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG wygląda następująco:

Airflow to narzędzie umożliwiające wygodne i szybkie rozwijanie oraz utrzymywanie procesów wsadowego przetwarzania danych

W takim przypadku możesz dodać lub usunąć fragment, po prostu dostosowując ustawienia i aktualizując DAG. Wygodny!

Można także zastosować bardziej złożone generowanie kodu, np. pracować ze źródłami w postaci bazy danych lub opisać strukturę tabeli, algorytm pracy z tabelą i biorąc pod uwagę cechy infrastruktury DWH, wygenerować proces do ładowania N tabel do pamięci. Lub na przykład pracując z API, które nie obsługuje pracy z parametrem w postaci listy, możesz z tej listy wygenerować N zadań w DAG, ograniczyć równoległość żądań w API do puli i zeskrobać niezbędne dane z API. Elastyczny!

magazyn

Airflow ma własne repozytorium backendowe, bazę danych (może to być MySQL lub Postgres, my mamy Postgres), w której przechowywane są stany zadań, DAG-y, ustawienia połączeń, zmienne globalne itp. itd. Tutaj chciałbym powiedzieć, że repozytorium w Airflow jest bardzo proste (około 20 tabel) i wygodne, jeśli chcesz na nim zbudować dowolny własny proces. Pamiętam 100500 XNUMX tabel w repozytorium Informatica, które trzeba było długo studiować, zanim zrozumiałem, jak zbudować zapytanie.

Monitorowanie

Biorąc pod uwagę prostotę repozytorium, możesz zbudować wygodny dla siebie proces monitorowania zadań. W Zeppelinie korzystamy z notatnika, w którym przeglądamy status zadań:

Airflow to narzędzie umożliwiające wygodne i szybkie rozwijanie oraz utrzymywanie procesów wsadowego przetwarzania danych

Może to być również interfejs sieciowy samego Airflow:

Airflow to narzędzie umożliwiające wygodne i szybkie rozwijanie oraz utrzymywanie procesów wsadowego przetwarzania danych

Kod Airflow jest kodem open source, dlatego dodaliśmy alerty do Telegramu. Każde uruchomione wystąpienie zadania, jeśli wystąpi błąd, spamuje grupę w Telegramie, w której składa się cały zespół programistów i wsparcia.

Otrzymujemy szybką odpowiedź za pośrednictwem Telegramu (jeśli jest taka potrzeba), a poprzez Zeppelin otrzymujemy ogólny obraz zadań w Airflow.

Razem

Airflow to przede wszystkim open source i nie należy oczekiwać od niego cudów. Bądź przygotowany na poświęcenie czasu i wysiłku na zbudowanie działającego rozwiązania. Cel jest osiągalny, uwierz mi, warto. Szybkość rozwoju, elastyczność, łatwość dodawania nowych procesów – spodoba Ci się. Oczywiście trzeba zwrócić dużą uwagę na organizację projektu, stabilność samego Airflow: cuda się nie zdarzają.

Teraz Airflow działa codziennie około 6,5 tys. zadań. Mają zupełnie inny charakter. Są zadania polegające na ładowaniu danych do głównego DWH z wielu różnych i bardzo specyficznych źródeł, są zadania obliczania storefrontów wewnątrz głównego DWH, są zadania publikowania danych do szybkiego DWH, jest wiele, wiele różnych zadań - i Airflow przeżuwa je wszystkie dzień po dniu. Mówiąc liczbami, to tak 2,3 tysiące Zadania ELT o różnym stopniu złożoności w ramach DWH (Hadoop), ok. 2,5 tys. baz danych źródeł, to jest zespół z 4 programistów ETL, które dzielą się na przetwarzanie danych ETL w DWH i przetwarzanie danych ELT w DWH i oczywiście więcej jeden administrator, który zajmuje się infrastrukturą serwisu.

Plany na przyszłość

Liczba procesów nieuchronnie rośnie, a główną rzeczą, którą będziemy robić w zakresie infrastruktury Airflow, będzie skalowanie. Chcemy zbudować klaster Airflow, przydzielić parę nóg pracownikom Celery i stworzyć samoduplikującą się głowę z procesami planowania zadań i repozytorium.

Epilog

To oczywiście nie wszystko, co chciałbym powiedzieć o Airflow, ale starałem się podkreślić najważniejsze punkty. Apetyt przychodzi wraz z jedzeniem, spróbuj, a posmakujesz :)

Źródło: www.habr.com

Dodaj komentarz