Airflow är ett verktyg för att bekvämt och snabbt utveckla och underhålla batchdatabehandlingsprocesser

Airflow är ett verktyg för att bekvämt och snabbt utveckla och underhålla batchdatabehandlingsprocesser

Hej, Habr! I den här artikeln vill jag prata om ett bra verktyg för att utveckla batchdatabearbetningsprocesser, till exempel i infrastrukturen för ett företags DWH eller din DataLake. Vi kommer att prata om Apache Airflow (nedan kallat Airflow). Den är orättvist berövad uppmärksamhet på Habré, och i huvudsak ska jag försöka övertyga dig om att åtminstone Airflow är värt att titta på när du väljer en schemaläggare för dina ETL/ELT-processer.

Tidigare skrev jag en serie artiklar på temat DWH när jag jobbade på Tinkoff Bank. Nu har jag blivit en del av Mail.Ru Group-teamet och håller på att utveckla en plattform för dataanalys inom spelområdet. Faktiskt, när nyheter och intressanta lösningar dyker upp kommer mitt team och jag att prata här om vår plattform för dataanalys.

prolog

Så, låt oss börja. Vad är luftflöde? Detta är ett bibliotek (eller uppsättning bibliotek) att utveckla, planera och övervaka arbetsprocesser. Huvudfunktionen hos Airflow: Python-kod används för att beskriva (utveckla) processer. Detta har många fördelar för att organisera ditt projekt och din utveckling: i huvudsak är ditt (till exempel) ETL-projekt bara ett Python-projekt, och du kan organisera det som du vill, med hänsyn till detaljerna i infrastrukturen, teamstorlek och andra krav. Instrumentellt är allt enkelt. Använd till exempel PyCharm + Git. Det är underbart och väldigt bekvämt!

Låt oss nu titta på huvudenheterna i Airflow. Genom att förstå deras essens och syfte kan du organisera din processarkitektur optimalt. Huvudenheten är kanske den riktade acykliska grafen (nedan kallad DAG).

DAG

En DAG är en meningsfull association av dina uppgifter som du vill slutföra i en strikt definierad sekvens enligt ett specifikt schema. Airflow tillhandahåller ett bekvämt webbgränssnitt för att arbeta med DAG:er och andra enheter:

Airflow är ett verktyg för att bekvämt och snabbt utveckla och underhålla batchdatabehandlingsprocesser

DAG kan se ut så här:

Airflow är ett verktyg för att bekvämt och snabbt utveckla och underhålla batchdatabehandlingsprocesser

Utvecklaren, när han utformar en DAG, fastställer en uppsättning operatörer på vilka uppgifter inom DAG kommer att byggas. Här kommer vi till en annan viktig enhet: Airflow Operator.

operatörer

En operatör är en enhet utifrån vilka jobbinstanser skapas, som beskriver vad som kommer att hända under utförandet av en jobbinstans. Luftflödet släpps från GitHub innehåller redan en uppsättning operatörer redo att användas. Exempel:

  • BashOperator - operatör för att utföra ett bash-kommando.
  • PythonOperator - operatör för att anropa Python-kod.
  • EmailOperator — operatör för att skicka e-post.
  • HTTPOperator - operatör för att arbeta med http-förfrågningar.
  • SqlOperator - operatör för exekvering av SQL-kod.
  • Sensorn är en operatör för att vänta på en händelse (ankomsten av den nödvändiga tiden, utseendet på den nödvändiga filen, en rad i databasen, ett svar från API, etc., etc.).

Det finns mer specifika operatörer: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Du kan också utveckla operatörer utifrån dina egna egenskaper och använda dem i ditt projekt. Till exempel skapade vi MongoDBToHiveViaHdfsTransfer, en operatör för att exportera dokument från MongoDB till Hive, och flera operatörer för att arbeta med klickhus: CHLoadFromHiveOperator och CHTableLoaderOperator. Så snart ett projekt ofta har använt kod byggd på grundläggande uttalanden, kan du tänka på att bygga in den i ett nytt uttalande. Detta kommer att förenkla vidareutvecklingen och du kommer att utöka ditt bibliotek av operatörer i projektet.

Därefter måste alla dessa instanser av uppgifter utföras, och nu ska vi prata om schemaläggaren.

Schemaläggare

Airflows uppgiftsschemaläggare är byggd på Selleri. Celery är ett Python-bibliotek som låter dig organisera en kö plus asynkron och distribuerad exekvering av uppgifter. På Airflow-sidan är alla uppgifter uppdelade i pooler. Pooler skapas manuellt. Typiskt är deras syfte att begränsa arbetsbelastningen av att arbeta med källan eller att typifiera uppgifter inom DWH. Pooler kan hanteras via webbgränssnittet:

Airflow är ett verktyg för att bekvämt och snabbt utveckla och underhålla batchdatabehandlingsprocesser

Varje pool har en gräns för antalet slots. När du skapar en DAG får den en pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

En pool definierad på DAG-nivå kan åsidosättas på aktivitetsnivå.
En separat process, Scheduler, ansvarar för att schemalägga alla uppgifter i Airflow. Egentligen hanterar Scheduler all mekanik för att ställa in uppgifter för utförande. Uppgiften går igenom flera steg innan den utförs:

  1. De tidigare uppgifterna har slutförts i DAG, en ny kan ställas i kö.
  2. Kön sorteras beroende på prioritet av uppgifter (prioriteringar kan också styras), och om det finns en ledig plats i poolen kan uppgiften tas i drift.
  3. Om det finns en gratis arbetarselleri skickas uppgiften till den; arbetet som du programmerade i problemet börjar med en eller annan operatör.

Enkelt nog.

Schemaläggaren körs på uppsättningen av alla DAG:er och alla uppgifter inom DAG:er.

För att Schemaläggaren ska börja arbeta med DAG måste DAG ställa in ett schema:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Det finns en uppsättning färdiga förinställningar: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Du kan också använda cron-uttryck:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Utföringsdatum

För att förstå hur Airflow fungerar är det viktigt att förstå vad exekveringsdatum är för en DAG. I Airflow har DAG en Execution Date-dimension, det vill säga beroende på DAG:s arbetsschema skapas uppgiftsinstanser för varje Execution Date. Och för varje exekveringsdatum kan uppgifter köras om – eller till exempel kan en DAG arbeta samtidigt i flera exekveringsdatum. Detta visas tydligt här:

Airflow är ett verktyg för att bekvämt och snabbt utveckla och underhålla batchdatabehandlingsprocesser

Tyvärr (eller kanske lyckligtvis: det beror på situationen), om genomförandet av uppgiften i DAG korrigeras, kommer exekveringen i det tidigare genomförandedatumet att fortsätta med hänsyn till justeringarna. Detta är bra om du behöver räkna om data under tidigare perioder med en ny algoritm, men det är dåligt eftersom reproducerbarheten av resultatet går förlorad (naturligtvis stör ingen dig att returnera den nödvändiga versionen av källkoden från Git och beräkna vad du behöver en gång, som du behöver den).

Generera uppgifter

Implementeringen av DAG är kod i Python, så vi har ett väldigt bekvämt sätt att minska mängden kod när man till exempel arbetar med fragmenterade källor. Låt oss säga att du har tre MySQL-skärvor som källa, du måste klättra in i var och en och plocka upp lite data. Dessutom självständigt och parallellt. Python-koden i DAG kan se ut så här:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG ser ut så här:

Airflow är ett verktyg för att bekvämt och snabbt utveckla och underhålla batchdatabehandlingsprocesser

I det här fallet kan du lägga till eller ta bort en skärva genom att helt enkelt justera inställningarna och uppdatera DAG. Bekväm!

Du kan också använda mer komplex kodgenerering, till exempel arbeta med källor i form av en databas eller beskriva en tabellstruktur, en algoritm för att arbeta med en tabell och, med hänsyn till funktionerna i DWH-infrastrukturen, generera en process för att ladda N tabeller i ditt lager. Eller, till exempel om du arbetar med ett API som inte stöder att arbeta med en parameter i form av en lista, kan du generera N uppgifter i en DAG från den här listan, begränsa parallelliteten mellan förfrågningar i API:et till en pool och skrapa nödvändiga data från API:et. Flexibel!

förvaret

Airflow har ett eget backend-lager, en databas (kan vara MySQL eller Postgres, vi har Postgres), som lagrar tillstånden för uppgifter, DAG:er, anslutningsinställningar, globala variabler, etc., etc. Här skulle jag vilja att jag kan säga att repository i Airflow är väldigt enkelt (cirka 20 tabeller) och bekvämt om du vill bygga någon av dina egna processer ovanpå den. Jag minns de 100500 XNUMX tabellerna i Informatica-förvaret, som måste studeras under lång tid innan man förstår hur man bygger en fråga.

övervakning

Med tanke på förvarets enkelhet kan du bygga en uppgiftsövervakningsprocess som är bekväm för dig. Vi använder ett anteckningsblock i Zeppelin, där vi tittar på status för uppgifter:

Airflow är ett verktyg för att bekvämt och snabbt utveckla och underhålla batchdatabehandlingsprocesser

Detta kan också vara själva webbgränssnittet för Airflow:

Airflow är ett verktyg för att bekvämt och snabbt utveckla och underhålla batchdatabehandlingsprocesser

Airflow-koden är öppen källkod, så vi har lagt till varning till Telegram. Varje pågående instans av en uppgift, om ett fel inträffar, spammar gruppen i Telegram, där hela utvecklings- och supportteamet består.

Vi får ett snabbt svar via Telegram (vid behov), och genom Zeppelin får vi en helhetsbild av uppgifter i Airflow.

Totalt

Airflow är i första hand öppen källkod, och du bör inte förvänta dig mirakel av det. Var beredd att lägga ner tid och kraft på att bygga en lösning som fungerar. Målet är uppnåeligt, tro mig, det är värt det. Utvecklingshastighet, flexibilitet, enkelhet att lägga till nya processer - du kommer att gilla det. Naturligtvis måste du ägna mycket uppmärksamhet åt organisationen av projektet, stabiliteten i själva luftflödet: mirakel händer inte.

Nu har vi Airflow som arbetar dagligen ca 6,5 ​​tusen uppgifter. De är ganska olika till karaktären. Det finns uppgifter att ladda data till huvud-DWH från många olika och mycket specifika källor, det finns uppgifter att beräkna skyltfönster inuti huvud-DWH, det finns uppgifter att publicera data till en snabb DWH, det finns många, många olika uppgifter - och Airflow tuggar upp dem alla dag efter dag. Tala i siffror, det här är 2,3 tusen ELT-uppgifter av varierande komplexitet inom DWH (Hadoop), ca. 2,5 hundra databaser källor, det här är ett team från 4 ETL-utvecklare, som är uppdelade i ETL-databehandling i DWH och ELT-databehandling inuti DWH och naturligtvis mer en admin, som hanterar tjänstens infrastruktur.

Planer för framtiden

Antalet processer växer oundvikligen, och det viktigaste vi kommer att göra när det gäller luftflödesinfrastrukturen är skalning. Vi vill bygga ett Airflow-kluster, tilldela ett par ben för selleriarbetare och göra ett självduplicerande huvud med jobbschemaläggningsprocesser och ett förråd.

Epilog

Detta är naturligtvis inte allt som jag skulle vilja berätta om Airflow, men jag försökte lyfta fram huvudpunkterna. Aptit kommer med att äta, prova det och du kommer att gilla det :)

Källa: will.com

Lägg en kommentar