¡Hola Habr! En este artículo quiero hablar sobre una gran herramienta para desarrollar procesos de procesamiento de datos por lotes, por ejemplo, en la infraestructura de un DWH corporativo o su DataLake. Hablaremos de Apache Airflow (en adelante, Airflow). Habré le ha privado injustamente de su atención y, principalmente, intentaré convencerlo de que al menos vale la pena considerar Airflow al elegir un programador para sus procesos ETL/ELT.
Anteriormente, escribí una serie de artículos sobre el tema DWH cuando trabajaba en Tinkoff Bank. Ahora formo parte del equipo de Mail.Ru Group y estoy desarrollando una plataforma para el análisis de datos en el área de juegos. De hecho, a medida que aparezcan novedades y soluciones interesantes, mi equipo y yo hablaremos aquí sobre nuestra plataforma para análisis de datos.
Prólogo
Vamos a empezar. ¿Qué es el flujo de aire? Esta es una biblioteca (o
Ahora veamos las principales entidades de Airflow. Al comprender su esencia y propósito, podrá organizar de manera óptima la arquitectura de su proceso. Quizás la entidad principal sea el gráfico acíclico dirigido (en adelante, DAG).
DÍA
Un DAG es una asociación significativa de sus tareas que desea completar en una secuencia estrictamente definida de acuerdo con un cronograma específico. Airflow proporciona una interfaz web conveniente para trabajar con DAG y otras entidades:
El DAG podría verse así:
El desarrollador, al diseñar un DAG, establece un conjunto de operadores sobre los cuales se construirán las tareas dentro del DAG. Aquí llegamos a otra entidad importante: Airflow Operador.
operadores
Un operador es una entidad sobre la base de la cual se crean instancias de trabajo, que describe lo que sucederá durante la ejecución de una instancia de trabajo.
- BashOperator: operador para ejecutar un comando bash.
- PythonOperator: operador para llamar al código Python.
- EmailOperator: operador para enviar correo electrónico.
- HTTPOperator: operador para trabajar con solicitudes http.
- SqlOperator: operador para ejecutar código SQL.
- Sensor es un operador de espera de un evento (la llegada del tiempo requerido, la aparición del archivo requerido, una línea en la base de datos, una respuesta de la API, etc., etc.).
Hay operadores más específicos: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
También puedes desarrollar operadores basados en tus propias características y utilizarlos en tu proyecto. Por ejemplo, creamos MongoDBToHiveViaHdfsTransfer, un operador para exportar documentos de MongoDB a Hive, y varios operadores para trabajar con
A continuación, es necesario ejecutar todas estas instancias de tareas y ahora hablaremos del programador.
Planificador
El programador de tareas de Airflow se basa en
Cada grupo tiene un límite en el número de espacios. Al crear un DAG, se le proporciona un grupo:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Un grupo definido en el nivel de DAG se puede anular en el nivel de tarea.
Un proceso separado, el Programador, es responsable de programar todas las tareas en Airflow. En realidad, Scheduler se ocupa de todos los mecanismos de configuración de tareas para su ejecución. La tarea pasa por varias etapas antes de ser ejecutada:
- Las tareas anteriores se han completado en el DAG; se puede poner en cola una nueva.
- La cola se clasifica según la prioridad de las tareas (las prioridades también se pueden controlar) y, si hay un espacio libre en el grupo, la tarea se puede poner en funcionamiento.
- Si hay un apio trabajador libre, se le envía la tarea; Comienza el trabajo que programaste en el problema, utilizando uno u otro operador.
Bastante simple
El programador se ejecuta en el conjunto de todos los DAG y en todas las tareas dentro de los DAG.
Para que Scheduler comience a trabajar con DAG, DAG debe establecer un cronograma:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Hay un conjunto de ajustes preestablecidos ya preparados: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
También puedes usar expresiones cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Fecha de ejecución
Para comprender cómo funciona Airflow, es importante comprender cuál es la fecha de ejecución para un DAG. En Airflow, DAG tiene una dimensión de Fecha de ejecución, es decir, según el cronograma de trabajo del DAG, se crean instancias de tareas para cada Fecha de ejecución. Y para cada Fecha de ejecución, las tareas se pueden volver a ejecutar o, por ejemplo, un DAG puede funcionar simultáneamente en varias Fechas de ejecución. Esto se muestra claramente aquí:
Desafortunadamente (o tal vez afortunadamente: depende de la situación), si se corrige la implementación de la tarea en el DAG, entonces la ejecución en la Fecha de Ejecución anterior continuará teniendo en cuenta los ajustes. Esto es bueno si necesita recalcular datos en períodos pasados usando un nuevo algoritmo, pero es malo porque se pierde la reproducibilidad del resultado (por supuesto, nadie le molesta en devolver la versión requerida del código fuente de Git y calcular qué lo necesitas una vez, como lo necesitas).
Generando tareas
La implementación de DAG es código en Python, por lo que tenemos una forma muy conveniente de reducir la cantidad de código cuando trabajamos, por ejemplo, con fuentes fragmentadas. Digamos que tienes tres fragmentos de MySQL como fuente, necesitas acceder a cada uno y recoger algunos datos. Además, de forma independiente y en paralelo. El código Python en DAG podría verse así:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
El DAG se ve así:
En este caso, puede agregar o eliminar un fragmento simplemente ajustando la configuración y actualizando el DAG. ¡Cómodo!
También puede utilizar una generación de código más compleja, por ejemplo, trabajar con fuentes en forma de base de datos o describir una estructura de tabla, un algoritmo para trabajar con una tabla y, teniendo en cuenta las características de la infraestructura DWH, generar un proceso. para cargar N tablas en su almacenamiento. O, por ejemplo, trabajando con una API que no admite trabajar con un parámetro en forma de lista, puede generar N tareas en un DAG a partir de esta lista, limitar el paralelismo de solicitudes en la API a un grupo y eliminar los datos necesarios de la API. ¡Flexible!
repositorio
Airflow tiene su propio repositorio backend, una base de datos (puede ser MySQL o Postgres, tenemos Postgres), que almacena los estados de las tareas, DAG, configuraciones de conexión, variables globales, etc., etc. Aquí me gustaría poder decir que el El repositorio en Airflow es muy simple (alrededor de 20 tablas) y conveniente si desea crear cualquiera de sus propios procesos sobre él. Recuerdo las 100500 tablas del repositorio de Informatica, que hubo que estudiar durante mucho tiempo antes de entender cómo crear una consulta.
Monitoreo
Dada la simplicidad del repositorio, puede crear un proceso de monitoreo de tareas que sea conveniente para usted. Usamos un bloc de notas en Zeppelin, donde miramos el estado de las tareas:
Esta también podría ser la interfaz web del propio Airflow:
El código de Airflow es de código abierto, por lo que hemos agregado alertas a Telegram. Cada instancia en ejecución de una tarea, si ocurre un error, envía spam al grupo en Telegram, donde está todo el equipo de desarrollo y soporte.
Recibimos una respuesta rápida a través de Telegram (si es necesario) y a través de Zeppelin recibimos una imagen general de las tareas en Airflow.
En total
Airflow es principalmente de código abierto y no debes esperar milagros de él. Esté preparado para dedicar tiempo y esfuerzo a crear una solución que funcione. El objetivo es alcanzable, créanme, vale la pena. Velocidad de desarrollo, flexibilidad, facilidad para agregar nuevos procesos: le gustará. Por supuesto, es necesario prestar mucha atención a la organización del proyecto, a la estabilidad del Airflow en sí: los milagros no suceden.
Ahora tenemos Airflow trabajando diariamente alrededor de 6,5 mil tareas. Son de carácter bastante diferente. Hay tareas de cargar datos en el DWH principal desde muchas fuentes diferentes y muy específicas, hay tareas de calcular escaparates dentro del DWH principal, hay tareas de publicar datos en un DWH rápido, hay muchas, muchas tareas diferentes, y Airflow los mastica todos día tras día. Hablando en números, esto es 2,3 miles Tareas ELT de diversa complejidad dentro de DWH (Hadoop), aprox. 2,5 bases de datos fuentes, este es un equipo de 4 desarrolladores ETL, que se dividen en procesamiento de datos ETL en DWH y procesamiento de datos ELT dentro de DWH y por supuesto más un administrador, quien se ocupa de la infraestructura del servicio.
Planes para el futuro
La cantidad de procesos crece inevitablemente y lo principal que haremos en términos de infraestructura de Airflow es escalar. Queremos construir un clúster de Airflow, asignar un par de patas para los trabajadores de Celery y crear una cabeza autoduplicante con procesos de programación de trabajos y un repositorio.
El acto final
Esto, por supuesto, no es todo lo que me gustaría contar sobre Airflow, pero intenté resaltar los puntos principales. El apetito viene con la comida, pruébalo y te gustará :)
Fuente: habr.com