Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you don't work with data flows but simply need to launch some processes periodically and monitor their execution.

And yes, I won't just tell you, I'll also show you: the post has plenty of code, screenshots and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons


Introduction

Apache Airflow is like Django:

  • written in Python,
  • has a great admin panel,
  • infinitely extensible,

only better, and built for completely different purposes, namely (as mentioned above):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience will allow),
  • with dynamic workflow generation from Python code that is very easy to write and understand,
  • and the ability to connect any databases and APIs with each other using both ready-made components and home-made plugins (which is extremely simple).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into a DWH and an ODS (for us, these are Vertica and ClickHouse);
  • as an advanced cron that launches data consolidation processes on the ODS and also keeps an eye on their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow terms, this amounts to:

  • more than 200 DAGs (actually workflows, into which we packed tasks),
  • each with 70 tasks on average,
  • and this goodness starts (also on average) once an hour.

I'll describe how we scaled further below. For now, let's define the über-problem we are going to solve:

There are three source SQL Servers, each with 50 databases. These are instances of a single project, so they share the same structure (almost everywhere, mua-ha-ha). This means each of them has an Orders table (fortunately, a table with that name fits into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively dump it into, say, Vertica.
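To make the "add service fields" step concrete, here is a minimal plain-Python sketch. It assumes rows arrive as dicts. The column names (source_server, source_db, etl_task_id) and the sample values are hypothetical: they mirror the description above and are not taken from the author's code.

```python
from typing import Dict, Iterable, Iterator


def add_service_fields(rows: Iterable[Dict], source_server: str,
                       source_db: str, etl_task_id: int) -> Iterator[Dict]:
    """Yield copies of source rows extended with ETL service columns."""
    for row in rows:
        enriched = dict(row)  # copy, so the caller's rows stay untouched
        enriched['source_server'] = source_server
        enriched['source_db'] = source_db
        enriched['etl_task_id'] = etl_task_id
        yield enriched


# Hypothetical rows from one Orders table on one source database
orders = [{'id': 1, 'amount': 100}, {'id': 2, 'amount': 250}]
for row in add_service_fields(orders, 'mssql_0', 'shop_07', 42):
    print(row)
```

A generator keeps memory flat when a source table is large. The enriched rows can then be batched into whatever bulk-insert mechanism the target database offers.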

Let's go!

The main part: practical (and a little theoretical)

Why we (and you)

Back when the trees were tall and I was a simple SQL developer at a Russian retailer, we hacked together ETL processes (aka data flows) using the two tools available to us:

  • Informatica Power Center: an extremely sprawling system, extraordinarily productive, with its own hardware and its own versioning. I used maybe 1% of its capabilities, God willing. Why? Well, first, its dated interface put psychological pressure on us. Second, the contraption is built for extremely elaborate processes, furious component reuse and other very-important-enterprise-tricks. As for the fact that it costs as much as the wing of an Airbus A380 per year, we will keep silent.

    Careful: the screenshot may slightly hurt people under 30


  • SQL Server Integration Services: we used this comrade for our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, and there are progress reports... But that's not why we love software products, oh, not for that. We can version its dtsx files (XML whose nodes get shuffled on every save), but what's the point? And how about building a task package that drags a hundred tables from one server to another? A hundred, what a hundred: your index finger will fall off after twenty of them from clicking the mouse button. But it definitely looks more fashionable:


We certainly looked for ways out. It nearly came down to writing our own SSIS package generator...

...and then a new job found me. And Apache Airflow caught up with me there.

When I found out that ETL process descriptions are plain Python code, I practically danced for joy. That's how data flows became versioned and diffable. Pouring tables with a single structure from a hundred databases into one target became a matter of Python code fitting on one and a half or two 13-inch screens.

Assembling the cluster

Let's not turn this into a complete kindergarten. We won't cover entirely obvious things here, like installing Airflow, the database of your choice, Celery and the other matters described in the docs.

So that we can start experimenting right away, I sketched a docker-compose.yml in which:

  • We bring up Airflow itself: Scheduler and Webserver. Flower also runs there for monitoring Celery tasks (it has already been pushed into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, into which Airflow writes its service information (scheduler data, execution statistics, etc.) and where Celery marks completed tasks;
  • Redis, which acts as the task broker for Celery;
  • Celery workers, which do the actual task execution.
  • The ./dags folder is where we put our DAG description files. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.

In some places the example code is not shown in full (so as not to clutter the text), and in some places it gets modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In putting the composition together, I relied heavily on the well-known puckel/docker-airflow image, so be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I maliciously took advantage of.
  • Naturally, it is not production-ready: I deliberately did not add heartbeats to the containers and did not bother with security. But I did the minimum that suits our experiments.
  • Note that:
    • The dags folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries: they must all be installed on the machines running the scheduler and the workers.
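The environment-variable override follows the naming rule AIRFLOW__{SECTION}__{KEY}, which is how the x-airflow-config block above sets options. The sketch below only illustrates that rule and is not Airflow's own configuration code. It maps such variables back to the (section, key) pairs they would correspond to in airflow.cfg.

```python
from typing import Dict, Mapping, Tuple

PREFIX = 'AIRFLOW__'


def parse_airflow_env(environ: Mapping[str, str]) -> Dict[Tuple[str, str], str]:
    """Map AIRFLOW__SECTION__KEY variables to {(section, key): value}."""
    overrides = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue  # not an Airflow setting
        # Split on the first double underscore only: keys such as
        # SQL_ALCHEMY_CONN contain single underscores themselves.
        section, sep, key = name[len(PREFIX):].partition('__')
        if not sep or not section or not key:
            continue  # malformed name, e.g. AIRFLOW__CORE
        overrides[(section.lower(), key.lower())] = value
    return overrides


env = {
    'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor',
    'AIRFLOW__CELERY__BROKER_URL': 'redis://broker:6379/0',
    'PATH': '/usr/bin',
}
print(parse_airflow_env(env))
```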

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can look at the web interface:

Basic concepts

If you didn't understand anything about all these "DAGs", here is a short glossary:

  • Scheduler: the most important guy in Airflow. It makes sure the robots do the hard work instead of a human: it monitors the schedule, updates DAGs and launches tasks.

    Older versions of it had memory problems (no, not amnesia, leaks). A legacy parameter, run_duration (its restart interval), even remained in the configs. But now everything is fine.

  • DAG (aka "dag"): a "directed acyclic graph". That definition won't tell most people much, but in fact it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.

    Besides DAGs there may also be subdags, but we most likely won't get to them.

  • DAG Run: an initialized DAG that is assigned its own execution_date. DAG runs of the same DAG can run in parallel (if you made your tasks idempotent, of course).
  • Operators are pieces of code responsible for performing a specific action. There are three types of operators:
    • action, like our beloved PythonOperator, which can execute any (valid) Python code;
    • transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
    • sensor, on the other hand, lets you react to some event, or hold back further execution of the DAG until it occurs. HttpSensor can poll a given endpoint and, once the expected response arrives, start a GoogleCloudStorageToS3Operator transfer. An inquisitive mind will ask: "Why? You could just retry right inside the operator!" The answer is that this way the task pool isn't clogged with hanging operators: a sensor (in reschedule mode) starts, checks and dies until the next attempt.
  • Task: declared operators, regardless of type, are promoted to the rank of task once attached to a DAG.
  • Task instance: when the general-scheduler decides it is time to send tasks into battle on the worker-executors, it prepares them. This happens right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor. The scheduler assigns the tasks a context (i.e., a set of variables: execution parameters), renders the command or query templates and puts them into the pool.
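The key property of a directed acyclic graph is that a valid execution order always exists. Airflow's scheduler works this out for you. The sketch below illustrates the idea in plain Python using Kahn's topological sort; it is not Airflow's implementation. The task names are borrowed loosely from the update_reports DAG discussed later.

```python
from collections import deque
from typing import Dict, List, Set


def execution_order(upstream: Dict[str, Set[str]]) -> List[str]:
    """Order tasks so that each one comes after all of its upstreams.

    `upstream` maps task_id -> set of task_ids it depends on.
    Raises ValueError if the graph contains a cycle, i.e. is not a DAG.
    """
    tasks = set(upstream)
    for deps in upstream.values():
        tasks |= deps  # tasks that only appear as dependencies
    waiting = {t: set(upstream.get(t, ())) for t in tasks}
    downstream: Dict[str, Set[str]] = {t: set() for t in tasks}
    for task, deps in waiting.items():
        for dep in deps:
            downstream[dep].add(task)

    # Start with tasks that depend on nothing (sorted for a stable result)
    ready = deque(sorted(t for t, deps in waiting.items() if not deps))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in sorted(downstream[task]):
            waiting[child].discard(task)
            if not waiting[child]:
                ready.append(child)  # all of its upstreams are done
    if len(order) != len(tasks):
        raise ValueError('cycle detected: this graph is not a DAG')
    return order


# Two report tasks feeding two notifiers, as in update_reports
print(execution_order({
    'email_success': {'city_orders', 'client_calls'},
    'telegram_fail': {'city_orders', 'client_calls'},
}))
# ['city_orders', 'client_calls', 'email_success', 'telegram_fail']
```

If a cycle sneaks in, no task in it ever becomes ready. This is why the "acyclic" part of the definition is not optional.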

Generating tasks

First, let's outline the general structure of our DAG. Then we'll dive deeper and deeper into the details, because we use some non-trivial solutions.

So, in its simplest form, such a DAG looks like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's break it down:

  • First, we import the necessary libs and a few other things;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of connections from Airflow Connections and the databases from which we will take our table;
  • dag is the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to be told:
    • its name, orders: this name will then appear in the web interface,
    • that it runs starting from midnight on July 8, 2020,
    • and that it should run roughly every 6 hours. Tough guys can use a cron string such as 0 */6 * * * here instead of timedelta(); the less cool can use a preset like @daily;
  • workflow() will do the main work, but not yet. For now we'll just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we loop over our sources;
    • we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a unique (within the DAG) name and to bind the DAG itself. The provide_context flag, in turn, passes additional arguments into the function, which we carefully collect with **context.
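One detail worth checking in the loop above: task_id=schema is only unique if database names never repeat across servers. The sketch below is hypothetical. The real commons.datasources lives in the author's repository and may look different. The sketch builds an sql_server_ds-like list for three servers with identical database names, then derives task ids that stay unique within the DAG.

```python
from collections import namedtuple
from typing import List

# Hypothetical stand-in for commons.datasources.sql_server_ds:
# one (conn_id, schema) pair per source database.
DataSource = namedtuple('DataSource', ['conn_id', 'schema'])


def build_sources(conn_ids: List[str], db_names: List[str]) -> List[DataSource]:
    """Each server hosts the same set of databases (one project)."""
    return [DataSource(conn_id, db) for conn_id in conn_ids for db in db_names]


def task_id_for(source: DataSource) -> str:
    # The schema alone repeats across servers, so prefix it with the
    # connection id to keep task_id unique within the DAG.
    return f'{source.conn_id}__{source.schema}'


sql_server_ds = build_sources(['mssql_0', 'mssql_1', 'mssql_2'],
                              [f'shop_{n:02d}' for n in range(50)])
task_ids = [task_id_for(s) for s in sql_server_ds]
print(len(sql_server_ds), len(set(task_ids)))  # 150 150
```

In the DAG loop, task_id=task_id_for(source) would then take the place of task_id=schema. Since namedtuples unpack like plain tuples, `for conn_id, schema in sql_server_ds` keeps working unchanged.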

That's all for now. What we got:

  • a new DAG in the web interface,
  • a hundred and fifty tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow).

Well, almost got it.

Who will install the dependencies?

To simplify this whole thing, I wired the processing of requirements.txt on all nodes into docker-compose.yml.

And off it goes:


The gray squares are task instances processed by the scheduler.

We wait a little, and the workers snap up the tasks:


The green ones, of course, finished their work successfully. The red ones, not so much.

By the way, there is no ./dags folder on our prod and no synchronization between machines. All DAGs live in git on our GitLab, and GitLab CI distributes updates to the machines on merge into master.

A little about Flower

While the workers are grinding through our dummies, let's recall one more tool that can show us a thing or two: Flower.

The very first page, with summary information on the worker nodes:


The most intense page, with the tasks that went to work:


The most boring page, with the status of our broker:


The brightest page, with task status graphs and their execution times:


Reloading what failed to load

So, all the tasks have finished; time to carry off the wounded.


And there were plenty of wounded, for one reason or another. When Airflow is used properly, these very squares tell you that the data definitely didn't arrive.

You need to check the log and restart the fallen task instances.

Clicking on any square shows the actions available to us:


You can take the fallen ones and Clear them. That is, we forget that something failed there, and the same task instance goes back to the scheduler.


Clearly, doing this with the mouse for every red square is not very humane, and that's not what we expect from Airflow. Naturally, we have a weapon of mass destruction: Browse/Task Instances


Let's select everything at once and reset it to zero by clicking the right item:


After the cleanup, our tasks look like this (they are already waiting for the scheduler to schedule them):


Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone ever built a report update? Here it is again: there is a list of sources to take the data from, and a list of places to put it. Don't forget to honk when everything has happened or broken (well, that's not about us, no).

Let's go through the file again and look at the new, obscure things:

  • from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators. We took advantage of that by writing a small wrapper for sending messages to Telegram. (We'll talk more about this operator below);
  • default_args={}: a DAG can distribute the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}': the to field isn't hardcoded. It is generated dynamically using Jinja and a variable holding a list of emails, which I carefully put in Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the email flies to the bosses only if all dependencies have finished successfully;
  • tg_bot_conn_id='tg_main': conn_id arguments accept the IDs of connections we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED: messages to Telegram fly off only if there are failed tasks;
  • task_concurrency=1: we forbid running several task instances of one task at the same time. Otherwise we could get several VerticaOperator instances running simultaneously (against the same table);
  • report_update >> [email, tg]: all VerticaOperator tasks converge on sending emails and messages, like this:

    But since the notifier operators have different trigger conditions, only one of them will run. In the Tree View, everything looks a bit less clear:
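The two trigger rules used above can be modelled in a few lines. This is a simplified sketch that assumes only three upstream states. Airflow itself tracks more states (for example upstream_failed and skipped) and evaluates the rules inside the scheduler.

```python
from enum import Enum
from typing import Iterable


class State(str, Enum):
    SUCCESS = 'success'
    FAILED = 'failed'
    RUNNING = 'running'


def all_success(upstream: Iterable[State]) -> bool:
    # ALL_SUCCESS: fire only once every upstream task has succeeded
    return all(state == State.SUCCESS for state in upstream)


def one_failed(upstream: Iterable[State]) -> bool:
    # ONE_FAILED: fire as soon as any upstream task has failed,
    # without waiting for the others to finish
    return any(state == State.FAILED for state in upstream)


# Three report loads: two succeeded, one failed
reports = [State.SUCCESS, State.SUCCESS, State.FAILED]
print('email_success runs:', all_success(reports))   # False
print('telegram_fail runs:', one_failed(reports))    # True
```

Because the two conditions are mutually exclusive once all upstreams have finished, exactly one notifier fires per run.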

A few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the value of the context variable execution_date in the format YYYY-MM-DD, for example 2020-07-14. The best part is that context variables are pinned to a specific task instance (a square in the Tree View). When that instance is restarted, the placeholders expand to the same values.
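To see what the {{ ds }} expansion amounts to, here is a toy sketch. The regex-based render() is a deliberately tiny stand-in for Jinja that only understands {{ name }}. By contrast, ds and ds_nodash are two of Airflow's real built-in macros, derived here from a fixed execution_date. Rendering twice for the same date gives the same SQL, which is what makes re-running a task instance safe.

```python
import re
from datetime import datetime


def macros_for(execution_date: datetime) -> dict:
    # A tiny subset of Airflow's built-in macros, derived from execution_date
    return {
        'ds': execution_date.strftime('%Y-%m-%d'),
        'ds_nodash': execution_date.strftime('%Y%m%d'),
    }


def render(template: str, context: dict) -> str:
    """Toy stand-in for Jinja: replaces {{ name }} with context[name]."""
    return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                  lambda m: str(context[m.group(1)]), template)


sql = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
print(render(sql, macros_for(datetime(2020, 7, 14))))
# SELECT id FROM orders.payments WHERE payment_dtm::DATE = '2020-07-14'::DATE
```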

The rendered values can be viewed with the Rendered button on each task instance. This is how it looks for the email task:


And for the messaging task:


A complete list of built-in macros for the latest available version can be found here: macros reference

Moreover, plugins let us declare our own macros, but that's another story.

Besides the predefined things, we can substitute the values of our own variables (I already did this in the code above). Let's create a couple of entries in Admin/Variables:


That's it, now they can be used:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

Just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
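The dotted path in {{ var.json.bot_config.bot.token }} is just a chain of successive key lookups into the parsed JSON. Below is a small sketch of that lookup using the bot_config value (with the token quoted, as valid JSON requires). Inside Python code rather than a template, Airflow 1.10 offers Variable.get('bot_config', deserialize_json=True) for the parsing step, after which you index the dict yourself.

```python
import json
from functools import reduce

# The bot_config variable from above, with the token quoted as valid JSON
BOT_CONFIG = '{"bot": {"token": "881hskdfASDA16641", "name": "Verter"}, "service": "TG"}'


def get_path(raw_json: str, path: str):
    """Resolve a dotted path such as 'bot.token' in a JSON document."""
    return reduce(lambda node, key: node[key], path.split('.'), json.loads(raw_json))


print(get_path(BOT_CONFIG, 'bot.token'))  # 881hskdfASDA16641
```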

I'll literally say one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins/passwords and any more specific parameters. Like this:


Passwords can be encrypted (more thoroughly than by default). You can also leave out the connection type, as I did for tg_main. The list of types is hardwired into the Airflow models and cannot be extended without getting into the source code (if I failed to google something, please correct me). However, nothing stops us from fetching credentials just by name.

You can also create several connections with the same name. In that case, the BaseHook.get_connection() method, which fetches connections by name, returns a random one of the namesakes. Round Robin would be more logical, but let's leave that on the conscience of the Airflow developers.
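The difference between the two strategies is easy to show. The sketch below contrasts a random pick (the behaviour described for BaseHook.get_connection()) with the round robin the author would prefer. The host names are hypothetical, and none of this is Airflow code.

```python
import itertools
import random
from typing import Iterator, Sequence


def pick_random(namesakes: Sequence[str]) -> str:
    # Any one of the same-named connections, as described for get_connection()
    return random.choice(namesakes)


def round_robin(namesakes: Sequence[str]) -> Iterator[str]:
    # Hand them out strictly in turn, wrapping around at the end
    return itertools.cycle(namesakes)


hosts = ['replica-1', 'replica-2', 'replica-3']  # hypothetical namesakes
rr = round_robin(hosts)
print([next(rr) for _ in range(4)])
# ['replica-1', 'replica-2', 'replica-3', 'replica-1']
print(pick_random(hosts))  # any of the three
```

Random choice spreads load only on average. Round robin guarantees an even spread, but it needs shared state (the iterator), which a stateless lookup by name does not have.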

Variables and Connections are certainly cool tools, but it's important to keep a balance between the parts of your flows you keep in the code itself and the parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailing address, through the UI. On the other hand, that is still a return to the mouse clicking we (I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook opens a client for interacting with Jira (you can move issues back and forth), and with SambaHook you can push a local file to an SMB share.

Dissecting a custom operator

And we've come close to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.models import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, it is all very simple:

  • We inherited from BaseOperator, which implements quite a few Airflow-specific things (look at them in your spare time);
  • We declared template_fields, the fields in which Jinja will look for macros to render;
  • We arranged the right arguments for __init__() and set defaults where needed;
  • We didn't forget to initialize the ancestor either;
  • We opened the corresponding TelegramBotHook hook and got a client object from it;
  • We overrode the BaseOperator.execute() method, which Airflow calls when the time comes to run the operator. In it we implement the main action, not forgetting about logging. (By the way, we log straight to stdout and stderr: Airflow intercepts everything, wraps it nicely and puts it where it belongs.)
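The template_fields mechanism can be illustrated without Airflow. In the toy below (hypothetical class names, not Airflow's BaseOperator), every attribute listed in template_fields is rendered against a context before execute() runs, while other attributes are left alone. Airflow does the real rendering with Jinja and supports dotted names like var.value.failures_chat, which this regex toy does not.

```python
import re
from typing import Any, Dict, Sequence

_PLACEHOLDER = re.compile(r'\{\{\s*(\w+)\s*\}\}')


class ToyOperator:
    """Toy model of template_fields; NOT Airflow's BaseOperator."""
    template_fields: Sequence[str] = ()

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, str):
                # Substitute {{ key }} placeholders from the context
                setattr(self, name, _PLACEHOLDER.sub(
                    lambda m: str(context[m.group(1)]), value))

    def run(self, context: Dict[str, Any]):
        self.render_template_fields(context)  # render first...
        return self.execute(context)          # ...then do the work

    def execute(self, context: Dict[str, Any]):
        raise NotImplementedError


class ToySendMessage(ToyOperator):
    template_fields = ('chat_id', 'message')

    def __init__(self, chat_id, message, conn_id='tg_bot_default'):
        self.chat_id = chat_id
        self.message = message
        self.conn_id = conn_id  # not listed, so never rendered

    def execute(self, context):
        return f'Send "{self.message}" to the chat {self.chat_id}'


op = ToySendMessage('{{ failures_chat }}', '{{ dag_id }} failed :(')
print(op.run({'failures_chat': '-100123', 'dag_id': 'update_reports'}))
# Send "update_reports failed :(" to the chat -100123
```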

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here; I'll just note the important points:

  • We inherit and think about the arguments; in most cases there will be just one: conn_id;
  • We override standard methods. I limited myself to get_conn(), in which I fetch the connection parameters by name and take just the extra section (a JSON field). That is where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, passing it that token.

That's all. You can get the client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().

And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for a single sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
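One thing to watch with parse_mode='MarkdownV2': the Telegram Bot API requires characters such as _ ( ) . ! to be escaped with a backslash when they are meant literally. A text like "update_reports failed :(" may therefore be rejected with a parse error. A minimal escaping helper, based on the character list in the Telegram Bot API documentation, could look like the sketch below. Whether to call it in the operator or in TelegramBot.send_message() is a design choice left open here.

```python
# Characters the Telegram Bot API lists as special in MarkdownV2; to show
# them literally they must be preceded by a backslash (the backslash itself
# is escaped too, so it cannot start an escape by accident).
MARKDOWN_V2_SPECIAL = '\\_*[]()~`>#+-=|{}.!'


def escape_markdown_v2(text: str) -> str:
    """Escape plain text for sending with parse_mode='MarkdownV2'."""
    return ''.join('\\' + ch if ch in MARKDOWN_V2_SPECIAL else ch for ch in text)


print(escape_markdown_v2('update_reports failed :('))
# update\_reports failed :\(
```

Escape only the dynamic parts (such as a rendered dag_id). Escaping a message that intentionally contains markup would also neutralize the markup.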

The right way is to put it all together (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, publish it in a public repository and give it to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message to the channel. Let me check whether it's really wrong...

Something broke in our DAG! Isn't that what we were expecting? Exactly!

So, are we pouring?

Do you feel like I missed something? It seems he promised to transfer data from SQL Server to Vertica, then up and wandered off the topic, the scoundrel!

This atrocity was intentional: I simply had to explain some terminology to you first. Now we can move on.

Our plan was this:

  1. Make the DAG
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get the data from SQL Server
  6. Put the data into Vertica
  7. Collect statistics

So, to get all this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Here we bring up:

  • Vertica as the dwh host with the most default settings,
  • three SQL Server instances,
  • and we fill the databases on the latter with some data (and don't you dare look into mssql_init.py!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

To see what our miracle randomizer generated, you can use the Data Profiling/Ad Hoc Query item:

The main thing is not to show it to the analysts

I won't go into detail about ETL sessions, since it's all trivial: we create a database with a table in it, wrap everything in a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # An exception inside the with-block marks the session as failed
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # DDL such as CREATE TABLE returns no rows (description is None)
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        self._create()  # make sure the sessions table exists
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
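To see the session lifecycle end to end without a PostgreSQL server, here is a minimal sketch of the same idea on top of the standard-library sqlite3 module. The class name SqliteSession and its simplified table are assumptions for illustration, not the author's code; what it shows is that an exception inside the with-block is recorded as a failed session and still propagates to the caller.

```python
import sqlite3


class SqliteSession:
    """Illustrative SQLite stand-in for the PostgreSQL-backed Session."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        self.comment = None

    def __enter__(self):
        # Create the log table on first use, then open a new session row
        self.connection.execute(
            """CREATE TABLE IF NOT EXISTS sessions (
                   id          INTEGER PRIMARY KEY AUTOINCREMENT,
                   task_name   TEXT NOT NULL,
                   started     TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                   finished    TEXT,
                   successful  INTEGER,
                   loaded_rows INTEGER,
                   comment     TEXT)""")
        cursor = self.connection.execute(
            "INSERT INTO sessions (task_name) VALUES (?)", (self.task_name,))
        self.id = cursor.lastrowid
        self.connection.commit()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Any exception inside the with-block marks the session as failed
        if exc_type is not None:
            self.successful = False
            self.comment = f"{exc_type.__name__}: {exc_val}"
        self.connection.execute(
            """UPDATE sessions
               SET finished = CURRENT_TIMESTAMP, successful = ?,
                   loaded_rows = ?, comment = ?
               WHERE id = ?""",
            (self.successful, self.loaded_rows, self.comment, self.id))
        self.connection.commit()
        return False  # do not swallow the exception


# Usage: one successful load and one that fails
conn = sqlite3.connect(":memory:")
with SqliteSession(conn, "orders") as session:
    session.loaded_rows = 15
    session.successful = True

try:
    with SqliteSession(conn, "orders") as session:
        raise ValueError("boom")
except ValueError:
    pass

rows = conn.execute(
    "SELECT id, successful, loaded_rows, comment FROM sessions ORDER BY id"
).fetchall()
```

SQLite stores the Python booleans as 1 and 0, so rows ends up as one successful session with 15 loaded rows and one failed session whose comment holds the exception.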

The time has come to collect our data from our hundred and fifty tables. Let's do it with a few very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. With a hook we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query; Airflow's template engine will pass it into the function.
  3. We feed the query to pandas, which gives us a DataFrame; it will come in handy later.

I use the {dt} substitution instead of a %s query parameter not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and passes it params as a List, although it really wants a tuple.
Also note that the pymssql developer decided to stop supporting it, and it's high time to move to pyodbc.
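If you do keep the f-string substitution, it is worth making sure that only a real date can end up in the query. A minimal sketch, assuming dt arrives as the rendered '{{ ds }}' string; the function name orders_query is hypothetical:

```python
from datetime import date


def orders_query(dt: str) -> str:
    """Build the daily orders query, interpolating only a validated date.

    date.fromisoformat() raises ValueError for anything that is not an
    ISO 8601 date, and isoformat() re-serialises the value, so only a
    YYYY-MM-DD literal can reach the SQL text.
    """
    day = date.fromisoformat(dt)
    return (
        "SELECT id, start_time, end_time, type, data\n"
        "FROM dbo.Orders\n"
        f"WHERE CONVERT(DATE, start_time) = '{day.isoformat()}'"
    )
```

A malformed or malicious dt fails loudly in Python before any connection to SQL Server is used.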

Let's see what Airflow filled our function's arguments with:


If there is no data, there is no point in continuing. But it would also be strange to count the load as successful. Yet it isn't an error either. Aaargh, what to do?! Here's what:

from airflow.exceptions import AirflowSkipException

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is no error, but the task was skipped. The interface will show neither a green nor a red square, but a pink one.

Let's add a few more columns to our data:

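from pandas.util import hash_pandas_object

df['etl_source'] = src_schema
df['etl_id'] = session.id
# index=False: hash only the values, not the row index, so the ID is stable;
# astype('int64') wraps the uint64 hash into Vertica's signed 64-bit INT range
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']], index=False).astype('int64')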

Namely:

  • the source database the orders came from,
  • the ID of our load session (it will differ for each task),
  • a hash of the source and the order ID, so that in the final database (where everything is poured into one table) we have a unique order ID.
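The article computes hash_id with pandas' hash_pandas_object. As a swapped-in alternative that does not depend on the DataFrame index or on pandas at all, here is a sketch using the standard library's hashlib.blake2b; the function name order_hash_id and the '|' separator are assumptions:

```python
import hashlib


def order_hash_id(etl_source: str, order_id: int) -> int:
    """Deterministic signed 64-bit ID for (source database, order id).

    blake2b with digest_size=8 yields 8 bytes; reading them as a signed
    big-endian integer keeps the value inside a 64-bit INT column.
    """
    key = f"{etl_source}|{order_id}".encode("utf-8")
    digest = hashlib.blake2b(key, digest_size=8).digest()
    return int.from_bytes(digest, byteorder="big", signed=True)
```

The same (source, id) pair always maps to the same value, regardless of row order or which load produced it.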

Only the final step remains: pour everything into Vertica. And, strangely enough, one of the most spectacular and efficient ways to do it is via CSV!

import csv
from io import StringIO

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Join column names into a plain SQL list: "id, start_time, ..."
columns = ', '.join(df.columns)
copy_stmt = f"""
    COPY {target_table}({columns})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
# vertica_python does not autocommit by default
target_conn.commit()

  1. We create a special StringIO buffer.
  2. pandas kindly writes our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with copy(), we send our data straight into Vertica!
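The two pieces that are easiest to get wrong are the column list in COPY and the NULL marker in the CSV. Here is a pandas-free sketch of both using the standard csv module; build_copy_statement and rows_to_csv are hypothetical helpers, and the COPY options simply mirror the statement above:

```python
import csv
from io import StringIO


def build_copy_statement(table, columns):
    # Join column names into a proper SQL column list: "a, b, c"
    column_list = ", ".join(columns)
    return (
        f"COPY {table}({column_list}) "
        "FROM STDIN DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL 'NUL'"
    )


def rows_to_csv(rows):
    # Write rows as '|'-separated lines; None becomes the NUL marker
    # that the COPY statement declares as NULL
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter="|", quotechar='"',
                        quoting=csv.QUOTE_MINIMAL, doublequote=False,
                        escapechar="\\", lineterminator="\n")
    for row in rows:
        writer.writerow(["NUL" if value is None else value for value in row])
    buffer.seek(0)
    return buffer
```

Fields that contain the delimiter are wrapped in double quotes, which is what ENCLOSED '"' tells Vertica to expect.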

From the driver we take how many rows were loaded, and tell the session manager that all is well:

session.loaded_rows = cursor.rowcount
session.successful = True

That's it.

In production we create the target table by hand. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator(), I create a database schema and a table (if they don't exist yet, of course). The main thing is to wire up the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
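Because the tasks in this loop differ only in their op_kwargs, one way to check the fan-out before handing it to Airflow is to build those arguments as plain data. A sketch with a hypothetical helper load_task_kwargs; it also rejects duplicate schema names, since they would become duplicate task_ids:

```python
def load_task_kwargs(sql_server_ds, target_conn_id, target_schema, target_table):
    """Return {task_id: op_kwargs} as the loop above would pass them."""
    tasks = {}
    for conn_id, schema in sql_server_ds:
        if schema in tasks:
            raise ValueError(f"duplicate task_id: {schema}")
        tasks[schema] = {
            "src_conn_id": conn_id,
            "src_schema": schema,
            # Left as a literal Jinja template; Airflow renders it per run
            "dt": "{{ ds }}",
            "target_conn_id": target_conn_id,
            "target_table": f"{target_schema}.{target_table}",
        }
    return tasks
```

Each key corresponds to one PythonOperator downstream of create_target.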

Summary

"Well," said the little mouse, "now
are you convinced that I am the most terrible beast in the forest?"

Julia Donaldson, The Gruffalo

I think that if my colleagues and I held a contest to see who could create and launch an ETL process from scratch faster, they with their SSIS and a mouse and I with Airflow, you'd agree that I would beat them on every front!

A little more seriously: by describing processes as program code, Apache Airflow has made my work much more comfortable and enjoyable.

Its unlimited extensibility, both in plug-ins and in scalability, lets you use Airflow in almost any area: the full cycle of collecting, preparing and processing data, or even launching rockets (to Mars, of course).

Final part: references and information

The pitfalls we've collected for you

  • start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short: if you set start_date to the current date and schedule_interval to one day, the DAG will start no earlier than tomorrow. Instead, hard-code a fixed date:
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error related to it: Task is missing the start_date parameter, which most often means you forgot to bind the operator to the dag.

  • Everything on one machine. Yes: the databases (Airflow's own and our wrapper's), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks in the services grew, and when PostgreSQL started answering index lookups in 20 s instead of 5 ms, we picked it up and moved it to a separate server.
  • LocalExecutor. Yes, we're still on it, and we've already reached the edge of the abyss. LocalExecutor has been enough so far, but now it's time to add at least one worker, and we'll have to work hard to move to CeleryExecutor. And since you can run it on a single machine, nothing stops you from using Celery even on a server that "of course, will never go into production, honestly!"
  • Not using the built-in tools:
    • Connections for storing service credentials,
    • SLA Misses for reacting to tasks that did not finish on time,
    • XCom for exchanging metadata (I said metadata!) between DAG tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of a failed task. Now my work Gmail holds more than 90k emails from Airflow, and the webmail interface refuses to select and delete more than 100 at a time.
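For the start_date pitfall, a simplified model helps: with a fixed timedelta schedule, the run stamped with execution_date equal to start_date covers one interval and is only started once that interval has ended. A sketch under that assumption (it ignores cron alignment and catchup), together with default_args that also turn off retry e-mails; the helper first_trigger_time is hypothetical, while start_date, email_on_failure, email_on_retry and retries are standard operator arguments:

```python
from datetime import datetime, timedelta

# A fixed start_date, as in the text; e-mail only on final failure
default_args = {
    "start_date": datetime(2020, 7, 7, 0, 1, 2),
    "email_on_failure": True,
    "email_on_retry": False,  # avoids one e-mail per retry
    "retries": 2,
}


def first_trigger_time(start_date, schedule_interval):
    """Earliest moment the first run of a fixed-interval DAG can start.

    The run stamped with execution_date == start_date covers the interval
    [start_date, start_date + schedule_interval) and is only started once
    that interval has closed.
    """
    return start_date + schedule_interval
```

With start_date set to today and a daily interval, the first run can only begin tomorrow, which is the behaviour described above.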

More pitfalls: Apache Airflow Pitfalls

More automation tools

To work even more with our heads and even less with our hands, Airflow has prepared the following for us:

  • REST API: it is still marked Experimental, which doesn't stop it from working. With it you can not only get information about DAGs and tasks, but also pause/unpause a DAG, create a DAG Run or a pool.
  • CLI: many tools are available from the command line that are not merely inconvenient to use through the WebUI but missing from it altogether. For example:
    • backfill, needed to re-run task instances.
      For example, the analysts came and said: "Comrade, you've got nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you just go:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all its dependencies. What's more, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does much the same, except that it also writes nothing to the databases.
    • connections allows bulk creation of connections from the shell.
  • Python API: a rather hardcore way of interacting, intended for plugins rather than for poking around with your bare hands. But who's to stop us from going into /home/airflow/dags, launching ipython and messing around? You can, for example, export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I don't recommend writing to it, but fetching task states for various specific metrics can be much faster and simpler than through any of the APIs.

    Let's say that not all of our tasks are idempotent, but they can occasionally fail, and that's normal. But several failures in a row are already suspicious and worth checking.

    Beware, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
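The trick in the SQL above is that a failed run belongs to the latest streak only if its rank among failures equals its rank among all runs. Here is the same logic in plain Python, as an illustration rather than a replacement for the query; the function name and the tuple layout are assumptions, and the two-day window filter is left out:

```python
from collections import defaultdict


def consecutive_failures(task_instances):
    """Count each task's unbroken run of failures, newest first.

    task_instances: iterable of (dag_id, task_id, execution_date, state).
    Returns {(dag_id, task_id): (unsuccessful, failed, up_for_retry)}
    for tasks whose most recent runs are failures.
    """
    runs = defaultdict(list)
    for dag_id, task_id, execution_date, state in task_instances:
        runs[(dag_id, task_id)].append((execution_date, state))

    result = {}
    for key, history in runs.items():
        history.sort(reverse=True)  # newest execution_date first
        fail_rank = 0
        streak = {"failed": 0, "up_for_retry": 0}
        for rn, (_, state) in enumerate(history, start=1):
            if state not in streak:
                continue
            fail_rank += 1
            # Equal ranks mean no other state occurred before this failure
            if fail_rank == rn:
                streak[state] += 1
        total = streak["failed"] + streak["up_for_retry"]
        if total > 0:
            result[key] = (total, streak["failed"], streak["up_for_retry"])
    return result
```

Once a non-failed run appears, the failure rank falls behind the overall rank for good, so older failures are not counted, exactly as in the last_fail_seq column.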

References

Of course, the first ten links from a Google search are the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: www.habr.com
