Airflow es una herramienta para desarrollar y mantener de forma cómoda y rápida procesos de procesamiento de datos por lotes.

Airflow es una herramienta para desarrollar y mantener de forma cómoda y rápida procesos de procesamiento de datos por lotes.

¡Hola Habr! En este artículo quiero hablar sobre una gran herramienta para desarrollar procesos de procesamiento de datos por lotes, por ejemplo, en la infraestructura de un DWH corporativo o su DataLake. Hablaremos de Apache Airflow (en adelante, Airflow). Habré le ha privado injustamente de su atención y, principalmente, intentaré convencerlo de que al menos vale la pena considerar Airflow al elegir un programador para sus procesos ETL/ELT.

Anteriormente, escribí una serie de artículos sobre el tema DWH cuando trabajaba en Tinkoff Bank. Ahora formo parte del equipo de Mail.Ru Group y estoy desarrollando una plataforma para el análisis de datos en el área de juegos. De hecho, a medida que aparezcan novedades y soluciones interesantes, mi equipo y yo hablaremos aquí sobre nuestra plataforma para análisis de datos.

Prólogo

Vamos a empezar. ¿Qué es el flujo de aire? Esta es una biblioteca (o conjunto de bibliotecas) para desarrollar, planificar y monitorear procesos de trabajo. La característica principal de Airflow: el código Python se utiliza para describir (desarrollar) procesos. Esto tiene muchas ventajas para organizar su proyecto y desarrollo: en esencia, su (por ejemplo) proyecto ETL es solo un proyecto Python y puede organizarlo como desee, teniendo en cuenta las características específicas de la infraestructura, el tamaño del equipo y otros requerimientos. Instrumentalmente todo es sencillo. Utilice, por ejemplo, PyCharm + Git. ¡Es maravilloso y muy conveniente!

Ahora veamos las principales entidades de Airflow. Al comprender su esencia y propósito, podrá organizar de manera óptima la arquitectura de su proceso. Quizás la entidad principal sea el gráfico acíclico dirigido (en adelante, DAG).

DÍA

Un DAG es una asociación significativa de sus tareas que desea completar en una secuencia estrictamente definida de acuerdo con un cronograma específico. Airflow proporciona una interfaz web conveniente para trabajar con DAG y otras entidades:

Airflow es una herramienta para desarrollar y mantener de forma cómoda y rápida procesos de procesamiento de datos por lotes.

El DAG podría verse así:

Airflow es una herramienta para desarrollar y mantener de forma cómoda y rápida procesos de procesamiento de datos por lotes.

El desarrollador, al diseñar un DAG, establece un conjunto de operadores sobre los cuales se construirán las tareas dentro del DAG. Aquí llegamos a otra entidad importante: Airflow Operador.

operadores

Un operador es una entidad sobre la base de la cual se crean instancias de trabajo, que describe lo que sucederá durante la ejecución de una instancia de trabajo. Lanzamientos de flujo de aire de GitHub ya contienen un conjunto de operadores listos para usar. Ejemplos:

  • BashOperator: operador para ejecutar un comando bash.
  • PythonOperator: operador para llamar al código Python.
  • EmailOperator: operador para enviar correo electrónico.
  • HTTPOperator: operador para trabajar con solicitudes http.
  • SqlOperator: operador para ejecutar código SQL.
  • Sensor es un operador de espera de un evento (la llegada del tiempo requerido, la aparición del archivo requerido, una línea en la base de datos, una respuesta de la API, etc., etc.).

Hay operadores más específicos: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

También puedes desarrollar operadores basados ​​en tus propias características y utilizarlos en tu proyecto. Por ejemplo, creamos MongoDBToHiveViaHdfsTransfer, un operador para exportar documentos de MongoDB a Hive, y varios operadores para trabajar con casa de clics: CHLoadFromHiveOperator y CHTableLoaderOperator. Básicamente, tan pronto como un proyecto utiliza con frecuencia código basado en declaraciones básicas, puede pensar en incorporarlo a una nueva declaración. Esto simplificará el desarrollo posterior y ampliará su biblioteca de operadores en el proyecto.

A continuación, es necesario ejecutar todas estas instancias de tareas y ahora hablaremos del programador.

Planificador

El programador de tareas de Airflow se basa en Apio. Celery es una biblioteca de Python que le permite organizar una cola además de la ejecución asincrónica y distribuida de tareas. En el lado de Airflow, todas las tareas se dividen en grupos. Los grupos se crean manualmente. Normalmente, su propósito es limitar la carga de trabajo de trabajar con la fuente o tipificar tareas dentro del DWH. Los grupos se pueden gestionar a través de la interfaz web:

Airflow es una herramienta para desarrollar y mantener de forma cómoda y rápida procesos de procesamiento de datos por lotes.

Cada grupo tiene un límite en el número de espacios. Al crear un DAG, se le proporciona un grupo:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Un grupo definido en el nivel de DAG se puede anular en el nivel de tarea.
Un proceso separado, el Programador, es responsable de programar todas las tareas en Airflow. En realidad, Scheduler se ocupa de todos los mecanismos de configuración de tareas para su ejecución. La tarea pasa por varias etapas antes de ser ejecutada:

  1. Las tareas anteriores se han completado en el DAG; se puede poner en cola una nueva.
  2. La cola se clasifica según la prioridad de las tareas (las prioridades también se pueden controlar) y, si hay un espacio libre en el grupo, la tarea se puede poner en funcionamiento.
  3. Si hay un apio trabajador libre, se le envía la tarea; Comienza el trabajo que programaste en el problema, utilizando uno u otro operador.

Bastante simple

El programador se ejecuta en el conjunto de todos los DAG y en todas las tareas dentro de los DAG.

Para que Scheduler comience a trabajar con DAG, DAG debe establecer un cronograma:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Hay un conjunto de ajustes preestablecidos ya preparados: @once, @hourly, @daily, @weekly, @monthly, @yearly.

También puedes usar expresiones cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Fecha de ejecución

Para comprender cómo funciona Airflow, es importante comprender cuál es la fecha de ejecución para un DAG. En Airflow, DAG tiene una dimensión de Fecha de ejecución, es decir, según el cronograma de trabajo del DAG, se crean instancias de tareas para cada Fecha de ejecución. Y para cada Fecha de ejecución, las tareas se pueden volver a ejecutar o, por ejemplo, un DAG puede funcionar simultáneamente en varias Fechas de ejecución. Esto se muestra claramente aquí:

Airflow es una herramienta para desarrollar y mantener de forma cómoda y rápida procesos de procesamiento de datos por lotes.

Desafortunadamente (o tal vez afortunadamente: depende de la situación), si se corrige la implementación de la tarea en el DAG, entonces la ejecución en la Fecha de Ejecución anterior continuará teniendo en cuenta los ajustes. Esto es bueno si necesita recalcular datos en períodos pasados ​​usando un nuevo algoritmo, pero es malo porque se pierde la reproducibilidad del resultado (por supuesto, nadie le molesta en devolver la versión requerida del código fuente de Git y calcular qué lo necesitas una vez, como lo necesitas).

Generando tareas

La implementación de DAG es código en Python, por lo que tenemos una forma muy conveniente de reducir la cantidad de código cuando trabajamos, por ejemplo, con fuentes fragmentadas. Digamos que tienes tres fragmentos de MySQL como fuente, necesitas acceder a cada uno y recoger algunos datos. Además, de forma independiente y en paralelo. El código Python en DAG podría verse así:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

El DAG se ve así:

Airflow es una herramienta para desarrollar y mantener de forma cómoda y rápida procesos de procesamiento de datos por lotes.

En este caso, puede agregar o eliminar un fragmento simplemente ajustando la configuración y actualizando el DAG. ¡Cómodo!

También puede utilizar una generación de código más compleja, por ejemplo, trabajar con fuentes en forma de base de datos o describir una estructura de tabla, un algoritmo para trabajar con una tabla y, teniendo en cuenta las características de la infraestructura DWH, generar un proceso. para cargar N tablas en su almacenamiento. O, por ejemplo, trabajando con una API que no admite trabajar con un parámetro en forma de lista, puede generar N tareas en un DAG a partir de esta lista, limitar el paralelismo de solicitudes en la API a un grupo y eliminar los datos necesarios de la API. ¡Flexible!

repositorio

Airflow tiene su propio repositorio backend, una base de datos (puede ser MySQL o Postgres, tenemos Postgres), que almacena los estados de las tareas, DAG, configuraciones de conexión, variables globales, etc., etc. Aquí me gustaría poder decir que el El repositorio en Airflow es muy simple (alrededor de 20 tablas) y conveniente si desea crear cualquiera de sus propios procesos sobre él. Recuerdo las 100500 tablas del repositorio de Informatica, que hubo que estudiar durante mucho tiempo antes de entender cómo crear una consulta.

Monitoreo

Dada la simplicidad del repositorio, puede crear un proceso de monitoreo de tareas que sea conveniente para usted. Usamos un bloc de notas en Zeppelin, donde miramos el estado de las tareas:

Airflow es una herramienta para desarrollar y mantener de forma cómoda y rápida procesos de procesamiento de datos por lotes.

Esta también podría ser la interfaz web del propio Airflow:

Airflow es una herramienta para desarrollar y mantener de forma cómoda y rápida procesos de procesamiento de datos por lotes.

El código de Airflow es de código abierto, por lo que hemos agregado alertas a Telegram. Cada instancia en ejecución de una tarea, si ocurre un error, envía spam al grupo en Telegram, donde está todo el equipo de desarrollo y soporte.

Recibimos una respuesta rápida a través de Telegram (si es necesario) y a través de Zeppelin recibimos una imagen general de las tareas en Airflow.

En total

Airflow es principalmente de código abierto y no debes esperar milagros de él. Esté preparado para dedicar tiempo y esfuerzo a crear una solución que funcione. El objetivo es alcanzable, créanme, vale la pena. Velocidad de desarrollo, flexibilidad, facilidad para agregar nuevos procesos: le gustará. Por supuesto, es necesario prestar mucha atención a la organización del proyecto, a la estabilidad del Airflow en sí: los milagros no suceden.

Ahora tenemos Airflow trabajando diariamente alrededor de 6,5 mil tareas. Son de carácter bastante diferente. Hay tareas de cargar datos en el DWH principal desde muchas fuentes diferentes y muy específicas, hay tareas de calcular escaparates dentro del DWH principal, hay tareas de publicar datos en un DWH rápido, hay muchas, muchas tareas diferentes, y Airflow los mastica todos día tras día. Hablando en números, esto es 2,3 miles Tareas ELT de diversa complejidad dentro de DWH (Hadoop), aprox. 2,5 bases de datos fuentes, este es un equipo de 4 desarrolladores ETL, que se dividen en procesamiento de datos ETL en DWH y procesamiento de datos ELT dentro de DWH y por supuesto más un administrador, quien se ocupa de la infraestructura del servicio.

Planes para el futuro

La cantidad de procesos crece inevitablemente y lo principal que haremos en términos de infraestructura de Airflow es escalar. Queremos construir un clúster de Airflow, asignar un par de patas para los trabajadores de Celery y crear una cabeza autoduplicante con procesos de programación de trabajos y un repositorio.

El acto final

Esto, por supuesto, no es todo lo que me gustaría contar sobre Airflow, pero intenté resaltar los puntos principales. El apetito viene con la comida, pruébalo y te gustará :)

Fuente: habr.com

Añadir un comentario