Apache Airflow: Faciens ETL Facilius

Hi, Dmitry Logvinenko sum - Ingeniarius Data Analyticorum Department of the Vezet group of companies.

Dicam tibi de mirabili instrumento ad processuum explicandum ETL - Apache Airflow. Sed Airfluxus tam versatilis et multifacetus est ut propius inspicere debeas etiam si fluit notitia non implicata, sed opus est ut aliquos processus periodice deducat ac monitorem executionis habeat.

Et sic, non solum dico, sed etiam ostendam: programma multum habet codicis, eenshotsscrae et commendationes.

Apache Airflow: Faciens ETL Facilius
Quod plerumque videre cum verbum Airflow google / Wikimedia Commons

mensam de contentis in eodem

introduction

Apache Airflow est sicut Django:

  • in pythonis
  • magna admin panel;
  • expanding in infinitum

- Solo melior, et com- plexus est omnino diversus usus, scilicet (sicut scriptum est ante kat);

  • currendo et vigilantia opera in infinitis machinis (quotquot Apium / Kubernetes et conscientiae tuae tibi permittunt)
  • cum dynamica generationis workflow ab ipso facillime scribere et intelligere Python codicem
  • et facultas coniungendi quasvis databases et APIs inter se utentes tam paratas componentes et plugins domesticas factas (quod est perquam simplex).

Apache Airflow utimur sic:

  • notitias ex variis fontibus colligimus (multae SQL Servae et PostgreSQL instantiae, variae APIs cum applicatione metrica, etiam 1C) in DWH et ODS (habemus Vertica et Clickhouse).
  • quam provectus? cron, quae incipit notitias processus in ODS consolidationis, ac etiam monitores earum sustentationem.

Donec nuper, necessitates nostrae ab uno parvo servo cum 32 coros et 50 GB ipsius RAM tegebantur. In Airflow, haec opera;

  • more 200 dags (laborationes actu, in quibus nos pensa referta);
  • in unaquaque ad mediocris 70 tasks,
  • haec bonitas incipit (etiam in mediocris) semel horae.

Et quomodo tractavimus infra scribemus, nunc definiamus uber-tatem, quam solvemus;

Tres sunt ministri primigenii SQL, singuli cum 50 databases - instantiae unius propositi, respective, eandem structuram habent (fere ubique, mua-ha-ha), quod significat mensam Ordinum unicuique habere (fortunate, mensam cum illa. nomen in aliquo negotio dis fieri potest). Notitias accipimus addendo agrorum servitiorum (principium server, datorum datorum, ETL negotium ID) et in simplice mittemus, inquiunt, Vertica.

Eamus!

Praecipua pars, practica (et parva theorica);

Cur nos (et tu)

Cum arbores erant magnae et ego eram simplex SQL-schik in uno grosso Russico, scammed ETL processuum aka notitia fluit utendo duo instrumenta nobis praesto sint;

  • Informatica Power Center - Ratio diffusa, perquam fertilis, sua ferramenta, sua versione. I% usus sum Deus avertat sui facultatem. Quare? Primum, hoc interface, alicubi ex 1s, mentis premat nos. Secundo, haec contraptio designata est ad valde phantasiam processuum, reuse furiosae componentis et aliae strophae-inceptae valde magnae. De eo quod constat, sicut ala Airbus A380 / anno, nihil dicemus.

    Cave, tortor potest nocere hominibus sub XXX parum

    Apache Airflow: Faciens ETL Facilius

  • SQL Servo Integration Servo — Sodalem hunc in nostrum intra- fluit consilium. Bene enim: iam utimur SQL Servo, et aliquo modo irrationabile esset suis ETL instrumentis uti. Omne quod in eo est bonum est: utrumque interface pulchrum est, et progressus nuntiat... Sed hoc non est cur programmata amamus, oh, non propter hoc. Version it dtsx (quod est XML nodis permixtis nisi) possumus, sed quid punctum? Quomodo sarcina molis facienda est quae centenas mensas ab uno servo ad alium trahet? Etiam, quam centum, index digitus tuus a viginti argenteis decidet, strepitando in muscipulo. Sed plane magis modum spectat:

    Apache Airflow: Faciens ETL Facilius

Nos certe vias exspectavimus. Si even fere SSIS factum est ad sarcina auto-scripta generantis ...

...et deinde officium novum invenerunt me. Ac Apache Airflow advenerunt me in ea.

Cum invenerim descriptiones processus ETL Python esse simplices codicem, modo non saltas prae gaudio. Haec quomodo data rivi sunt versiones et diffusae, et mensas cum una structura e centenis datorum in unum scopis fundentes facta sunt in codice Pythonis in uno et dimidio vel in duobus 13 "tegumenta".

Congregans botrum portassent

Donec non componamus, nec de rebus omnino manifestis hic loquamur, sicut inaugurari Airflow, electa datorum, Apium et alii casus navales descriptos.

Ut experimenta statim incipiamus, adumbravi docker-compose.yml in quibus:

  • Sit scriptor actually suscitare airflow: Scheduler, Webserver. Flos etiam ibi nere erit ad operas Apii monitoris (quia iam in pulsus est apache/airflow:1.10.10-python3.7Sed non sapiunt)
  • PostgreSQL, in quo Airflow scribet informationes muneris eius (schedularum notitiarum, exsecutionis statisticarum, etc.), et Apium opera peracta notabis;
  • redisqui pro Apii munere proxenetarum aget;
  • Apium operariumquae in directo negotiorum executione versabitur.
  • Ut folder ./dags Documenta nostra cum daculorum descriptione addemus. In musca lecta erunt, ut post unumquodque sternumentum iactare non opus sit totum acervum.

In nonnullis locis, signum in exemplis non omnino ostenditur (ne textum clutem) sed alicubi in processu modificatum sit. Complete opus codice exempla reperiantur in eclesiae reposito https://github.com/dm-logv/airflow-tutorial.

Docker compose.yml,

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

dictis:

  • In compositionis conventu, in imagine nota maxime fiebam puckel / docker-airflow — Vide eum e coercere. Fortasse non opus est tibi aliud in vita tua.
  • Omnes Airflow occasus praesto sunt non solum per " airflow.cfgsed etiam per mauris varius, tincidunt quam sed, commodo quam.
  • Naturaliter productione parata non est: consulto cordis pulsationes in vasis non posuit, securitati non molestus sum. Sed minimum idoneus feci experimentis nostris.
  • Nota quod:
    • Pugilla folder pervia esse debet tam schedulis quam opificibus.
    • Eadem ratio est de omnibus bibliothecis tertiae factionis - omnes in machinis cum schedulis et operariis institui debent.

Age, iam simplex est;

$ docker-compose up --scale worker=3

Postquam omnia oriuntur, interretialem telam intueri potes:

basic informatio

Si in his omnibus "dags" aliquid non intellexisti, hic breve dictionarium est:

  • Scheduler - praecipui avunculi in Airflow, continentes robots laborare, et non persona: monitores schedulae, dos updates, opera immittit.

    Fere in codicibus vetustioribus, problemata memoriae (non, non lapsus, sed scillam) habuit, et legatorum modulus etiam in ficubus manebat. run_duration - suum sileo intervallum. Nunc autem omnia bene.

  • DAG (aka "dag") - "graphi acyclici directi", sed haec definitio paucis hominibus indicabit, re vera continens est officiorum inter se mutuo (vide infra) vel analogum sarcinae in SSIS et Workflow in Informatica. .

    Praeter dags, subdags adhuc esse, sed verisimile non est ad eos.

  • DAG Run - dag initialized, quae propria est assignata execution_date. Dagrans eiusdem dag in parallela operari potest (si opera tua idempotent, scilicet).
  • operator sunt pieces of code responsible pro specifica actione faciendo. Tria genera operariorum sunt:
    • actionissicut nostri ventus PythonOperatorqui potest facere aliquem codicem Python;
    • translatioqui onera portant de loco in locum, dicite; MsSqlToHiveTransfer;
    • sensorem ex altera vero parte, sinet te agere vel retardare ulteriorem executionem pedis, donec id fiat. HttpSensor certum terminum trahere potest, et, cum optata responsio exspectat, translationem committitur GoogleCloudStorageToS3Operator. Quaeret mens curiosa: quare? Ceterum repetitiones facere potes in operante!" Ac deinde, ne tardant negotia piscinae cum operariis suspensis. Sensor incipit, compescit et perit ante inceptum proximum.
  • negotium - Declarati operarii, cuiuscumque generis, ac pugionis addicti, ad gradum muneris promoventur.
  • negotium exempli gratia - cum consiliarius imperator placuit operas in praelium mittere operas operarum (recte eo loco, si utimur. LocalExecutor aut remotis nodi apud of * CeleryExecutor), eis contextum assignat (i.e., statuto variabilium - executionum parametrorum), mandatum vel interrogationes templates dilatat atque eas infundit.

Nos generare tasks

Primum formam generalem dougis nostri delineemus, deinde magis ac magis in singula intenderemus, quia solutiones aliquas non parvas applicamus.

Sic in forma simplicissima talis sica videbit sic:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's figure is out:

  • Primo, lib Cetera;
  • sql_server_ds - eam List[namedtuple[str, str]] nominibus nexuum ex Airflow Connexionibus et databases e quibus laminam nostram capiemus;
  • dag - nostri dag nuntiatio, quae necesse est esse globals()Aliter Airflow non inveniet. Doug etiam opus est dicere;
    • Quis est nomine orders - Hoc nomen tunc apparebit in instrumento interretiali;
    • e media nocte ad viii Idus Julii operabitur;
    • et currere debet circa singulas 6 horas (pro lenta guys hic pro timedelta() admittenda cron-line 0 0 0/6 ? * * *, ad minus frigidum @daily);
  • workflow() principalis officium faciet, sed non nunc. Nunc enim contextum nostrum in lignum modo effundemus.
  • Et nunc ad simplices magicas operas creandas;
    • per fontes nostros currimus;
    • initialize " PythonOperatorqui faciet phantasma nostrum workflow(). Noli oblivisci singularem (intra dag) nomen negotii denotare et ipsum pugionem ligare. Flag provide_context alias rationes in munere fundemus, quas diligenter utendo colligemus **context.

Nam id nunc est. Quod cepimus;

  • puga nova in instrumento interretiali;
  • unum et dimidium centum officia quae in parallelis exsecutioni erunt (si Airflow, apium occasus ac servientis facultatem permittunt).

Bene fere obtinuit.

Apache Airflow: Faciens ETL Facilius
Quis clientelas instituet?

Ut hoc totum simpliciorem redderet, adveni docker-compose.yml processus requirements.txt omnibus nodis.

Nunc abiit:

Apache Airflow: Faciens ETL Facilius

Cinerea quadrata sunt instantiarum operarum quae a schedula procedebant.

Aliquantum exspectamus, opera ab operariis abripiuntur;

Apache Airflow: Faciens ETL Facilius

Virentes, opus sane feliciter consummarunt. Reds non est valde felix.

Viam folder pro nostro iaculo non est ./dags, nulla est synchronisatio inter machinis - omnes dags jacere git nostro Gitlab, et Gitlab CI updates to machinas distribuit cum in bus master.

Paulo de Flos

Dum opifices nostros pacifientes terunt, recordemur aliud instrumentum quod aliquid nobis ostendere potest - Flos.

Ipsa prima pagina cum summario notitiarum de nodis laborantis:

Apache Airflow: Faciens ETL Facilius

Intensissima pagina cum operibus quae ad opus iverunt;

Apache Airflow: Faciens ETL Facilius

Maxime amet pagina cum status nostri sectoris:

Apache Airflow: Faciens ETL Facilius

Pagina clarissima est cum munere status graphi et temporis exsecutionis;

Apache Airflow: Faciens ETL Facilius

Nos oneratis underloaded

Sic omnia opera elaborata, vulneratum auferre potes.

Apache Airflow: Faciens ETL Facilius

Multi vulnerati sunt, ob aliam causam. In casu recto usu Airflow, haec ipsa quadrata indicant notitias definite non pervenisse.

Opus est stipem spectare et instantiarum lapsus negotium sileo.

Strepitando in platea aliqua, actiones nobis praesto videbimus:

Apache Airflow: Faciens ETL Facilius

Potes tollere et purgare lapsos. Hoc est, obliviscimur aliquid ibi defecisse, idemque negotium in schedula ibit.

Apache Airflow: Faciens ETL Facilius

Patet hoc facere cum mure cum omnibus quadratis rubris non valde humanum est - hoc non est quod exspectamus ab Airflow. Naturaliter arma exstinctionis habemus; Browse/Task Instances

Apache Airflow: Faciens ETL Facilius

Colligamus omnia simul et ad nihilum reset, preme item rectam:

Apache Airflow: Faciens ETL Facilius

Post purgationem, taxis nostri hoc simile spectant (iam exspectant schedulas ut RATIONARIUM mittant);

Apache Airflow: Faciens ETL Facilius

Nexus, hami et aliae variabiles

Tempus est spectare proximum DAG; update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Quisque semper fama renovatio? Haec est illa iterum: est index fontium unde notitias adipiscamur; ubi ponere album est; non obliviscar sonare cum omnia contigerunt aut ruperunt (bene, hoc non est de nobis, non est).

Transeamus iterum per tabellam et nova aspiciamus obscura;

  • from commons.operators import TelegramBotSendMessage - Nihil obstat quominus operatores nostri efficiantur, quibus usus est ut parvam fabricam praebendo nuntiis ad Unblocked transmittat. (De hoc auctore infra plura loquemur);
  • default_args={} - dag eadem argumenta omnibus operariis suis distribuere potest;
  • to='{{ var.value.all_the_kings_men }}' - agrum to non induravimus, sed dynamice generavimus utendo Jinja et variabili cum indice inscriptionum, quam diligenter inserui. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — conditio incipiendi operantis. In casu nostro, litterae bullae advolabunt si omnes clientelas elaboraverunt feliciter;
  • tg_bot_conn_id='tg_main' - rationes conn_id accipere nexum IDs quod creare in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Mandata in telegraphum avolabunt tantum si officia inciderunt;
  • task_concurrency=1 - Prohibemus simultaneam sollicitudinem plurium instantiarum unius operis. Alioquin simultaneum plurium deducemus VerticaOperator (aspiciens unam mensam);
  • report_update >> [email, tg] - omnis VerticaOperator convenire mittendis litteris nuntiisque, hoc modo:
    Apache Airflow: Faciens ETL Facilius

    Sed cum notificator operariorum condiciones Lorem diversas habent, unus tantum laborabit. In Arbor Visum, omnia paulo minus visualia spectant;
    Apache Airflow: Faciens ETL Facilius

De paucis dicam macros et amicis suis - variabilium.

Macros Jinja collocatores sunt qui varias utiles informationes in rationes operatorium substituere possunt. Exempli gratia;

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} et expand ad contenta in contextu variabilis execution_date in forma YYYY-MM-DD: 2020-07-14. Optima pars est quod contextus variabilium ad certae instantiae (quadratum in Arbor View), et si restarted, loci possessores ad eadem bona expandent.

Valores assignati considerari possunt utens bullam redditam in unaquaque re instantia. Hoc est negotium quomodo litteras mittit;

Apache Airflow: Faciens ETL Facilius

Itaque ad negotium mittendo nuntium;

Apache Airflow: Faciens ETL Facilius

Integrum indicem constructum in macros pro emendatione novissima available hic praesto est: utentis reference

Praeterea auxilio plugins proprias macros declarare possumus, sed alia fabula est.

Praeter res praedefinitas, valores variabilium nostrorum substituere possumus (hoc iam in codice supra usi sumus). Faciamus Admin/Variables duobus rebus;

Apache Airflow: Faciens ETL Facilius

Omnia uti potes:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Valor scalaris esse potest, vel etiam JSON. In casu JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

iustus utor semita ad desideravit clavis: {{ var.json.bot_config.bot.token }}.

Dicam litteram unum verbum et ostendam unum tortor de coniunctionem. Omnia elementaria hic sunt: ​​in pagina Admin/Connections nexum creamus, logins nostros / Tesserae et parametros specialiores ibi addimus. sic:

Apache Airflow: Faciens ETL Facilius

Passwords encrypted possunt (propressius quam defaltam) vel nexum generis excedere potes (sicut per egi tg_main) - re vera indicem typorum fere in exemplorum Airfluviorum propensum esse ac divulgari non posse quin in fontes fontales (si subito aliquid google non egi, corrige me quaeso), sed nihil obstabit quominus credita mox accipienda sint. nomen.

Plures etiam coniunctiones facere cum eodem nomine potes: hoc in casu, methodus BaseHook.get_connection()qui nos hospites nominatim tribuet random ex pluribus nominibus (magis logicum esset ut Round Robin, sed eam ex conscientia Airflow tincidunt relinquamus).

Variabiles et nexus instrumenta certe frigida sunt, sed grave est stateram non amittere: quas partes fluxus tui in ipso codice repones, et quas partes Airflow pro reponendo das. Ex altera parte, commode poterit valorem celeriter mutare, exempli gratia, cistae mail, per UI. E contra, hoc adhuc ad murem deprime, ex quo (I) carere voluimus.

Lorem nexus est opus hami. In genere, hami Airflow sunt puncta ut eam connectant ad officia et bibliothecas tertiae partis. Eg, JiraHook clientem nobis aperiet ut penitus cum Jira (munera ultro citroque movere possis), et ope SambaHook ventilabis file loci ad vos can smb-point.

Parsing consuetudo operator

Et prope accedimus ad spectandum quomodo factum est TelegramBotSendMessage

Code commons/operators.py cum ipsa operante;

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hic, sicut cetera in Airflow, omnia simplicissima sunt;

  • hereditarium BaseOperator, quae instrumenta admodum pauca Airflowit utilia (vide ad otium tuum)
  • Declaravit agros template_fieldsin quo Jinja macros processuum exspectabit.
  • Recta argumenta pro __init__()defaltam, ubi opus fuerit, pone.
  • Neque nos de initializatione antecessoris vel obliti sumus.
  • Aperuit respondentem hamo TelegramBotHookaccepit clientem objectum.
  • Overridden (redemptionem) ratio BaseOperator.execute(), quam Airfow vellicabit cum tempus venit ad operantem deducendum - in ea principalem actionem perficiemus, oblitus log in. (Nos log in, obiter, rectum stdout и stderr - Airfluxus omnia intercipiet, illud pulchre involvet, ubi necesse est putrescere.

Videamus quid habemus commons/hooks.py. Prima pars tabellae cum ipso unco;

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Nescio etiam quid hic explicem, momenta tantum notabo:

  • Heredes, argumenta cogita - in pluribus unum erit; conn_id;
  • Earundem modi vexillum: ego me limited get_conn(), in quo nominatim parametri nexum obtineo et sectionem mox obtineo extra (Is ager JSON est), in quo ego (secundum mandata mea) telegraphum bot tessera posuit; {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Exemplum nostri creo TelegramBotdans ei speciale signum.

Id omne. Potes accipere clientem ex hamo utens TelegramBotHook().clent aut TelegramBotHook().get_conn().

Et secunda pars tabellae, in qua microwrapper pro telegrapho REST API facio, ut non trahat. python-telegram-bot ad modum unum sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Recta via est adiice omnia; TelegramBotSendMessage, TelegramBotHook, TelegramBot - in plugin, in promptuario publico pone, et da Patefacio.

Dum haec omnia studemus, relationes nostrae feliciter deficere curaverunt et nuntium mihi in canalis errorem mittebant. Im 'iens ut vide si iniuriam ...

Apache Airflow: Faciens ETL Facilius
Aliquid in nostro cane fregit! Nonne id quod exspectabamus? Prorsus!

Ecquid infundis?

Sentis aliquid Adfui? Videtur quod pollicitus sit se datam ab SQL Servo Verticam se transferre, et deinde eam cepit ac reiecit, nebulo!

Hoc flagitium erat voluntarium, simpliciter aliquid pro te sumi posse. Iam ulterius progredi potes.

Nostrum consilium hoc erat;

  1. fac dag
  2. Generare tasks
  3. Vide quam pulchra omnia
  4. Assignare sessionem numeri ad implet
  5. Get notitia ex SQL Servo
  6. Data in Vertica posuit
  7. Oratio Statistics

Ita, ut omnia sursum ac currendo adipiscamur, parva additamenta feci nostris docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Ibi excitamus;

  • Vertica ut exercitum dwh maxime default occasus;
  • tria exempla SQL Servo,
  • implemus in hoc aliqua notitia databases (nullo modo non introspicias mssql_init.py!)

Omne bonum deprimimus ope paulo implicatior imperio quam ultimo tempore:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Quod miraculum randomizer generatur, uti potes Data Profiling/Ad Hoc Query:

Apache Airflow: Faciens ETL Facilius
Summa res non est ostendere eam analystae

elaborare in ETL sessiones Nolo, omnia ibi levia sunt: ​​turpe facimus, signum est in eo, omnia villico contextu involvimus, hoc nunc agimus;

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Tempus venit colligere notitia a nostris una et dimidia centum tabularum. Hoc faciamus ope versuum valde vanorum;

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Ope hamo nos ab Airflow pymssql-connect
  2. Restrictionem substituamus in forma cotidiani in petitione - in munus mittetur per machinam templates.
  3. Pascentium petitio nostra pandasquis erit nobis DataFrame — Erit nobis in futurum utile.

Ego uti substitutione {dt} loco petitionem parametri %s non quia malus Pinoculus sum, sed quia pandas non tractamus pymssql et lapsus ultimum unum params: Listquamvis vere velit tuple.
Etiam ut elit pymssql non placuit ei amplius favere et tempus est exire pyodbc.

Videamus quid Airflow referta argumentis nostris functionibus;

Apache Airflow: Faciens ETL Facilius

Si desit notitia, punctum in continuando non est. Sed mirum est etiam saturitatem prosperam considerare. Sed hoc non est error. A-ah, quid agat? Hem quid:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Dicet Airflow qui nullos errores, sed negotium omittimus. Medium interface non habebit quadratum viridis aut rubeum, sed roseum.

Nostra notitia iactare lets plures columnas:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

quinque satrapas Philisthinorum:

  • Datorum datorum ex quibus ordines cepimus;
  • Id nostrae sessionis inundationem (diversim erit nam omne opus),
  • Nullam a fonte et ordine ID - ut in datorum finali (ubi omnia in unam tabulam effunduntur) unicum ordinem habemus ID.

Paenultimus gradus manet: effunde in Verticam omnia. Et, satis impariter, unus ex modis maxime spectaculi et efficientis hoc facere est per CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Facimus specialem recipientis StringIO.
  2. pandas et posuit nobis misericordiam DataFrame in forma CSV-linea.
  3. Aperiamus nexum cum hamo ad ventus Verticae nostrae.
  4. Et nunc ope copy() data nostra directe ad Vertika mitte!

Ab auriga quot lineae impletae sunt tollemus, et procuratori sessionis omnia OK esse narrabimus;

session.loaded_rows = cursor.rowcount
session.successful = True

Quod suus omnes.

In venditione, laminam manualem scopo creamus. Hic mihi parva licet machina;

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

usus sum VerticaOperator() Schema database et mensam condo (si non iam exstant, utique). Summa est clientelas recte disponere;

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

quoquo

- Bene, - inquit mus pusillus, - non est, modo?
Persuasumne es me animal in silva foedissimum esse?

Iulia Donaldson

Cogito si collegas meos et certamen habui: qui cito creabit et deducet processum ETL de scabere: illi cum SSIS et mure et me cum Airflow. Wow, puto te consentire quod omnes frontes percutiam!

Si paulo gravius, tum Apache Airflow - describendo processus in forma programmatis codice - officium meum fecerunt magis commodius ac jucundius.

Eius extensio illimitata, tum propter obturaculum et proclivitas ad scalability, facultatem tibi tribuit utendi Airflow in quavis fere area: etiam in pleno cyclo colligendi, parandi et dispensandi data, etiam in scopulis deducendis (Marti, cf. cursum).

Pars finalis, relatio et notitia

Sarculum collegimus tibi

  • start_date. Ita est, iam est quentiam localem. Via Doug scriptor principalis argumentum start_date omnia praetereunt. Breviter, si definias start_date current date, et schedule_interval - Unus dies, tunc DAG incipiet cras non prius.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Nulla ac volutpat augue.

    Est alius error runtime sociatus; Task is missing the start_date parameterquod saepissime indicat oblitus es te ligare ad pyxidem operatorem.

  • Omnia in unum machina. Ita, et bases (Arisflow ipse et nostra coatingis), et telarum cultor, et schedulae, et operae sunt. Et vel elaboraret. Super tempore autem numerus officiorum pro officiis crevit, et cum PostgreSQL respondere coepit indice in 20 s pro 5 ms, cepimus et asportavimus.
  • LocalExecutor. In eo adhuc sedemus, et ad extremam abyssi iam pervenimus. LocalExecutor nobis hactenus satis fuit, sed nunc tempus est cum uno saltem artifice dilatare, et ad CeleryExecutorem movere laborandum erit. Quod cum in una machina operari possis, nihil obstat quominus Apium etiam in servulo utatur, quod "sane nunquam in productionem honeste ingredietur!"
  • Non usu constructum-in tools:
    • Contrahentes documentorum officium condere;
    • SLA Misses respondeat muneribus quae non elaborant in tempore;
    • xcom nam metadata commutationem (dixi metadata!) inter dag pensa.
  • Convicium epistularum. Hem, quid dicam? Erecti omnes repetitiones operum lapsorum positae sunt. Nunc opus meum Gmail invenias >90k epistulas ex Airflow habet, et rostrum interretialem renuit tollere ac delere plus quam 100 ad tempus.

Plures foveae; Apache Airflow Pitfails

Plura instrumenta automation

Ut nos magis capitibus nostris et non manibus laborare, Airflow hoc nobis paravit;

  • API CETEROQUIN — Status Experimentalis adhuc habet, qui eum operari non impedit. Cum ea, non solum informationes de daculis et operibus acquirere potes, sed etiam pugionem/stare/incipere, DAG Run vel stagnum creare.
  • CLI - Multa instrumenta per lineam mandatorum praesto sunt quae non solum per WebUI uti incommodum sunt, sed plerumque desunt. Exempli gratia:
    • backfill opus est ut sileo negotium exempla.
      Verbi gratia, coniectores venientes dixerunt: “Et tu, comes, nugas habes in notitia a die 1 ianuarii ad 13! Figere, reficere, reficere, reficere!" Et talis es HOB:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Vile officium; initdb, resetdb, upgradedb, checkdb.
    • runquae permittit ut unum instantia negotium curras, atque etiam in omnes clientelas detrahas. Praeterea, per hoc currere potes LocalExecutoretsi habeas botrum apium.
    • Idem fere facit testtantum in basibus nihil scribit.
    • connections permittit massam creationem hospites e testa.
  • API pythonem - Via difficilior inter se occurrunt, quae plugins destinata est, et in ea non minutulis manibus examinatur. Sed quis prohibet nos ire ad /home/airflow/dags, Curre ipython et circa satus officere? Exempli gratia potes omnes nexus cum codice sequenti exportare:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connectens cum metadatabase Airflow. Non commendo scripturam ad eam, sed negotium asserit pro variis metricis specificis multo velocius et facilius esse quam per quemlibet APIs.

    Dicamus quod non omnia opera nostra idempotent esse, sed aliquando cadere possunt, et hoc est normale. Sed paucae obstructiones suspectae iam sunt et inhibere necesse esset.

    Cave SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

References

Et sane, primos decem nexus ab editae Google contenta sunt contenta folder Airflow ex notis meis.

Et nexus usus in articulo:

Source: www.habr.com