Airflow is een hulpmiddel voor het eenvoudig en snel ontwikkelen en onderhouden van batchgegevensverwerkingsprocessen

Airflow is een hulpmiddel voor het eenvoudig en snel ontwikkelen en onderhouden van batchgegevensverwerkingsprocessen

Hallo, Habr! In dit artikel wil ik het hebben over een geweldig hulpmiddel voor het ontwikkelen van batchgegevensverwerkingsprocessen, bijvoorbeeld in de infrastructuur van een bedrijfs-DWH of uw DataLake. We zullen het hebben over Apache Airflow (hierna Airflow genoemd). Het wordt ten onrechte de aandacht ontnomen op Habré, en in het grootste deel zal ik proberen u ervan te overtuigen dat Airflow in ieder geval de moeite waard is om naar te kijken bij het kiezen van een planner voor uw ETL/ELT-processen.

Eerder schreef ik een reeks artikelen over het onderwerp DWH toen ik bij Tinkoff Bank werkte. Nu ben ik onderdeel geworden van het Mail.Ru Group-team en ontwikkel ik een platform voor data-analyse op gaminggebied. Als er nieuws en interessante oplossingen verschijnen, zullen mijn team en ik hier praten over ons platform voor data-analyse.

proloog

Dus laten we beginnen. Wat is luchtstroom? Dit is een bibliotheek (of reeks bibliotheken) voor het ontwikkelen, plannen en bewaken van werkprocessen. Het belangrijkste kenmerk van Airflow: Python-code wordt gebruikt om processen te beschrijven (ontwikkelen). Dit heeft veel voordelen voor het organiseren van uw project en ontwikkeling: in wezen is uw (bijvoorbeeld) ETL-project slechts een Python-project en kunt u het inrichten zoals u dat wilt, rekening houdend met de specifieke kenmerken van de infrastructuur, teamgrootte en andere vereisten. Instrumentaal is alles eenvoudig. Gebruik bijvoorbeeld PyCharm + Git. Het is geweldig en erg handig!

Laten we nu eens kijken naar de belangrijkste entiteiten van Airflow. Door de essentie en het doel ervan te begrijpen, kunt u uw procesarchitectuur optimaal inrichten. Misschien wel de belangrijkste entiteit is de Directed Acyclic Graph (hierna DAG genoemd).

DAG

Een DAG is een betekenisvolle associatie van uw taken die u in een strikt gedefinieerde volgorde volgens een specifiek schema wilt voltooien. Airflow biedt een handige webinterface voor het werken met DAG's en andere entiteiten:

Airflow is een hulpmiddel voor het eenvoudig en snel ontwikkelen en onderhouden van batchgegevensverwerkingsprocessen

De DAG zou er als volgt uit kunnen zien:

Airflow is een hulpmiddel voor het eenvoudig en snel ontwikkelen en onderhouden van batchgegevensverwerkingsprocessen

De ontwikkelaar legt bij het ontwerpen van een DAG een reeks operators vast waarop taken binnen de DAG zullen worden gebouwd. Hier komen we bij een andere belangrijke entiteit: Airflow Operator.

Операторы

Een operator is een entiteit op basis waarvan taakinstanties worden aangemaakt, die beschrijft wat er zal gebeuren tijdens de uitvoering van een taakinstantie. Luchtstroomreleases van GitHub bevatten al een set operators die klaar zijn voor gebruik. Voorbeelden:

  • BashOperator - operator voor het uitvoeren van een bash-opdracht.
  • PythonOperator - operator voor het aanroepen van Python-code.
  • EmailOperator — operator voor het verzenden van e-mail.
  • HTTPOperator - operator voor het werken met http-verzoeken.
  • SqlOperator - operator voor het uitvoeren van SQL-code.
  • Sensor is een operator voor het wachten op een gebeurtenis (de aankomst van de vereiste tijd, het verschijnen van het vereiste bestand, een regel in de database, een reactie van de API, enz., enz.).

Er zijn meer specifieke operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

U kunt ook operators ontwikkelen op basis van uw eigen kenmerken en deze in uw project gebruiken. We hebben bijvoorbeeld MongoDBToHiveViaHdfsTransfer gemaakt, een operator voor het exporteren van documenten van MongoDB naar Hive, en verschillende operators om mee te werken Klik op Huis: CHLoadFromHiveOperator en CHTableLoaderOperator. Zodra een project veelvuldig gebruik heeft gemaakt van code die op basisinstructies is gebouwd, kunt u erover nadenken om deze in een nieuwe instructie in te bouwen. Dit vereenvoudigt de verdere ontwikkeling en u breidt uw bibliotheek met operators in het project uit.

Vervolgens moeten al deze taken worden uitgevoerd, en nu zullen we het hebben over de planner.

Планировщик

De taakplanner van Airflow is erop gebouwd Selderij. Celery is een Python-bibliotheek waarmee u een wachtrij en asynchrone en gedistribueerde uitvoering van taken kunt organiseren. Aan de Airflow-kant zijn alle taken verdeeld in pools. Pools worden handmatig aangemaakt. Meestal is hun doel het beperken van de werklast van het werken met de bron, of het typeren van taken binnen de DWH. Pools kunnen worden beheerd via de webinterface:

Airflow is een hulpmiddel voor het eenvoudig en snel ontwikkelen en onderhouden van batchgegevensverwerkingsprocessen

Elke pool heeft een limiet op het aantal slots. Bij het aanmaken van een DAG krijgt deze een pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Een pool die op DAG-niveau is gedefinieerd, kan op taakniveau worden overschreven.
Een apart proces, Scheduler, is verantwoordelijk voor het plannen van alle taken in Airflow. Eigenlijk houdt Scheduler zich bezig met alle mechanismen voor het instellen van taken voor uitvoering. De taak doorloopt verschillende fasen voordat deze wordt uitgevoerd:

  1. De vorige taken zijn voltooid in de DAG; een nieuwe kan in de wachtrij worden geplaatst.
  2. De wachtrij wordt gesorteerd afhankelijk van de prioriteit van taken (prioriteiten kunnen ook worden beheerd), en als er een vrije plek in de pool is, kan de taak in werking worden gesteld.
  3. Als er een vrije werker-selderij is, wordt de taak ernaar verzonden; het werk dat u in het probleem hebt geprogrammeerd, begint met een of andere operator.

Simpel genoeg.

Scheduler draait op de set van alle DAG's en alle taken binnen de DAG's.

Om Scheduler met DAG te laten werken, moet de DAG een schema instellen:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Er is een set kant-en-klare presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

U kunt ook cron-expressies gebruiken:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Dag van executie

Om te begrijpen hoe Airflow werkt, is het belangrijk om te begrijpen wat de Uitvoeringsdatum is voor een DAG. In Airflow heeft DAG een dimensie Uitvoeringsdatum, d.w.z. dat er, afhankelijk van het werkschema van de DAG, taakinstanties worden gemaakt voor elke Uitvoeringsdatum. En voor elke Uitvoeringsdatum kunnen taken opnieuw worden uitgevoerd – of kan een DAG bijvoorbeeld tegelijkertijd op meerdere Uitvoeringsdata werken. Dit wordt hier duidelijk weergegeven:

Airflow is een hulpmiddel voor het eenvoudig en snel ontwikkelen en onderhouden van batchgegevensverwerkingsprocessen

Helaas (of misschien gelukkig: het hangt van de situatie af), als de implementatie van de taak in de DAG wordt gecorrigeerd, zal de uitvoering op de vorige Uitvoeringsdatum doorgaan, rekening houdend met de aanpassingen. Dit is goed als je gegevens uit voorgaande perioden opnieuw moet berekenen met een nieuw algoritme, maar het is slecht omdat de reproduceerbaarheid van het resultaat verloren gaat (natuurlijk stoort niemand je om de vereiste versie van de broncode uit Git terug te sturen en te berekenen wat je hebt het eenmalig nodig, zoals jij het nodig hebt).

Taken genereren

De implementatie van de DAG is code in Python, dus we hebben een zeer handige manier om de hoeveelheid code te verminderen wanneer we bijvoorbeeld met sharded-bronnen werken. Laten we zeggen dat je drie MySQL-scherven als bron hebt, je moet in elke scherven klimmen en wat gegevens verzamelen. Bovendien onafhankelijk en parallel. De Python-code in de DAG kan er als volgt uitzien:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

De DAG ziet er als volgt uit:

Airflow is een hulpmiddel voor het eenvoudig en snel ontwikkelen en onderhouden van batchgegevensverwerkingsprocessen

In dit geval kunt u een scherf toevoegen of verwijderen door eenvoudigweg de instellingen aan te passen en de DAG bij te werken. Comfortabel!

U kunt ook complexere codegeneratie gebruiken, bijvoorbeeld werken met bronnen in de vorm van een database of een tabelstructuur beschrijven, een algoritme voor het werken met een tabel, en, rekening houdend met de kenmerken van de DWH-infrastructuur, een proces genereren voor het laden van N tabellen in uw opslag. Of als u bijvoorbeeld met een API werkt die het werken met een parameter in de vorm van een lijst niet ondersteunt, kunt u uit deze lijst N taken in een DAG genereren, de parallelliteit van verzoeken in de API beperken tot een pool en schrapen de benodigde gegevens uit de API. Flexibel!

opslagplaats

Airflow heeft zijn eigen backend-repository, een database (kan MySQL of Postgres zijn, we hebben Postgres), die de status van taken, DAG's, verbindingsinstellingen, globale variabelen, enz. Opslaat. Hier zou ik willen zeggen dat de repository in Airflow is heel eenvoudig (ongeveer 20 tabellen) en handig als u er uw eigen processen bovenop wilt bouwen. Ik herinner me de 100500 tabellen in de Informatica-repository, die lange tijd moesten worden bestudeerd voordat ik begreep hoe je een query moest bouwen.

controle

Gezien de eenvoud van de repository kunt u een proces voor taakmonitoring bouwen dat voor u handig is. We gebruiken een notitieblok in Zeppelin, waar we de status van taken bekijken:

Airflow is een hulpmiddel voor het eenvoudig en snel ontwikkelen en onderhouden van batchgegevensverwerkingsprocessen

Dit zou ook de webinterface van Airflow zelf kunnen zijn:

Airflow is een hulpmiddel voor het eenvoudig en snel ontwikkelen en onderhouden van batchgegevensverwerkingsprocessen

De Airflow-code is open source, daarom hebben we waarschuwingen aan Telegram toegevoegd. Als er bij elk actief exemplaar van een taak een fout optreedt, wordt de groep in Telegram spamt, waar het hele ontwikkelings- en ondersteuningsteam bestaat.

Via Telegram krijgen we snel antwoord (indien nodig) en via Zeppelin krijgen we een totaalbeeld van de taken in Airflow.

In totaal

Airflow is voornamelijk open source en je moet er geen wonderen van verwachten. Wees bereid om de tijd en moeite te steken in het bouwen van een oplossing die werkt. Het doel is haalbaar, geloof me, het is het waard. Snelheid van ontwikkeling, flexibiliteit, gemak van het toevoegen van nieuwe processen - u zult het leuk vinden. Natuurlijk moet je veel aandacht besteden aan de organisatie van het project, de stabiliteit van de Airflow zelf: wonderen gebeuren niet.

Nu hebben we Airflow dagelijks aan het werk ongeveer 6,5 duizend taken. Ze zijn heel verschillend van karakter. Er zijn taken voor het laden van gegevens in de hoofd-DWH vanuit veel verschillende en zeer specifieke bronnen, er zijn taken voor het berekenen van winkelpuien in de hoofd-DWH, er zijn taken voor het publiceren van gegevens in een snelle DWH, er zijn heel veel verschillende taken - en Airflow kauwt ze dag na dag allemaal op. In cijfers gesproken, dit is het 2,3 duizend ELT-taken van verschillende complexiteit binnen DWH (Hadoop), ca. 2,5 honderd databases bronnen, dit is een team van 4 ETL-ontwikkelaars, die zijn onderverdeeld in ETL-gegevensverwerking in DWH en ELT-gegevensverwerking binnen DWH en natuurlijk meer één beheerder, die zich bezighoudt met de infrastructuur van de dienst.

Plannen voor de toekomst

Het aantal processen groeit onvermijdelijk, en het belangrijkste wat we zullen doen op het gebied van de Airflow-infrastructuur is schaalvergroting. We willen een Airflow-cluster bouwen, een paar benen toewijzen aan Celery-werknemers en een zichzelf duplicerend hoofd maken met taakplanningsprocessen en een opslagplaats.

epiloog

Dit is natuurlijk niet alles wat ik over Airflow zou willen vertellen, maar ik heb geprobeerd de belangrijkste punten te benadrukken. Eetlust komt met eten, probeer het en je zult het leuk vinden :)

Bron: www.habr.com

Voeg een reactie