Hallo, Habr! In dit artikel wil ik het hebben over een geweldig hulpmiddel voor het ontwikkelen van batchgegevensverwerkingsprocessen, bijvoorbeeld in de infrastructuur van een bedrijfs-DWH of uw DataLake. We zullen het hebben over Apache Airflow (hierna Airflow genoemd). Het wordt ten onrechte de aandacht ontnomen op Habré, en in het grootste deel zal ik proberen u ervan te overtuigen dat Airflow in ieder geval de moeite waard is om naar te kijken bij het kiezen van een planner voor uw ETL/ELT-processen.
Eerder schreef ik een reeks artikelen over het onderwerp DWH toen ik bij Tinkoff Bank werkte. Nu ben ik onderdeel geworden van het Mail.Ru Group-team en ontwikkel ik een platform voor data-analyse op gaminggebied. Als er nieuws en interessante oplossingen verschijnen, zullen mijn team en ik hier praten over ons platform voor data-analyse.
proloog
Dus laten we beginnen. Wat is luchtstroom? Dit is een bibliotheek (of
Laten we nu eens kijken naar de belangrijkste entiteiten van Airflow. Door de essentie en het doel ervan te begrijpen, kunt u uw procesarchitectuur optimaal inrichten. Misschien wel de belangrijkste entiteit is de Directed Acyclic Graph (hierna DAG genoemd).
DAG
Een DAG is een betekenisvolle associatie van uw taken die u in een strikt gedefinieerde volgorde volgens een specifiek schema wilt voltooien. Airflow biedt een handige webinterface voor het werken met DAG's en andere entiteiten:
De DAG zou er als volgt uit kunnen zien:
De ontwikkelaar legt bij het ontwerpen van een DAG een reeks operators vast waarop taken binnen de DAG zullen worden gebouwd. Hier komen we bij een andere belangrijke entiteit: Airflow Operator.
Операторы
Een operator is een entiteit op basis waarvan taakinstanties worden aangemaakt, die beschrijft wat er zal gebeuren tijdens de uitvoering van een taakinstantie.
- BashOperator - operator voor het uitvoeren van een bash-opdracht.
- PythonOperator - operator voor het aanroepen van Python-code.
- EmailOperator — operator voor het verzenden van e-mail.
- HTTPOperator - operator voor het werken met http-verzoeken.
- SqlOperator - operator voor het uitvoeren van SQL-code.
- Sensor is een operator voor het wachten op een gebeurtenis (de aankomst van de vereiste tijd, het verschijnen van het vereiste bestand, een regel in de database, een reactie van de API, enz., enz.).
Er zijn meer specifieke operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
U kunt ook operators ontwikkelen op basis van uw eigen kenmerken en deze in uw project gebruiken. We hebben bijvoorbeeld MongoDBToHiveViaHdfsTransfer gemaakt, een operator voor het exporteren van documenten van MongoDB naar Hive, en verschillende operators om mee te werken
Vervolgens moeten al deze taken worden uitgevoerd, en nu zullen we het hebben over de planner.
Планировщик
De taakplanner van Airflow is erop gebouwd
Elke pool heeft een limiet op het aantal slots. Bij het aanmaken van een DAG krijgt deze een pool:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Een pool die op DAG-niveau is gedefinieerd, kan op taakniveau worden overschreven.
Een apart proces, Scheduler, is verantwoordelijk voor het plannen van alle taken in Airflow. Eigenlijk houdt Scheduler zich bezig met alle mechanismen voor het instellen van taken voor uitvoering. De taak doorloopt verschillende fasen voordat deze wordt uitgevoerd:
- De vorige taken zijn voltooid in de DAG; een nieuwe kan in de wachtrij worden geplaatst.
- De wachtrij wordt gesorteerd afhankelijk van de prioriteit van taken (prioriteiten kunnen ook worden beheerd), en als er een vrije plek in de pool is, kan de taak in werking worden gesteld.
- Als er een vrije werker-selderij is, wordt de taak ernaar verzonden; het werk dat u in het probleem hebt geprogrammeerd, begint met een of andere operator.
Simpel genoeg.
Scheduler draait op de set van alle DAG's en alle taken binnen de DAG's.
Om Scheduler met DAG te laten werken, moet de DAG een schema instellen:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Er is een set kant-en-klare presets: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
U kunt ook cron-expressies gebruiken:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Dag van executie
Om te begrijpen hoe Airflow werkt, is het belangrijk om te begrijpen wat de Uitvoeringsdatum is voor een DAG. In Airflow heeft DAG een dimensie Uitvoeringsdatum, d.w.z. dat er, afhankelijk van het werkschema van de DAG, taakinstanties worden gemaakt voor elke Uitvoeringsdatum. En voor elke Uitvoeringsdatum kunnen taken opnieuw worden uitgevoerd – of kan een DAG bijvoorbeeld tegelijkertijd op meerdere Uitvoeringsdata werken. Dit wordt hier duidelijk weergegeven:
Helaas (of misschien gelukkig: het hangt van de situatie af), als de implementatie van de taak in de DAG wordt gecorrigeerd, zal de uitvoering op de vorige Uitvoeringsdatum doorgaan, rekening houdend met de aanpassingen. Dit is goed als je gegevens uit voorgaande perioden opnieuw moet berekenen met een nieuw algoritme, maar het is slecht omdat de reproduceerbaarheid van het resultaat verloren gaat (natuurlijk stoort niemand je om de vereiste versie van de broncode uit Git terug te sturen en te berekenen wat je hebt het eenmalig nodig, zoals jij het nodig hebt).
Taken genereren
De implementatie van de DAG is code in Python, dus we hebben een zeer handige manier om de hoeveelheid code te verminderen wanneer we bijvoorbeeld met sharded-bronnen werken. Laten we zeggen dat je drie MySQL-scherven als bron hebt, je moet in elke scherven klimmen en wat gegevens verzamelen. Bovendien onafhankelijk en parallel. De Python-code in de DAG kan er als volgt uitzien:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
De DAG ziet er als volgt uit:
In dit geval kunt u een scherf toevoegen of verwijderen door eenvoudigweg de instellingen aan te passen en de DAG bij te werken. Comfortabel!
U kunt ook complexere codegeneratie gebruiken, bijvoorbeeld werken met bronnen in de vorm van een database of een tabelstructuur beschrijven, een algoritme voor het werken met een tabel, en, rekening houdend met de kenmerken van de DWH-infrastructuur, een proces genereren voor het laden van N tabellen in uw opslag. Of als u bijvoorbeeld met een API werkt die het werken met een parameter in de vorm van een lijst niet ondersteunt, kunt u uit deze lijst N taken in een DAG genereren, de parallelliteit van verzoeken in de API beperken tot een pool en schrapen de benodigde gegevens uit de API. Flexibel!
opslagplaats
Airflow heeft zijn eigen backend-repository, een database (kan MySQL of Postgres zijn, we hebben Postgres), die de status van taken, DAG's, verbindingsinstellingen, globale variabelen, enz. Opslaat. Hier zou ik willen zeggen dat de repository in Airflow is heel eenvoudig (ongeveer 20 tabellen) en handig als u er uw eigen processen bovenop wilt bouwen. Ik herinner me de 100500 tabellen in de Informatica-repository, die lange tijd moesten worden bestudeerd voordat ik begreep hoe je een query moest bouwen.
controle
Gezien de eenvoud van de repository kunt u een proces voor taakmonitoring bouwen dat voor u handig is. We gebruiken een notitieblok in Zeppelin, waar we de status van taken bekijken:
Dit zou ook de webinterface van Airflow zelf kunnen zijn:
De Airflow-code is open source, daarom hebben we waarschuwingen aan Telegram toegevoegd. Als er bij elk actief exemplaar van een taak een fout optreedt, wordt de groep in Telegram spamt, waar het hele ontwikkelings- en ondersteuningsteam bestaat.
Via Telegram krijgen we snel antwoord (indien nodig) en via Zeppelin krijgen we een totaalbeeld van de taken in Airflow.
In totaal
Airflow is voornamelijk open source en je moet er geen wonderen van verwachten. Wees bereid om de tijd en moeite te steken in het bouwen van een oplossing die werkt. Het doel is haalbaar, geloof me, het is het waard. Snelheid van ontwikkeling, flexibiliteit, gemak van het toevoegen van nieuwe processen - u zult het leuk vinden. Natuurlijk moet je veel aandacht besteden aan de organisatie van het project, de stabiliteit van de Airflow zelf: wonderen gebeuren niet.
Nu hebben we Airflow dagelijks aan het werk ongeveer 6,5 duizend taken. Ze zijn heel verschillend van karakter. Er zijn taken voor het laden van gegevens in de hoofd-DWH vanuit veel verschillende en zeer specifieke bronnen, er zijn taken voor het berekenen van winkelpuien in de hoofd-DWH, er zijn taken voor het publiceren van gegevens in een snelle DWH, er zijn heel veel verschillende taken - en Airflow kauwt ze dag na dag allemaal op. In cijfers gesproken, dit is het 2,3 duizend ELT-taken van verschillende complexiteit binnen DWH (Hadoop), ca. 2,5 honderd databases bronnen, dit is een team van 4 ETL-ontwikkelaars, die zijn onderverdeeld in ETL-gegevensverwerking in DWH en ELT-gegevensverwerking binnen DWH en natuurlijk meer één beheerder, die zich bezighoudt met de infrastructuur van de dienst.
Plannen voor de toekomst
Het aantal processen groeit onvermijdelijk, en het belangrijkste wat we zullen doen op het gebied van de Airflow-infrastructuur is schaalvergroting. We willen een Airflow-cluster bouwen, een paar benen toewijzen aan Celery-werknemers en een zichzelf duplicerend hoofd maken met taakplanningsprocessen en een opslagplaats.
epiloog
Dit is natuurlijk niet alles wat ik over Airflow zou willen vertellen, maar ik heb geprobeerd de belangrijkste punten te benadrukken. Eetlust komt met eten, probeer het en je zult het leuk vinden :)
Bron: www.habr.com