Witaj, Habro! W tym artykule chcę porozmawiać o jednym świetnym narzędziu do rozwijania procesów wsadowego przetwarzania danych, na przykład w infrastrukturze korporacyjnego DWH lub Twojego DataLake. Porozmawiamy o Apache Airflow (zwanym dalej Airflow). Jest to niesprawiedliwie pozbawione uwagi na temat Habré, a w głównej części postaram się Was przekonać, że przy wyborze harmonogramu dla swoich procesów ETL/ELT warto zwrócić uwagę przynajmniej na Airflow.
Wcześniej, pracując w Tinkoff Bank, napisałem serię artykułów na temat DWH. Teraz stałem się częścią zespołu Mail.Ru Group i rozwijam platformę do analizy danych w obszarze gier. Właściwie, gdy pojawią się nowości i ciekawe rozwiązania, wraz z moim zespołem będziemy tutaj rozmawiać o naszej platformie do analityki danych.
Prolog
Zacznijmy więc. Co to jest przepływ powietrza? To jest biblioteka (lub
Przyjrzyjmy się teraz głównym elementom Airflow. Rozumiejąc ich istotę i cel, możesz optymalnie zorganizować architekturę procesów. Być może główną jednostką jest skierowany graf acykliczny (zwany dalej DAG).
DZIEŃ
DAG to jakieś sensowne powiązanie Twoich zadań, które chcesz wykonać w ściśle określonej kolejności, według określonego harmonogramu. Airflow zapewnia wygodny interfejs sieciowy do pracy z DAG-ami i innymi podmiotami:
DAG może wyglądać następująco:
Deweloper projektując DAG ustala zestaw operatorów, na których będą budowane zadania w ramach DAG. Tutaj dochodzimy do kolejnego ważnego podmiotu: Airflow Operator.
Operatorzy
Operator to jednostka, na podstawie której tworzone są instancje zadania, która opisuje, co będzie się działo podczas realizacji instancji pracy.
- BashOperator - operator wykonujący polecenie bash.
- PythonOperator - operator wywołujący kod Pythona.
- EmailOperator — operator wysyłania wiadomości e-mail.
- HTTPOperator - operator do pracy z żądaniami http.
- SqlOperator - operator wykonujący kod SQL.
- Sensor to operator oczekiwania na zdarzenie (nadejście wymaganego czasu, pojawienie się wymaganego pliku, linia w bazie danych, odpowiedź z API itp., itp.).
Istnieją bardziej szczegółowe operatory: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Możesz także opracować operatory w oparciu o własne cechy i wykorzystać je w swoim projekcie. Na przykład stworzyliśmy MongoDBToHiveViaHdfsTransfer, operator do eksportowania dokumentów z MongoDB do Hive oraz kilka operatorów do pracy z
Następnie wszystkie te wystąpienia zadań muszą zostać wykonane, a teraz porozmawiamy o harmonogramie.
Planista
Wbudowany jest harmonogram zadań Airflow
Każda pula ma ograniczoną liczbę miejsc. Tworząc DAG, otrzymuje się pulę:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Pulę zdefiniowaną na poziomie DAG można zastąpić na poziomie zadania.
Za planowanie wszystkich zadań w Airflow odpowiada odrębny proces Harmonogram. Właściwie Harmonogram zajmuje się całą mechaniką ustawiania zadań do wykonania. Zadanie przed wykonaniem przechodzi przez kilka etapów:
- Poprzednie zadania zostały zrealizowane w DAG, można kolejkować nowe.
- Kolejka jest sortowana w zależności od priorytetu zadań (priorytety można również kontrolować), a jeśli w puli jest wolne miejsce, zadanie można przystąpić do realizacji.
- Jeśli jest wolny seler robotniczy, zadanie zostaje do niego wysłane; rozpoczyna się praca zaprogramowana w zadaniu, przy użyciu tego lub innego operatora.
Wystarczająco proste.
Harmonogram działa na zestawie wszystkich DAG i wszystkich zadań w obrębie DAG.
Aby Harmonogram mógł rozpocząć współpracę z DAG, DAG musi ustawić harmonogram:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Dostępny jest zestaw gotowych presetów: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Możesz także użyć wyrażeń cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Data wykonania
Aby zrozumieć, jak działa Airflow, ważne jest, aby zrozumieć, jaka jest data wykonania dla DAG. W Airflow DAG posiada wymiar Execution Date, czyli w zależności od harmonogramu pracy DAG, dla każdej Daty Realizacji tworzone są instancje zadań. A dla każdej Daty Wykonania zadania mogą zostać wykonane ponownie - lub np. DAG może pracować jednocześnie w kilku Datach Wykonania. Tutaj jest to wyraźnie pokazane:
Niestety (a może na szczęście: to zależy od sytuacji), jeśli realizacja zadania w DAG zostanie poprawiona, wówczas realizacja w poprzednim Dacie Realizacji będzie kontynuowana z uwzględnieniem korekt. Jest to dobre, jeśli trzeba przeliczyć dane z poprzednich okresów przy użyciu nowego algorytmu, ale jest złe, ponieważ traci się powtarzalność wyniku (oczywiście nikt nie przeszkadza, aby zwrócić wymaganą wersję kodu źródłowego z Git i obliczyć, co potrzebujesz raz, tak jak tego potrzebujesz).
Generowanie zadań
Implementacją DAG jest kod w Pythonie, dzięki czemu mamy bardzo wygodny sposób na zmniejszenie ilości kodu podczas pracy np. ze źródłami shardowanymi. Załóżmy, że masz trzy fragmenty MySQL jako źródło, musisz wejść do każdego z nich i pobrać trochę danych. Co więcej, niezależnie i równolegle. Kod Pythona w DAG może wyglądać następująco:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG wygląda następująco:
W takim przypadku możesz dodać lub usunąć fragment, po prostu dostosowując ustawienia i aktualizując DAG. Wygodny!
Można także zastosować bardziej złożone generowanie kodu, np. pracować ze źródłami w postaci bazy danych lub opisać strukturę tabeli, algorytm pracy z tabelą i biorąc pod uwagę cechy infrastruktury DWH, wygenerować proces do ładowania N tabel do pamięci. Lub na przykład pracując z API, które nie obsługuje pracy z parametrem w postaci listy, możesz z tej listy wygenerować N zadań w DAG, ograniczyć równoległość żądań w API do puli i zeskrobać niezbędne dane z API. Elastyczny!
magazyn
Airflow ma własne repozytorium backendowe, bazę danych (może to być MySQL lub Postgres, my mamy Postgres), w której przechowywane są stany zadań, DAG-y, ustawienia połączeń, zmienne globalne itp. itd. Tutaj chciałbym powiedzieć, że repozytorium w Airflow jest bardzo proste (około 20 tabel) i wygodne, jeśli chcesz na nim zbudować dowolny własny proces. Pamiętam 100500 XNUMX tabel w repozytorium Informatica, które trzeba było długo studiować, zanim zrozumiałem, jak zbudować zapytanie.
Monitorowanie
Biorąc pod uwagę prostotę repozytorium, możesz zbudować wygodny dla siebie proces monitorowania zadań. W Zeppelinie korzystamy z notatnika, w którym przeglądamy status zadań:
Może to być również interfejs sieciowy samego Airflow:
Kod Airflow jest kodem open source, dlatego dodaliśmy alerty do Telegramu. Każde uruchomione wystąpienie zadania, jeśli wystąpi błąd, spamuje grupę w Telegramie, w której składa się cały zespół programistów i wsparcia.
Otrzymujemy szybką odpowiedź za pośrednictwem Telegramu (jeśli jest taka potrzeba), a poprzez Zeppelin otrzymujemy ogólny obraz zadań w Airflow.
Razem
Airflow to przede wszystkim open source i nie należy oczekiwać od niego cudów. Bądź przygotowany na poświęcenie czasu i wysiłku na zbudowanie działającego rozwiązania. Cel jest osiągalny, uwierz mi, warto. Szybkość rozwoju, elastyczność, łatwość dodawania nowych procesów – spodoba Ci się. Oczywiście trzeba zwrócić dużą uwagę na organizację projektu, stabilność samego Airflow: cuda się nie zdarzają.
Teraz Airflow działa codziennie około 6,5 tys. zadań. Mają zupełnie inny charakter. Są zadania polegające na ładowaniu danych do głównego DWH z wielu różnych i bardzo specyficznych źródeł, są zadania obliczania storefrontów wewnątrz głównego DWH, są zadania publikowania danych do szybkiego DWH, jest wiele, wiele różnych zadań - i Airflow przeżuwa je wszystkie dzień po dniu. Mówiąc liczbami, to tak 2,3 tysiące Zadania ELT o różnym stopniu złożoności w ramach DWH (Hadoop), ok. 2,5 tys. baz danych źródeł, to jest zespół z 4 programistów ETL, które dzielą się na przetwarzanie danych ETL w DWH i przetwarzanie danych ELT w DWH i oczywiście więcej jeden administrator, który zajmuje się infrastrukturą serwisu.
Plany na przyszłość
Liczba procesów nieuchronnie rośnie, a główną rzeczą, którą będziemy robić w zakresie infrastruktury Airflow, będzie skalowanie. Chcemy zbudować klaster Airflow, przydzielić parę nóg pracownikom Celery i stworzyć samoduplikującą się głowę z procesami planowania zadań i repozytorium.
Epilog
To oczywiście nie wszystko, co chciałbym powiedzieć o Airflow, ale starałem się podkreślić najważniejsze punkty. Apetyt przychodzi wraz z jedzeniem, spróbuj, a posmakujesz :)
Źródło: www.habr.com