Airflow é unha ferramenta para desenvolver e manter de forma cómoda e rápida procesos de procesamento de datos por lotes

Airflow é unha ferramenta para desenvolver e manter de forma cómoda e rápida procesos de procesamento de datos por lotes

Ola, Habr! Neste artigo quero falar dunha gran ferramenta para desenvolver procesos de procesamento de datos por lotes, por exemplo, na infraestrutura dun DWH corporativo ou o seu DataLake. Falaremos de Apache Airflow (en diante Airflow). Habré está inxustamente privado de atención e, na parte principal, tentarei convencerte de que polo menos paga a pena mirar Airflow ao elixir un programador para os teus procesos ETL/ELT.

Anteriormente, escribín unha serie de artigos sobre o tema DWH cando traballaba no Tinkoff Bank. Agora formei parte do equipo do Grupo Mail.Ru e estou a desenvolver unha plataforma para a análise de datos na área de xogos. En realidade, a medida que aparecen novas e solucións interesantes, o meu equipo e eu falaremos aquí da nosa plataforma de análise de datos.

Prólogo

Entón, imos comezar. Que é Airflow? Esta é unha biblioteca (ou conxunto de bibliotecas) desenvolver, planificar e supervisar os procesos de traballo. A principal característica de Airflow: o código Python úsase para describir (desenvolver) procesos. Isto ten moitas vantaxes para organizar o teu proxecto e desenvolvemento: en esencia, o teu (por exemplo) proxecto ETL é só un proxecto Python, e podes organizalo como queiras, tendo en conta as especificidades da infraestrutura, o tamaño do equipo e outros requisitos. Instrumentalmente todo é sinxelo. Use por exemplo PyCharm + Git. É marabilloso e moi cómodo!

Agora vexamos as principais entidades de Airflow. Ao comprender a súa esencia e propósito, pode organizar de forma óptima a súa arquitectura de procesos. Quizais a entidade principal sexa o Gráfico Acíclico Dirixido (en diante denominado DAG).

DAG

Un DAG é unha asociación significativa das túas tarefas que queres completar nunha secuencia estritamente definida segundo unha programación específica. Airflow ofrece unha interface web conveniente para traballar con DAG e outras entidades:

Airflow é unha ferramenta para desenvolver e manter de forma cómoda e rápida procesos de procesamento de datos por lotes

O DAG pode verse así:

Airflow é unha ferramenta para desenvolver e manter de forma cómoda e rápida procesos de procesamento de datos por lotes

O desenvolvedor, ao deseñar un DAG, establece un conxunto de operadores sobre os que se construirán as tarefas dentro do DAG. Aquí chegamos a outra entidade importante: Airflow Operator.

Operadores

Un operador é unha entidade sobre a base da cal se crean instancias de traballo, que describe o que ocorrerá durante a execución dunha instancia de traballo. Lanzamentos de Airflow de GitHub xa conteñen un conxunto de operadores listos para usar. Exemplos:

  • BashOperator - operador para executar un comando bash.
  • PythonOperator - operador para chamar código Python.
  • EmailOperator: operador para enviar correo electrónico.
  • HTTPOperator - operador para traballar con solicitudes http.
  • SqlOperator - operador para executar código SQL.
  • Sensor é un operador para agardar un evento (a chegada do tempo necesario, a aparición do ficheiro necesario, unha liña na base de datos, unha resposta da API, etc., etc.).

Existen operadores máis específicos: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Tamén pode desenvolver operadores en función das súas propias características e utilizalos no seu proxecto. Por exemplo, creamos MongoDBToHiveViaHdfsTransfer, un operador para exportar documentos de MongoDB a Hive e varios operadores para traballar con clickhouse: CHLoadFromHiveOperator e CHTableLoaderOperator. Esencialmente, tan pronto como un proxecto utilizou con frecuencia código construído sobre instrucións básicas, podes pensar en crealo nunha nova declaración. Isto simplificará o desenvolvemento e ampliará a súa biblioteca de operadores no proxecto.

A continuación, todas estas instancias de tarefas deben ser executadas, e agora falaremos do planificador.

Programador

O programador de tarefas de Airflow está construído apio. Celery é unha biblioteca de Python que permite organizar unha cola máis a execución asincrónica e distribuída de tarefas. No lado do fluxo de aire, todas as tarefas divídense en grupos. As piscinas créanse manualmente. Normalmente, o seu propósito é limitar a carga de traballo de traballar coa fonte ou tipificar tarefas dentro do DWH. As piscinas pódense xestionar a través da interface web:

Airflow é unha ferramenta para desenvolver e manter de forma cómoda e rápida procesos de procesamento de datos por lotes

Cada grupo ten un límite no número de slots. Ao crear un DAG, dáselle un pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Unha agrupación definida a nivel de DAG pódese substituír a nivel de tarefa.
Un proceso separado, Scheduler, é o responsable de programar todas as tarefas en Airflow. En realidade, Scheduler encárgase de toda a mecánica de establecer tarefas para a súa execución. A tarefa pasa por varias etapas antes de ser executada:

  1. As tarefas anteriores realizáronse no DAG, pódese poñer unha nova.
  2. A cola ordénase en función da prioridade das tarefas (tamén se poden controlar as prioridades), e se hai un espazo libre no pool, a tarefa pódese poñer en funcionamento.
  3. Se hai un apio obreiro libre, envíalle a tarefa; comeza o traballo que programaches no problema, utilizando un ou outro operador.

Abondo sinxelo.

O Scheduler execútase no conxunto de todos os DAG e todas as tarefas dentro dos DAG.

Para que Scheduler comece a traballar con DAG, o DAG debe establecer unha programación:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Hai un conxunto de predefinidos preparados: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Tamén podes usar expresións cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Data de execución

Para entender como funciona Airflow, é importante comprender cal é a data de execución dun DAG. En Airflow, DAG ten unha dimensión de Data de execución, é dicir, dependendo da programación de traballo do DAG, créanse instancias de tarefas para cada Data de execución. E para cada Data de Execución, pódense volver executar tarefas ou, por exemplo, un DAG pode traballar simultaneamente en varias Datas de Execución. Isto móstrase claramente aquí:

Airflow é unha ferramenta para desenvolver e manter de forma cómoda e rápida procesos de procesamento de datos por lotes

Desafortunadamente (ou quizais afortunadamente: depende da situación), se se corrixe a execución da tarefa no DAG, procederase á execución na Data de Execución anterior tendo en conta os axustes. Isto é bo se necesitas volver calcular datos en períodos pasados ​​usando un novo algoritmo, pero é malo porque se perde a reproducibilidade do resultado (por suposto, ninguén te molesta en devolver a versión requirida do código fonte de Git e calcular o que precisa unha vez, como o necesita).

Xeración de tarefas

A implementación do DAG é código en Python, polo que temos unha forma moi cómoda de reducir a cantidade de código cando se traballa, por exemplo, con fontes fragmentadas. Digamos que tes tres fragmentos de MySQL como fonte, tes que subir a cada un e recoller algúns datos. Ademais, de forma independente e paralela. O código de Python no DAG pode verse así:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

O DAG ten o seguinte aspecto:

Airflow é unha ferramenta para desenvolver e manter de forma cómoda e rápida procesos de procesamento de datos por lotes

Neste caso, pode engadir ou eliminar un fragmento simplemente axustando a configuración e actualizando o DAG. Cómodo!

Tamén pode usar a xeración de código máis complexo, por exemplo, traballar con fontes en forma de base de datos ou describir unha estrutura de táboa, un algoritmo para traballar cunha táboa e, tendo en conta as características da infraestrutura DWH, xerar un proceso. para cargar N táboas no teu almacenamento. Ou, por exemplo, traballando cunha API que non admite traballar cun parámetro en forma de lista, pode xerar N tarefas nun DAG a partir desta lista, limitar o paralelismo de solicitudes na API a un grupo e raspar os datos necesarios da API. Flexible!

repositorio

Airflow ten o seu propio repositorio back-end, unha base de datos (pode ser MySQL ou Postgres, temos Postgres), que almacena os estados das tarefas, DAG, axustes de conexión, variables globais, etc., etc. Aquí gustaríame dicir que o O repositorio en Airflow é moi sinxelo (unhas 20 táboas) e cómodo se queres construír algún dos teus propios procesos encima. Recordo as 100500 táboas do repositorio de Informatica, que houbo que estudalas durante moito tempo antes de entender como construír unha consulta.

Seguimento

Dada a sinxeleza do repositorio, podes crear un proceso de seguimento de tarefas que che resulte cómodo. Usamos un bloc de notas en Zeppelin, onde observamos o estado das tarefas:

Airflow é unha ferramenta para desenvolver e manter de forma cómoda e rápida procesos de procesamento de datos por lotes

Esta tamén podería ser a interface web do propio Airflow:

Airflow é unha ferramenta para desenvolver e manter de forma cómoda e rápida procesos de procesamento de datos por lotes

O código Airflow é de código aberto, polo que engadimos alertas a Telegram. Cada instancia en execución dunha tarefa, se se produce un erro, envía spam ao grupo en Telegram, onde está formado todo o equipo de desenvolvemento e soporte.

Recibimos unha resposta rápida a través de Telegram (se é necesario) e a través de Zeppelin recibimos unha imaxe xeral das tarefas en Airflow.

En total

Airflow é principalmente de código aberto e non debes esperar milagres del. Estea preparado para dedicar tempo e esforzo para construír unha solución que funcione. O obxectivo é alcanzable, créame, paga a pena. Velocidade de desenvolvemento, flexibilidade, facilidade para engadir novos procesos: gustarache. Por suposto, cómpre prestar moita atención á organización do proxecto, á estabilidade do fluxo de aire en si: os milagres non ocorren.

Agora temos Airflow traballando a diario preto de 6,5 mil tarefas. Teñen un carácter bastante diferente. Hai tarefas de carga de datos no DWH principal de moitas fontes diferentes e moi específicas, hai tarefas de cálculo de escaparates dentro do DWH principal, hai tarefas de publicación de datos nun DWH rápido, hai moitas, moitas tarefas diferentes e Airflow. mastigaos todos día tras día. Falando en números, isto é 2,3 mil Tarefas ELT de complexidade variable dentro de DWH (Hadoop), aprox. 2,5 bases de datos fontes, este é un equipo de 4 desenvolvedores de ETL, que se dividen en procesamento de datos ETL en DWH e procesamento de datos ELT dentro de DWH e, por suposto, máis un administrador, que se ocupa da infraestrutura do servizo.

Planos para o futuro

O número de procesos está crecendo inevitablemente, e o principal que faremos en canto á infraestrutura Airflow é escalar. Queremos construír un clúster Airflow, asignar un par de patas para os traballadores de Celery e facer unha cabeza auto-duplicada con procesos de programación de traballos e un repositorio.

Epílogo

Isto, por suposto, non é todo o que me gustaría contar sobre Airflow, pero tentei destacar os puntos principais. O apetito vén con comer, probalo e gustarache :)

Fonte: www.habr.com

Engadir un comentario