Airflow és una eina per desenvolupar i mantenir processos de processament de dades per lots de manera còmoda i ràpida

Airflow és una eina per desenvolupar i mantenir processos de processament de dades per lots de manera còmoda i ràpida

Hola Habr! En aquest article, vull parlar d'una gran eina per desenvolupar processos de processament de dades per lots, per exemple, a la infraestructura d'un DWH corporatiu o el vostre DataLake. Parlarem d'Apache Airflow (en endavant, Airflow). Ell està injustament privat d'atenció a Habré i, en la part principal, intentaré convèncer-vos que, almenys, val la pena tenir en compte Airflow quan escolliu un programador per als vostres processos ETL / ELT.

Anteriorment, vaig escriure una sèrie d'articles sobre el tema DWH quan treballava a Tinkoff Bank. Ara he entrat a formar part de l'equip de Mail.Ru Group i estic desenvolupant una plataforma per a l'anàlisi de dades a l'àrea de jocs. De fet, a mesura que apareguin notícies i solucions interessants, l'equip i jo parlarem aquí de la nostra plataforma d'anàlisi de dades.

Pròleg

Així doncs, comencem. Què és Airflow? Això és una biblioteca (o conjunt de biblioteques) per desenvolupar, planificar i supervisar els fluxos de treball. La característica principal d'Airflow és que el codi Python s'utilitza per descriure (desenvolupar) processos. Això té molts avantatges per organitzar el vostre projecte i desenvolupament: de fet, el vostre projecte ETL (per exemple) és només un projecte Python i podeu organitzar-lo com vulgueu, tenint en compte les característiques de la infraestructura, la mida de l'equip i altres requisits. . Instrumentalment, tot és senzill. Utilitzeu per exemple PyCharm + Git. És genial i molt convenient!

Mirem ara les principals entitats de Airflow. Un cop entès la seva essència i propòsit, organitzaràs de manera òptima l'arquitectura del procés. Potser l'entitat principal és el gràfic acíclic dirigit (d'ara endavant DAG).

DAG

DAG és una associació semàntica de les vostres tasques que voleu completar en una seqüència estrictament definida en una programació específica. Airflow presenta una interfície web convenient per treballar amb DAG i altres entitats:

Airflow és una eina per desenvolupar i mantenir processos de processament de dades per lots de manera còmoda i ràpida

DAG podria semblar així:

Airflow és una eina per desenvolupar i mantenir processos de processament de dades per lots de manera còmoda i ràpida

El desenvolupador, quan dissenya un DAG, estableix un conjunt d'operadors sobre els quals es construiran les tasques dins del DAG. Aquí arribem a una altra entitat important: l'operador de flux d'aire.

Operadors

Un operador és una entitat a partir de la qual es creen instàncies de treball, que descriu què passarà durant l'execució d'una instància de treball. Alliberaments de flux d'aire de GitHub ja conté un conjunt de sentències llestes per ser utilitzades. Exemples:

  • BashOperator és un operador per executar una ordre bash.
  • PythonOperator és un operador per cridar codi Python.
  • EmailOperator: operador per enviar correu electrònic.
  • HTTPOperator: un operador per treballar amb peticions http.
  • SqlOperator és un operador per executar codi SQL.
  • Sensor és un operador per esperar un esdeveniment (l'arribada de l'hora desitjada, l'aparició del fitxer requerit, una fila a la base de dades, una resposta de l'API, etc., etc.).

Hi ha operadors més específics: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

També podeu desenvolupar operadors que s'adaptin a les vostres necessitats i utilitzar-los en el vostre projecte. Per exemple, hem creat MongoDBToHiveViaHdfsTransfer, un operador per exportar documents de MongoDB a Hive i diversos operadors per treballar amb Feu clic a Casa: CHLoadFromHiveOperator i CHTableLoaderOperator. De fet, tan bon punt un projecte ha utilitzat freqüentment codi basat en declaracions bàsiques, podeu pensar a compilar-lo en una instrucció nova. Això simplificarà el desenvolupament posterior i afegireu a la vostra biblioteca d'operadors del projecte.

A més, s'han de realitzar totes aquestes instàncies de tasques, i ara parlarem del planificador.

Programador

El programador de tasques a Airflow està integrat Api. Celery és una biblioteca de Python que us permet organitzar una cua més l'execució asíncrona i distribuïda de tasques. Des del costat del flux d'aire, totes les tasques es divideixen en grups. Les piscines es creen manualment. Per regla general, el seu propòsit és limitar la càrrega de treballar amb la font o escriure tasques dins del DWH. Les piscines es poden gestionar mitjançant la interfície web:

Airflow és una eina per desenvolupar i mantenir processos de processament de dades per lots de manera còmoda i ràpida

Cada grup té un límit en el nombre de ranures. Quan es crea un DAG, se li dóna un grup:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

L'agrupació establerta al nivell DAG es pot substituir al nivell de tasca.
Un procés independent, Scheduler, s'encarrega de programar totes les tasques a Airflow. En realitat, el Programador s'ocupa de tota la mecànica de la configuració de tasques per a l'execució. Una tasca passa per diverses etapes abans de ser executada:

  1. Les tasques anteriors s'han completat al DAG, se'n pot posar una de nova.
  2. La cua s'ordena en funció de la prioritat de les tasques (també es poden controlar les prioritats) i si hi ha un espai lliure a la piscina, la tasca es pot portar a treballar.
  3. Si hi ha un api obrer lliure, se li envia la tasca; comença el treball que has programat a la tasca, utilitzant un o altre operador.

Prou senzill.

El programador s'executa en un conjunt de tots els DAG i totes les tasques dels DAG.

Perquè el programador comenci a treballar amb el DAG, el DAG ha d'establir una programació:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Hi ha un conjunt de preajustos ja fets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

També podeu utilitzar expressions cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Data d'execució

Per entendre com funciona Airflow, és important entendre què és una data d'execució per a un DAG. El DAG de flux d'aire té la dimensió Data d'execució, és a dir, en funció del calendari de treball del DAG, es creen instàncies de tasques per a cada Data d'execució. I per a cada Data d'execució, les tasques es poden tornar a executar, o, per exemple, un DAG pot funcionar simultàniament en diverses Dates d'execució. Això es mostra clarament aquí:

Airflow és una eina per desenvolupar i mantenir processos de processament de dades per lots de manera còmoda i ràpida

Malauradament (o potser afortunadament: depèn de la situació), si la implementació de la tasca al DAG és correcta, l'execució a la Data d'execució anterior anirà amb els ajustos. Això és bo si necessiteu tornar a calcular dades en períodes passats amb un algorisme nou, però és dolent perquè es perd la reproductibilitat del resultat (per descomptat, ningú es molesta a tornar la versió necessària del codi font de Git i calcular el que necessita una vegada, segons sigui necessari).

Generació de tasques

La implementació del DAG és codi Python, de manera que tenim una manera molt còmoda de reduir la quantitat de codi quan es treballa, per exemple, amb fonts fragmentades. Suposem que teniu tres fragments de MySQL com a font, heu d'enfilar-vos a cadascun i recollir algunes dades. I de manera independent i paral·lela. El codi Python del DAG podria semblar així:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

El DAG té aquest aspecte:

Airflow és una eina per desenvolupar i mantenir processos de processament de dades per lots de manera còmoda i ràpida

Al mateix temps, podeu afegir o eliminar un fragment simplement ajustant la configuració i actualitzant el DAG. Còmode!

També podeu utilitzar una generació de codi més complexa, per exemple, treballar amb fonts en forma de base de dades o descriure una estructura tabular, un algorisme per treballar amb una taula i, tenint en compte les característiques de la infraestructura DWH, generar el procés. de carregar N taules al vostre emmagatzematge. O, per exemple, treballant amb una API que no admet treballar amb un paràmetre en forma de llista, podeu generar N tasques al DAG utilitzant aquesta llista, limitar el paral·lelisme de sol·licituds a l'API a un grup i extreure'n les dades necessàries de l'API. Flexible!

repositori

Airflow té el seu propi repositori backend, una base de dades (potser MySQL o Postgres, tenim Postgres), que emmagatzema els estats de les tasques, DAG, paràmetres de connexió, variables globals, etc., etc. Aquí m'agradaria dir que el repositori a Airflow és molt senzill (unes 20 taules) i convenient si voleu construir-hi algun dels vostres processos. Recordo 100500 taules al repositori d'Informatica, que s'havien de fumar durant molt de temps abans d'entendre com crear una consulta.

Seguiment

Donada la senzillesa del repositori, podeu crear un procés per supervisar les tasques que us convingui. Utilitzem un bloc de notes a Zeppelin, on mirem l'estat de les tasques:

Airflow és una eina per desenvolupar i mantenir processos de processament de dades per lots de manera còmoda i ràpida

També pot ser la interfície web del propi Airflow:

Airflow és una eina per desenvolupar i mantenir processos de processament de dades per lots de manera còmoda i ràpida

El codi Airflow està obert, així que hem afegit una alerta a Telegram. Cada instància de la tasca en execució, si es produeix un error, envia correu brossa al grup de Telegram, on està format tot l'equip de desenvolupament i suport.

Obtenim una resposta ràpida a través de Telegram (si cal), a través de Zeppelin: una imatge general de les tasques a Airflow.

En total

El flux d'aire és de codi obert en primer lloc, i no n'esperis miracles. Estigueu preparat per dedicar el temps i l'esforç per construir una solució que funcioni. Un objectiu de la categoria d'assolible, creieu-me, val la pena. Velocitat de desenvolupament, flexibilitat, facilitat per afegir nous processos: us encantarà. Per descomptat, cal prestar molta atenció a l'organització del projecte, a l'estabilitat del treball del mateix Airflow: no hi ha miracles.

Ara tenim Airflow funcionant diàriament unes 6,5 mil tasques. Són de naturalesa força diferents. Hi ha tasques per carregar dades al DWH principal de moltes fonts diferents i molt específiques, hi ha tasques per calcular aparadors dins del DWH principal, hi ha tasques per publicar dades en un DWH ràpid, hi ha moltes, moltes tasques diferents, i Airflow els mastega tot dia rere dia. Parlant en xifres, això és 2,3 mil Tasques ELT de complexitat variable dins de DWH (Hadoop), aproximadament 2,5 bases de dades fonts, aquesta és una comanda de 4 desenvolupadors d'ETL, que es divideixen en processament de dades ETL a DWH i processament de dades ELT dins de DWH i, per descomptat, més un administrador, que s'ocupa de la infraestructura del servei.

Plans per al futur

El nombre de processos està creixent inevitablement, i el que farem principalment pel que fa a la infraestructura Airflow és escalar. Volem crear un clúster de flux d'aire, assignar un parell de potes per als treballadors d'Api i fer un capçal duplicat amb processos de programació de treballs i un repositori.

Epíleg

Això, per descomptat, està lluny de tot el que m'agradaria parlar sobre Airflow, però he intentat destacar els punts principals. La gana ve amb menjar, prova-ho i t'agradarà 🙂

Font: www.habr.com

Afegeix comentari