Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ሰላም፣ እኔ ዲሚትሪ ሎግቪንኮ ነኝ - የVezet የኩባንያዎች ቡድን የትንታኔ ክፍል የውሂብ መሐንዲስ።

የ ETL ሂደቶችን ለማዘጋጀት ስለ አንድ አስደናቂ መሣሪያ እነግርዎታለሁ - Apache Airflow. ነገር ግን የአየር ፍሰት በጣም ሁለገብ እና ዘርፈ-ብዙ ስለሆነ በመረጃ ፍሰቶች ውስጥ ባይሳተፉም በጥንቃቄ ሊመለከቱት ይገባል ነገር ግን ማንኛውንም ሂደቶችን በየጊዜው መጀመር እና አፈፃፀማቸውን መከታተል ያስፈልግዎታል።

እና አዎ, እኔ ብቻ አልነግርም, ግን ደግሞ አሳይ: ፕሮግራሙ ብዙ ኮድ, ቅጽበታዊ ገጽ እይታዎች እና ምክሮች አሉት.

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ
አየር ፍሰት/ዊኪሚዲያ ኮመንስ የሚለውን ቃል ጎግል ሲያደርጉ ብዙውን ጊዜ የሚያዩት ነገር

ማውጫ

መግቢያ

Apache የአየር ፍሰት ልክ እንደ ጃንጎ ነው።

  • በ Python ተፃፈ
  • ጥሩ የአስተዳዳሪ ፓነል አለ ፣
  • ላልተወሰነ ጊዜ ሊሰፋ የሚችል

- የተሻለ ብቻ እና የተሰራው ሙሉ ለሙሉ ለተለያዩ ዓላማዎች ማለትም (ከካት በፊት እንደተጻፈው) ነው።

  • ገደብ በሌለው ማሽኖች ላይ ስራዎችን ማሄድ እና መቆጣጠር (ብዙ ሴሊሪ / ኩበርኔትስ እና ህሊናዎ ይፈቅድልዎታል)
  • የ Python ኮድ ለመጻፍ እና ለመረዳት በጣም ቀላል በሆነ ተለዋዋጭ የስራ ፍሰት ትውልድ
  • እና ሁለቱንም ዝግጁ አካላት እና በቤት ውስጥ የተሰሩ ፕለጊኖችን በመጠቀም ማንኛውንም የውሂብ ጎታ እና ኤፒአይዎችን እርስ በእርስ የማገናኘት ችሎታ (ይህም በጣም ቀላል ነው)።

Apache የአየር ፍሰትን እንደሚከተለው እንጠቀማለን-

  • መረጃን ከተለያዩ ምንጮች እንሰበስባለን (ብዙ የSQL Server እና PostgreSQL ምሳሌዎች፣ የተለያዩ ኤፒአይዎች ከአፕሊኬሽን ሜትሪክስ፣ 1C እንኳን) በDWH እና ODS (Vertica እና Clickhouse አለን)።
  • ምን ያህል የላቀ cronበ ODS ላይ የውሂብ ማጠናከሪያ ሂደቶችን የሚጀምረው እና ጥገናቸውንም የሚከታተል.

እስከ ቅርብ ጊዜ ድረስ፣ ፍላጎታችን 32 ኮር እና 50 ጂቢ ራም ባለው አንድ ትንሽ አገልጋይ ተሸፍኗል። በአየር ፍሰት ውስጥ ይህ ይሰራል፡-

  • ከ 200 ዶግ (በእውነቱ ስራዎችን የሞላንበት የስራ ፍሰቶች)
  • በእያንዳንዱ በአማካይ 70 ተግባራት,
  • ይህ መልካምነት ይጀምራል (በተጨማሪም በአማካይ) በሰዓት አንድ ጊዜ.

እና እንዴት እንደሰፋን ፣ ከዚህ በታች እጽፋለሁ ፣ ግን አሁን የምንፈታውን የ über-ችግርን እንገልፃለን-

ሶስት ምንጭ SQL ሰርቨሮች አሉ እያንዳንዳቸው 50 የውሂብ ጎታዎች - የአንድ ፕሮጀክት ምሳሌዎች እንደቅደም ተከተላቸው, ተመሳሳይ መዋቅር አላቸው (በሁሉም ቦታ ማለት ይቻላል, mua-ha-ha), ይህም ማለት እያንዳንዳቸው የትእዛዝ ሠንጠረዥ አላቸው (እንደ እድል ሆኖ, ከዚህ ጋር ሰንጠረዥ) ስም ወደ ማንኛውም ንግድ ሊገባ ይችላል). እኛ የአገልግሎት መስኮችን (ምንጭ አገልጋይ፣ የምንጭ ዳታቤዝ፣ የኢቲኤል ተግባር መታወቂያ) በማከል ውሂቡን ወስደን በዋህነት ወደ ቨርቲካ እንወረውራለን።

እንሂድ!

ዋናው ክፍል ፣ ተግባራዊ (እና ትንሽ ቲዎሪ)

ለምንድነው ለእኛ (እና ለእርስዎ)

ዛፎቹ ትልቅ ሲሆኑ እና እኔ ቀላል ነበርኩ SQL-schik በአንድ የሩሲያ ችርቻሮ ውስጥ ፣እኛ የሚገኙትን ሁለት መሳሪያዎችን በመጠቀም የኢቲኤል ሂደቶችን አጭበርብተናል።

  • Informatica የኃይል ማዕከል - እጅግ በጣም የተስፋፋ ስርዓት ፣ እጅግ በጣም ውጤታማ ፣ በራሱ ሃርድዌር ፣ የራሱ ስሪት። 1% አቅሙን እግዚአብሔር ይከለክለው ተጠቀምኩ። ለምን? ደህና ፣ በመጀመሪያ ፣ ይህ በይነገጽ ፣ ከ 380 ዎቹ ጀምሮ ፣ በአእምሯችን ላይ ጫና ፈጥሯል። በሁለተኛ ደረጃ፣ ይህ ክልከላ የተነደፈው እጅግ በጣም ቆንጆ ለሆኑ ሂደቶች፣ ቁጡ አካል መልሶ ጥቅም ላይ ለማዋል እና ሌሎች በጣም አስፈላጊ ለሆኑ-የድርጅት-ማታለያዎች ነው። ሾለ ወጪው ፣ እንደ ኤርባስ AXNUMX / ዓመት ክንፍ ፣ ምንም አንልም ።

    ይጠንቀቁ፣ ቅጽበታዊ ገጽ እይታ ከ30 ዓመት በታች የሆኑ ሰዎችን በትንሹ ሊጎዳ ይችላል።

    Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

  • SQL አገልጋይ ውህደት አገልጋይ - ይህንን ጓዳችንን በውስጣችን በፕሮጀክት ፍሰቶች ውስጥ ተጠቅመንበታል። ደህና፣ በእውነቱ፡ አስቀድመን SQL Server እንጠቀማለን፣ እና በሆነ መልኩ የኢቲኤል መሳሪያዎቹን አለመጠቀም ምክንያታዊ አይሆንም። በውስጡ ያለው ነገር ሁሉ ጥሩ ነው: ሁለቱም በይነገጹ ውብ ነው, እና የሂደቱ ዘገባዎች ... ግን ለዚህ አይደለም የሶፍትዌር ምርቶችን የምንወደው, ኦህ, ለዚህ አይደለም. ስሪት ያድርጉት dtsx (ይህም ኤክስኤምኤል ነው አንጓዎች በ save ላይ የተዘበራረቁ) እኛ እንችላለን፣ ግን ነጥቡ ምንድን ነው? በመቶዎች የሚቆጠሩ ሰንጠረዦችን ከአንድ አገልጋይ ወደ ሌላ የሚጎትት የተግባር ፓኬጅ መስራትስ? አዎ ፣ ስንት መቶ ፣ አመልካች ጣትዎ የመዳፊት አዝራሩን ጠቅ በማድረግ ከሃያ ቁርጥራጮች ይወድቃል። ግን በእርግጠኝነት የበለጠ ፋሽን ይመስላል

    Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

በእርግጥ መውጫ መንገዶችን ፈልገን ነበር። ጉዳይ እንኳን ያህል በራሱ ወደ ተጻፈ የSSIS ጥቅል ጀነሬተር መጣ...

... እና ከዚያ አዲስ ሥራ አገኘኝ. እና Apache የአየር ፍሰት በላዩ ላይ ደረሰኝ።

የኢቲኤል ሂደት መግለጫዎች ቀላል የፓይዘን ኮድ መሆናቸውን ሳውቅ ለደስታ አልጨፈርኩም። የመረጃ ዥረቶች የተቀየሩት እና የተከፋፈሉት በዚህ መንገድ ነው እና አንድ ነጠላ መዋቅር ያላቸውን ሰንጠረዦች በመቶዎች ከሚቆጠሩ የውሂብ ጎታዎች ወደ አንድ ኢላማ ማፍሰስ በአንድ እና ተኩል ወይም ሁለት 13 ”ስክሪኖች ውስጥ የፓይዘን ኮድ ጉዳይ ሆኗል።

ክላስተር መሰብሰብ

ሙሉ ለሙሉ ሙአለህፃናት አናዘጋጅ፣ እና እዚህ ሙሉ ለሙሉ ግልፅ የሆኑ ነገሮች እንዳንነጋገር፣ ለምሳሌ የአየር ፍሰት፣ የመረጡት ዳታቤዝ፣ ሴሊሪ እና ሌሎች በመትከያዎች ውስጥ የተገለጹ ጉዳዮች።

ወዲያውኑ ሙከራዎችን እንጀምራለን, እኔ ንድፍ አወጣሁ docker-compose.yml የትኛው ውስጥ:

  • በትክክል እናነሳ የአየር እንቅስቃሴ: መርሐግብር, Webserver. የሴልሪ ስራዎችን ለመከታተል አበባው እዚያ ይሽከረከራል (ምክንያቱም አስቀድሞ ወደ ውስጥ ስለተገፋ ነው። apache/airflow:1.10.10-python3.7ግን ምንም አይደለንም)
  • PostgreSQLየአየር ፍሰት የአገልግሎት መረጃውን (የመርሐግብር አወጣጥ መረጃ ፣ የአፈፃፀም ስታቲስቲክስ ፣ ወዘተ) የሚጽፍበት እና ሴሊሪ የተጠናቀቁ ተግባራትን ምልክት ያደርጋል ።
  • Redisለሴሊየሪ ተግባር ደላላ ሆኖ የሚያገለግል;
  • የሰሊጥ ሰራተኛ, ይህም ተግባራትን በቀጥታ አፈፃፀም ላይ የሚሰማራ.
  • ወደ አቃፊ ./dags ፋይሎቻችንን ከዳግስ መግለጫ ጋር እንጨምራለን ። በበረራ ላይ ይወሰዳሉ, ስለዚህ ከእያንዳንዱ ማስነጠስ በኋላ ሙሉውን ቁልል ማዞር አያስፈልግም.

በአንዳንድ ቦታዎች, በምሳሌዎቹ ውስጥ ያለው ኮድ ሙሉ በሙሉ አይታይም (ጽሑፉን ላለማጨናነቅ), ነገር ግን የሆነ ቦታ በሂደቱ ውስጥ ተስተካክሏል. የተሟሉ የስራ ኮድ ምሳሌዎች በማጠራቀሚያው ውስጥ ይገኛሉ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

ማስታወሻዎች

  • በቅንጅቱ ስብሰባ ውስጥ, በአብዛኛው በታዋቂው ምስል ላይ ተመስርቻለሁ puckel / docker-የአየር ፍሰት - እሱን ማረጋገጥዎን እርግጠኛ ይሁኑ። ምናልባት በህይወትዎ ውስጥ ሌላ ምንም ነገር አያስፈልጎትም.
  • ሁሉም የአየር ፍሰት ቅንጅቶች የሚገኙት በ በኩል ብቻ አይደለም። airflow.cfg, ነገር ግን በአካባቢ ተለዋዋጮች (ለገንቢዎች ምስጋና ይግባው) እኔ በተንኮል ተጠቅሜያለሁ.
  • በተፈጥሮ ፣ ለምርት ዝግጁ አይደለም-ሆን ብዬ የልብ ምቶች በእቃ መያዣዎች ላይ አላስቀመጥኩም ፣ ለደህንነት አላስቸገረኝም። ነገር ግን ለሞካሪዎቻችን የሚስማማውን አነስተኛውን አደረግሁ።
  • አስታውስ አትርሳ:
    • የዳግ አቃፊው ለሁለቱም መርሐግብር አውጪው እና ለሠራተኞቹ ተደራሽ መሆን አለበት።
    • በሁሉም የሶስተኛ ወገን ቤተ-መጻሕፍት ላይም ተመሳሳይ ነው - ሁሉም በጊዜ መርሐግብር እና በሠራተኞች ማሽኖች ላይ መጫን አለባቸው።

ደህና፣ አሁን ቀላል ነው፡-

$ docker-compose up --scale worker=3

ሁሉም ነገር ከተነሳ በኋላ የድር በይነገጽን ማየት ይችላሉ-

መሰረታዊ ጽንሰ-ሀሳቦች

በእነዚህ ሁሉ “ዳግስ” ውስጥ ምንም ነገር ካልተረዳህ፣ አጭር መዝገበ ቃላት እዚህ አለ፡-

  • መርሐግብር - በአየር ፍሰት ውስጥ በጣም አስፈላጊው አጎት ፣ ሮቦቶች ጠንክረው እንደሚሠሩ ፣ እና ሰው አለመሆኑን በመቆጣጠር: የጊዜ ሰሌዳውን ይቆጣጠራል ፣ ዝማኔዎችን ያሻሽላል ፣ ተግባራትን ይጀምራል።

    በአጠቃላይ ፣ በአሮጌ ስሪቶች ፣ እሱ የማስታወስ ችግር ነበረበት (አይ ፣ የመርሳት ችግር አይደለም ፣ ግን መፍሰስ) እና የቅርስ መለኪያው በውቅሮች ውስጥ እንኳን ቆይቷል። run_duration - እንደገና የሚጀምርበት ጊዜ። አሁን ግን ሁሉም ነገር ደህና ነው።

  • DAG (በሚለው "ዳግ") - "ቀጥተኛ አሲክሊክ ግራፍ", ነገር ግን እንዲህ ዓይነቱ ፍቺ ለጥቂት ሰዎች ይነግራል, ነገር ግን በእውነቱ እርስ በርስ መስተጋብር የሚፈጥሩ ተግባራት መያዣ ነው (ከዚህ በታች ይመልከቱ) ወይም በ SSIS ውስጥ ያለው የጥቅል አናሎግ እና የስራ ፍሰት Informatica .

    ከዳግስ በተጨማሪ፣ አሁንም ንዑስ ዳጎች ሊኖሩ ይችላሉ፣ ነገር ግን እኛ በአብዛኛው አንደርስባቸውም።

  • DAG ሩጫ - የመነሻ ዳግ, እሱም የራሱ የተመደበ execution_date. ተመሳሳዩ ዳግራንስ በትይዩ ሊሰሩ ይችላሉ (ተግባሮችዎን ጠንካራ ካደረጉት)።
  • ስልከኛ አንድ የተወሰነ ተግባር ለማከናወን ኃላፊነት ያላቸው የኮድ ቁርጥራጮች ናቸው። ሶስት አይነት ኦፕሬተሮች አሉ፡-
    • እርምጃእንደ የእኛ ተወዳጅ PythonOperatorማንኛውንም (የሚሰራ) የፓይዘን ኮድ ሊፈጽም የሚችል;
    • ዝውውርመረጃን ከቦታ ወደ ቦታ የሚያጓጉዝ፣ MsSqlToHiveTransfer;
    • ዳሳሽ በሌላ በኩል, አንድ ክስተት እስኪከሰት ድረስ ምላሽ እንዲሰጡ ወይም ተጨማሪውን የዳግም አፈፃፀም እንዲቀንሱ ያስችልዎታል. HttpSensor የተገለጸውን የመጨረሻ ነጥብ መሳብ ይችላል, እና የሚፈለገው ምላሽ ሲጠብቅ, ዝውውሩን ይጀምሩ GoogleCloudStorageToS3Operator. ጠያቂ አእምሮ፡- “ለምን? ደግሞም በኦፕሬተሩ ውስጥ ድግግሞሾችን ማድረግ ይችላሉ! እና ከዚያ ፣ ከተንጠለጠሉ ኦፕሬተሮች ጋር የተግባር ገንዳውን ላለመዝጋት። ዳሳሹ ከሚቀጥለው ሙከራ በፊት ይጀምራል፣ ይፈትሻል እና ይሞታል።
  • ተግባር - የታወጁ ኦፕሬተሮች ምንም ዓይነት ዓይነት ሳይሆኑ እና ከዳግ ጋር ተያይዘው ወደ ተግባር ደረጃ ከፍ ብለዋል ።
  • የተግባር ምሳሌ - አጠቃላይ እቅድ አውጪው በአፈፃፀም-ሠራተኞች ላይ ተግባራትን ወደ ጦርነት ለመላክ ጊዜው እንደሆነ ሲወስን (በቦታው ላይ ፣ ከተጠቀምንበት) LocalExecutor ወይም በጉዳዩ ላይ ወደ ሩቅ መስቀለኛ መንገድ CeleryExecutor) ለእነሱ አውድ ይመድባል (ማለትም የተለዋዋጮች ስብስብ - የአፈፃፀም መለኪያዎች) ፣ የትዕዛዝ ወይም የጥያቄ አብነቶችን ያሰፋል እና ያጠራቅማቸዋል።

ስራዎችን እንፈጥራለን

በመጀመሪያ፣ የዶሻችንን አጠቃላይ እቅድ እንዘርዝር፣ ከዚያም ወደ ዝርዝሮቹ የበለጠ እናበዛለን፣ ምክንያቱም አንዳንድ ቀላል ያልሆኑ መፍትሄዎችን እንተገብራለን።

ስለዚህ ፣ በጣም ቀላል በሆነ መልኩ ፣ እንዲህ ዓይነቱ ዳጋ እንደዚህ ይመስላል

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

እስቲ እንመልከት

  • በመጀመሪያ, አስፈላጊ የሆኑትን ሊቢስ እናስመጣለን ሌላ ነገር;
  • sql_server_ds ነው List[namedtuple[str, str]] ከአየር ፍሰት ግንኙነቶች ግንኙነቶች ስሞች እና ሳህናችንን ከምንወስድባቸው የውሂብ ጎታዎች ጋር;
  • dag - የግድ ውስጥ መሆን ያለበት የኛ ዳግ ማስታወቂያ globals()አለበለዚያ የአየር ፍሰት አያገኘውም። ዳግ እንዲህ ማለት አለበት፡-
    • ስሙ ማን ነው orders - ይህ ስም በድር በይነገጽ ውስጥ ይታያል ፣
    • ከሐምሌ ስምንት ቀን እኩለ ሌሊት ጀምሮ እንደሚሠራ ፣
    • እና በየ 6 ሰዓቱ መሮጥ አለበት (በዚህ ፈንታ ለጠንካራ ሰዎች) timedelta() ተቀባይነት ያለው cron- መሾመር 0 0 0/6 ? * * *, ለትንሽ አሪፍ - እንደ አገላለጽ @daily);
  • workflow() ዋናውን ሾል ይሰራል, አሁን ግን አይደለም. ለአሁን፣ አውዳችንን ወደ ምዝግብ ማስታወሻው ውስጥ እናስገባዋለን።
  • እና አሁን ተግባሮችን የመፍጠር ቀላል አስማት-
    • በምንጮቻችን እንሮጣለን;
    • ማስጀመር PythonOperator, ይህም የእኛን dummy ያስፈጽማል workflow(). የተግባሩን ልዩ (በዳግ ውስጥ) ስም መጥቀስ እና ድራሹን እራሱ ማሰር አይርሱ። ባንዲራ provide_context በምላሹ, ተጨማሪ ክርክሮችን ወደ ተግባር ያፈሳሉ, በጥንቃቄ ተጠቅመን እንሰበስባለን **context.

ለአሁን፣ ያ ብቻ ነው። ያገኘነው፡-

  • በድር በይነገጽ ውስጥ አዲስ ዳግ ፣
  • በትይዩ የሚከናወኑ አንድ መቶ ተኩል ተግባራት (የአየር ፍሰት ፣ የሴልሪ ቅንጅቶች እና የአገልጋይ አቅም ከፈቀደ)።

ደህና ፣ ገባኝ ማለት ይቻላል።

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ
ጥገኞቹን ማን ይጭነዋል?

ይህን ሁሉ ነገር ለማቃለል ውስጤ ገባሁ docker-compose.yml ማቀነባበር requirements.txt በሁሉም አንጓዎች ላይ.

አሁን ጠፍቷል፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ግራጫ ካሬዎች በጊዜ ሰሌዳው የሚከናወኑ የተግባር ምሳሌዎች ናቸው።

ትንሽ እንጠብቃለን፣ ተግባሮቹ በሰራተኞች ተወስደዋል፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

አረንጓዴዎቹ እርግጥ ነው, ሥራቸውን በተሳካ ሁኔታ አጠናቀዋል. ቀይ ቀለም በጣም ስኬታማ አይደለም.

በነገራችን ላይ በምርታችን ላይ ምንም አቃፊ የለም። ./dags, በማሽኖች መካከል ምንም ማመሳሰል የለም - ሁሉም ዳጎች ተኝተዋል git በእኛ Gitlab ላይ፣ እና Gitlab CI ሲዋሃዱ ዝማኔዎችን ወደ ማሽኖች ያሰራጫል። master.

ስለ አበባ ትንሽ

ሰራተኞቹ የእኛን ፓክፋፋየር እየደቆሱ ሳለ፣ አንድ ነገር ሊያሳየን የሚችል ሌላ መሳሪያ እናስታውስ - አበባ።

በሠራተኛ አንጓዎች ላይ ማጠቃለያ መረጃ ያለው የመጀመሪያው ገጽ፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ወደ ሥራ ከሄዱ ተግባራት ጋር በጣም ኃይለኛው ገጽ፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ከደላላችን ሁኔታ ጋር በጣም አሰልቺ የሆነው ገጽ፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

በጣም ብሩህ ገጽ የተግባር ሁኔታ ግራፎች እና የአፈፃፀም ጊዜያቸው ነው፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

የተጫነውን እንጭነዋለን

ስለዚህ, ሁሉም ተግባራት ተከናውነዋል, የቆሰሉትን መውሰድ ይችላሉ.

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

እና ብዙ ቆስለዋል - በአንድ ወይም በሌላ ምክንያት። የአየር ፍሰት ትክክለኛ አጠቃቀምን በተመለከተ እነዚህ ካሬዎች መረጃው በእርግጠኝነት እንዳልደረሰ ያመለክታሉ።

ምዝግብ ማስታወሻውን መመልከት እና የወደቁትን የተግባር ምሳሌዎችን እንደገና ማስጀመር ያስፈልግዎታል።

በማንኛውም ካሬ ላይ ጠቅ በማድረግ ለእኛ የሚገኙትን ድርጊቶች እናያለን፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ወስደህ የወደቀውን አጽዳ ማድረግ ትችላለህ። ያም ማለት አንድ ነገር እዚያ እንዳልተሳካ እንረሳዋለን, እና ተመሳሳይ ምሳሌ ተግባር ወደ መርሐግብር አውጪው ይሄዳል.

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

በሁሉም ቀይ ካሬዎች በመዳፊት ይህን ማድረግ በጣም ሰብአዊ እንዳልሆነ ግልጽ ነው - ይህ ከአየር ፍሰት የምንጠብቀው አይደለም. በተፈጥሮ፣ የጅምላ ጨራሽ መሣሪያዎች አሉን፡- Browse/Task Instances

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ሁሉንም ነገር በአንድ ጊዜ እንመርጥና ወደ ዜሮ እንደገና እናስጀምር፣ ትክክለኛውን ንጥል ጠቅ ያድርጉ፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ካጸዱ በኋላ ታክሲዎቻችን ይህን ይመስላል (የጊዜ መርሐግብር አስማሚውን አስቀድመው እየጠበቁ ናቸው)

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ግንኙነቶች, መንጠቆዎች እና ሌሎች ተለዋዋጮች

የሚቀጥለውን DAG ለመመልከት ጊዜው አሁን ነው, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

ሁሉም ሰው የሪፖርት ማሻሻያ አድርጓል? ይህ እንደገና እሷ ናት: መረጃውን ከየት ማግኘት እንደሚችሉ ምንጮች ዝርዝር አለ; የሚቀመጥበት ዝርዝር አለ; ሁሉም ነገር ሲከሰት ወይም ሲሰበር ማሰማትዎን አይርሱ (ደህና ፣ ይህ ስለ እኛ አይደለም ፣ አይሆንም)።

እንደገና ወደ ፋይሉ እንሂድ እና አዲስ ግልጽ ያልሆኑ ነገሮችን እንይ፡-

  • from commons.operators import TelegramBotSendMessage - ወደ ያልተከለከሉ መልዕክቶች ለመላክ ትንሽ መጠቅለያ በመስራት የራሳችንን ኦፕሬተሮች እንዳንሠራ የሚከለክለን ምንም ነገር የለም። (ስለዚህ ኦፕሬተር ከዚህ በታች የበለጠ እንነጋገራለን);
  • default_args={} - ዳግ ተመሳሳይ ክርክሮችን ለሁሉም ኦፕሬተሮች ማሰራጨት ይችላል ፣
  • to='{{ var.value.all_the_kings_men }}' - መስክ to እኛ ሃርድ ኮድ አይኖረንም፣ ነገር ግን በተለዋዋጭ የመነጨ ጂንጃ እና የኢሜይሎች ዝርዝር ያለው ተለዋዋጭ ነው፣ ይህም በጥንቃቄ ያስገባሁት Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - ኦፕሬተሩን ለመጀመር ሁኔታ. በእኛ ሁኔታ, ደብዳቤው ወደ አለቆቹ የሚበርው ሁሉም ጥገኞች ከሰሩ ብቻ ነው በተሳካ ሁኔታ;
  • tg_bot_conn_id='tg_main' - ክርክሮች conn_id የምንፈጥራቸውን የግንኙነት መታወቂያዎችን ተቀበል Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - በቴሌግራም ውስጥ ያሉ መልእክቶች የሚበሩት የወደቁ ተግባራት ካሉ ብቻ ነው ።
  • task_concurrency=1 - የአንድ ተግባር በርካታ የተግባር ምሳሌዎችን በአንድ ጊዜ ማስጀመርን እንከለክላለን። አለበለዚያ የብዙዎችን በአንድ ጊዜ ማስጀመር እናገኛለን VerticaOperator (በአንድ ጠረጴዛ ላይ መመልከት);
  • report_update >> [email, tg] - ሁሉም VerticaOperator ደብዳቤዎችን እና መልዕክቶችን ለመላክ ይተባበሩ ፣
    Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

    ነገር ግን አሳዋቂ ኦፕሬተሮች የተለያዩ የማስጀመሪያ ሁኔታዎች ስላሏቸው አንድ ብቻ ነው የሚሰራው። በዛፍ እይታ ፣ ሁሉም ነገር በትንሹ ምስላዊ ይመስላል
    Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ስለ ጥቂት ቃላት እናገራለሁ ማክሮዎች እና ጓደኞቻቸው - ተለዋዋጮች.

ማክሮዎች የተለያዩ ጠቃሚ መረጃዎችን ወደ ኦፕሬተር ክርክሮች የሚተኩ የጂንጃ ቦታ ያዢዎች ናቸው። ለምሳሌ፣ እንደዚህ፡-

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} ወደ የአውድ ተለዋዋጭ ይዘቶች ይሰፋል execution_date በቅጹ ላይ YYYY-MM-DD: 2020-07-14. በጣም ጥሩው ክፍል የአውድ ተለዋዋጮች በአንድ የተወሰነ ተግባር ምሳሌ ላይ ተቸንክረዋል (በዛፍ እይታ ውስጥ ያለ ካሬ) እና እንደገና ሲጀመር ቦታ ያዢዎቹ ወደ ተመሳሳይ እሴቶች ይሰፋሉ።

የተመደቡት እሴቶች በእያንዳንዱ የተግባር ምሳሌ ላይ የተሰራውን ቁልፍ በመጠቀም ሊታዩ ይችላሉ። ደብዳቤ የመላክ ተግባር እንደዚህ ነው፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

እና ስለዚህ መልእክት በመላክ ተግባር ላይ-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ለቅርብ ጊዜው ስሪት አብሮ የተሰሩ ማክሮዎች ሙሉ ዝርዝር እዚህ ይገኛል። ማክሮዎች ማጣቀሻ

ከዚህም በላይ በተሰኪዎች እገዛ የራሳችንን ማክሮዎች ማወጅ እንችላለን፣ ግን ያ ሌላ ታሪክ ነው።

አስቀድሞ ከተገለጹት ነገሮች በተጨማሪ የእኛን ተለዋዋጮች እሴቶችን መተካት እንችላለን (ይህን ከዚህ በላይ ባለው ኮድ ውስጥ አስቀድሜ ተጠቀምኩበት)። ውስጥ እንፍጠር Admin/Variables ሁለት ነገሮች፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

መጠቀም የሚችሉት ሁሉም ነገር:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

እሴቱ scalar ወይም JSON ሊሆን ይችላል። በJSON ጉዳይ፡-

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

ወደሚፈለገው ቁልፍ የሚወስደውን መንገድ ብቻ ይጠቀሙ፡- {{ var.json.bot_config.bot.token }}.

በጥሬው አንድ ቃል እናገራለሁ እና ስለ አንድ ቅጽበታዊ ገጽ እይታ አሳይሻለሁ። ግንኙነቶች. ሁሉም ነገር እዚህ የመጀመሪያ ደረጃ ነው: በገጹ ላይ Admin/Connections ግንኙነት እንፈጥራለን ፣ የመግቢያ / የይለፍ ቃሎቻችንን እና የበለጠ የተወሰኑ መለኪያዎችን እዚያ እንጨምራለን ። ልክ እንደዚህ:

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

የይለፍ ቃሎች መመስጠር ይችላሉ (ከነባሪው በበለጠ በደንብ) ፣ ወይም የግንኙነት አይነትን መተው ይችላሉ (እኔ እንዳደረግኩት) tg_main) - እውነታው ግን የዓይነቶቹ ዝርዝር በኤር ፍሰት ሞዴሎች ውስጥ የተጠናከረ እና ወደ ምንጭ ኮዶች ውስጥ ሳይገባ ሊሰፋ አይችልም (በድንገት የሆነ ነገር ጎግል ካላደረግኩ እባክዎን ያርሙኝ) ግን ክሬዲቶችን እንዳናገኝ የሚያግደን ምንም ነገር የለም ። ስም.

በተመሳሳይ ስም ብዙ ግንኙነቶችን ማድረግ ይችላሉ-በዚህ ሁኔታ, ዘዴው BaseHook.get_connection()በስም የሚያገናኘን, ይሰጠናል በዘፈቀደ ከበርካታ ስሞች (ሮውንድ ሮቢንን መስራት የበለጠ ምክንያታዊ ይሆናል, ነገር ግን በአየር ፍሰት ገንቢዎች ህሊና ላይ እንተወው).

ተለዋዋጮች እና ግንኙነቶች በእርግጠኝነት ጥሩ መሳሪያዎች ናቸው ፣ ግን ሚዛኑን ላለማጣት አስፈላጊ ነው-የትኞቹን የፍሰቶችዎን ክፍሎች በኮዱ ውስጥ ያከማቻሉ ፣ እና የትኞቹ ክፍሎች ለአየር ፍሰት ለማከማቻ እንደሚሰጡ። በአንድ በኩል እሴቱን በፍጥነት ለመለወጥ ምቹ ሊሆን ይችላል, ለምሳሌ, የፖስታ ሳጥን, በ UI. በሌላ በኩል, ይህ አሁንም ወደ የመዳፊት ጠቅታ መመለስ ነው, እኛ (እኔ) ልናስወግደው ፈለግን.

ከግንኙነቶች ጋር መስራት አንዱ ተግባር ነው። መንጠቆዎች. በአጠቃላይ የአየር ፍሰት መንጠቆዎች ከሶስተኛ ወገን አገልግሎቶች እና ቤተ-መጻሕፍት ጋር ለማገናኘት ነጥቦች ናቸው። ለምሳሌ፣ JiraHook ከጂራ ጋር እንድንገናኝ ደንበኛን ይከፍታል (ተግባራትን ወደ ኋላ እና ወደ ፊት ማንቀሳቀስ ይችላሉ) እና በ እገዛ SambaHook የአካባቢ ፋይልን ወደ ላይ መጫን ይችላሉ። smb- ነጥብ.

ብጁ ኦፕሬተርን መተንተን

እና እንዴት እንደተሰራ ለማየት ተቃርበናል። TelegramBotSendMessage

ኮድ commons/operators.py ከእውነተኛው ኦፕሬተር ጋር;

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

እዚህ፣ ልክ በአየር ፍሰት ውስጥ እንዳለ ሁሉም ነገር፣ ሁሉም ነገር በጣም ቀላል ነው።

  • የተወረሰው BaseOperatorጥቂት የአየር ፍሰት-ተኮር ነገሮችን የሚተገበር (የእረፍት ጊዜዎን ይመልከቱ)
  • የታወጁ መስኮች template_fieldsጂንጃ ለማቀነባበር ማክሮዎችን የሚፈልግበት።
  • ትክክለኛ ክርክሮችን አዘጋጅቷል። __init__(), አስፈላጊ ሆኖ ሲገኝ ነባሪዎችን ያዘጋጁ.
  • ሾለ ቅድመ አያት አጀማመርም አልረሳንም።
  • ተጓዳኝ መንጠቆውን ከፍቷል። TelegramBotHookከእሱ የደንበኛ ነገር ተቀብሏል.
  • የተሻረ (የተገለጸ) ዘዴ BaseOperator.execute()ኦፕሬተሩን ለመጀመር ጊዜው ሲደርስ የትኛው ኤርፎው ይንቀጠቀጣል - በእሱ ውስጥ መግባትን በመርሳት ዋናውን ተግባር እንተገብራለን. (በነገራችን ላይ በትክክል ገብተናል stdout и stderr - የአየር ፍሰት ሁሉንም ነገር ያጠፋል ፣ በሚያምር ሁኔታ ይጠቀለላል ፣ አስፈላጊ በሚሆንበት ጊዜ ያበላሸዋል።)

ያለንን እንይ commons/hooks.py. የፋይሉ የመጀመሪያ ክፍል፣ መንጠቆው ራሱ፡-

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

እዚህ ምን ማብራራት እንዳለብኝ እንኳን አላውቅም፣ አስፈላጊዎቹን ነጥቦች ብቻ አስተውያለሁ፡-

  • እኛ እንወርሳለን ፣ ሾለ ክርክሮቹ አስቡ - በአብዛኛዎቹ ጉዳዮች አንድ ይሆናል conn_id;
  • መደበኛ ዘዴዎችን ማለፍ፡ እራሴን ገድቤአለሁ። get_conn(), በዚህ ውስጥ የግንኙነት መለኪያዎችን በስም አገኛለሁ እና ክፍሉን ብቻ አገኛለሁ extra (ይህ የJSON መስክ ነው)፣ እኔ (በራሴ መመሪያ መሰረት!) የቴሌግራም ቦት ማስመሰያ አስቀመጥኩ፡ {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • የእኛን ምሳሌ እፈጥራለሁ TelegramBot, የተወሰነ ምልክት በመስጠት.

ይኼው ነው. በመጠቀም ደንበኛን ከመንጠቆ ማግኘት ይችላሉ። TelegramBotHook().clent ወይም TelegramBotHook().get_conn().

እና የፋይሉ ሁለተኛ ክፍል ፣ ለቴሌግራም REST ኤፒአይ የማይክሮ መጠቅለያ የምሰራበት ፣ ተመሳሳይ ላለመጎተት python-telegram-bot ለአንድ ዘዴ sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

ትክክለኛው መንገድ ሁሉንም ማከል ነው- TelegramBotSendMessage, TelegramBotHook, TelegramBot - በተሰኪው ውስጥ ፣ የህዝብ ማከማቻ ውስጥ ያስገቡ እና ለክፍት ምንጭ ይስጡት።

ይህን ሁሉ እያጠናን ሳለ የኛ ዘገባ ማሻሻያ በተሳካ ሁኔታ ሳይሳካ ቀርቷል እና በሰርጡ ላይ የስህተት መልእክት ላኩኝ። ስህተት መሆኑን ለማየት እሞክራለሁ...

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ
በእኛ ዶጅ ውስጥ የሆነ ነገር ተሰበረ! ስንጠብቀው የነበረው ነገር አልነበረም? በትክክል!

ልትፈስ ነው?

የሆነ ነገር እንደናፈቀኝ ይሰማዎታል? ከSQL ሰርቨር ወደ ቬርቲካ ለማዛወር ቃል የገባ ይመስላል ከዛም ወስዶ ከርዕሱ ወጣ ወራዳ!

ይህ አረመኔያዊ ድርጊት ሆን ተብሎ የተፈፀመ ነበር፣ በቀላሉ አንዳንድ የቃላት አገባቦችን መፍታት ነበረብኝ። አሁን የበለጠ መሄድ ይችላሉ.

እቅዳችን ይህ ነበር፡-

  1. ዳግ አድርግ
  2. ተግባራትን መፍጠር
  3. ሁሉም ነገር እንዴት ቆንጆ እንደሆነ ይመልከቱ
  4. ለመሙላት የክፍለ-ጊዜ ቁጥሮችን ይመድቡ
  5. ከSQL አገልጋይ ውሂብ ያግኙ
  6. ውሂብን ወደ Vertica ያስገቡ
  7. ስታቲስቲክስን ይሰብስቡ

ስለዚህ፣ ይህንን ሁሉ ለማስኬድ፣ በእኛ ላይ ትንሽ ጭማሪ አድርጌያለሁ docker-compose.yml:

ዶከር-አቀናብር.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

እዚያ እናነሳለን-

  • Vertica እንደ አስተናጋጅ dwh በጣም በነባሪ ቅንጅቶች ፣
  • ሶስት አጋጣሚዎች የ SQL አገልጋይ ፣
  • በኋለኛው ውስጥ የውሂብ ጎታዎችን በአንዳንድ መረጃዎች እንሞላለን (በምንም ሁኔታ አይመልከቱ mssql_init.py!)

ከባለፈው ጊዜ በበለጠ ትንሽ በተወሳሰበ ትእዛዝ በመታገዝ ሁሉንም መልካም ነገሮች እናስጀምራለን።

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

የእኛ ተአምር ራዶሚዘር የፈጠረው ፣ ንጥሉን መጠቀም ይችላሉ። Data Profiling/Ad Hoc Query:

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ
ዋናው ነገር ለተንታኞች ማሳየት አይደለም

ላይ አብራራ የኢቲኤል ክፍለ ጊዜዎች አልፈልግም ፣ እዚያ ሁሉም ነገር ቀላል ነው-መሠረቱን እንሰራለን ፣ በእሱ ውስጥ ምልክት አለ ፣ ሁሉንም ነገር ከአውድ አስተዳዳሪ ጋር እናጠቃልላለን ፣ እና አሁን ይህንን እናደርጋለን

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

ክፍለ ጊዜ.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

ጊዜው ደርሷል የእኛን ውሂብ ሰብስብ ከአንድ መቶ ተኩል ጠረጴዛዎቻችን. በጣም ትርጓሜ በሌላቸው መስመሮች እርዳታ ይህንን እናድርገው-

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. በመንጠቆ እርዳታ ከአየር ፍሰት እናገኛለን pymssql- መገናኘት
  2. በጥያቄው ውስጥ በቀን መልክ እገዳን እንተካ - በአብነት ሞተሩ ወደ ተግባሩ ይጣላል።
  3. ጥያቄያችንን መመገብ pandasማን ያግዘናል DataFrame - ለወደፊቱ ይጠቅመናል.

ምትክ እየተጠቀምኩ ነው። {dt} ከጥያቄ መለኪያ ይልቅ %s እኔ ክፉ ፒኖቺዮ ስለሆንኩ ሳይሆን ስለ pandas መቋቋም አይችልም pymssql እና የመጨረሻውን ያንሸራትታል params: Listእሱ በእውነት ቢፈልግም። tuple.
እንዲሁም ገንቢውን ልብ ይበሉ pymssql እሱን ላለመደገፍ ወሰነ እና ለመልቀቅ ጊዜው አሁን ነው። pyodbc.

የአየር ፍሰት የተግባራችንን ክርክሮች በምን እንደሞላው እንመልከት፡-

Apache የአየር ፍሰት፡ ETLን ቀላል ማድረግ

ምንም ውሂብ ከሌለ, ከዚያ ለመቀጠል ምንም ፋይዳ የለውም. ነገር ግን መሙላትን በተሳካ ሁኔታ መቁጠር እንግዳ ነገር ነው. ግን ይህ ስህተት አይደለም. አ-አህ-አህ፣ ምን ይደረግ?! እና ምን እንደሆነ እነሆ፡-

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException ምንም ስህተቶች እንደሌሉ ለአየር ፍሰት ይነግረናል ፣ ግን ተግባሩን እንዘለዋለን። በይነገጹ አረንጓዴ ወይም ቀይ ካሬ አይኖረውም, ግን ሮዝ.

ውሂባችንን እንወረውር በርካታ ዓምዶች:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

የሚታወቀው-

  • ትእዛዙን የወሰድንበት ዳታቤዝ፣
  • የጎርፍ ክፍለ ጊዜያችን መታወቂያ (የተለየ ይሆናል። ለእያንዳንዱ ተግባር),
  • ከምንጩ እና የትዕዛዝ መታወቂያ ሀሽ - በመጨረሻው የውሂብ ጎታ (ሁሉም ነገር በአንድ ጠረጴዛ ውስጥ የሚፈስበት) ልዩ የትዕዛዝ መታወቂያ አለን ።

የመጨረሻው ደረጃ ይቀራል: ሁሉንም ነገር ወደ ቬርቲካ ያፈስሱ. እና፣ በሚገርም ሁኔታ፣ ይህን ለማድረግ በጣም አስደናቂ እና ቀልጣፋ ከሆኑ መንገዶች አንዱ በCSV በኩል ነው!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. ልዩ ተቀባይ እየሰራን ነው። StringIO.
  2. pandas በደግነት የእኛን ያስቀምጣል። DataFrame በ CSV- መስመሮች.
  3. ከተወዳጅ ቬርቲካ ጋር ግንኙነትን በመንጠቆ እንክፈት።
  4. እና አሁን በእርዳታ copy() ውሂባችንን በቀጥታ ወደ ቨርቲካ ይላኩ!

ምን ያህል መስመሮች እንደተሞሉ ከሾፌሩ እንወስዳለን እና ለክፍለ-ጊዜው አስተዳዳሪ ሁሉም ነገር ደህና እንደሆነ እንነግራለን።

session.loaded_rows = cursor.rowcount
session.successful = True

ይኼው ነው.

በሽያጭ ላይ, የታለመውን ሰሃን በእጅ እንፈጥራለን. እዚህ ለራሴ ትንሽ ማሽን ፈቀድኩኝ፡-

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

እየተጠቀምኩ ነው። VerticaOperator() የውሂብ ጎታ ንድፍ እና ሠንጠረዥ እፈጥራለሁ (በእርግጥ እነሱ ከሌሉ). ዋናው ነገር ጥገኛዎቹን በትክክል ማዘጋጀት ነው-

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

ማጠቃለል

- ደህና, - ትንሹ አይጥ አለ, - አይደለም, አሁን
እኔ በጫካ ውስጥ በጣም አስፈሪ እንስሳ እንደሆንኩ እርግጠኛ ነዎት?

ጁሊያ ዶናልድሰን ፣ ግሩፋሎ

እኔ እንደማስበው እኔና ባልደረቦቼ ውድድር ቢያጋጥመን፡ ማን በፍጥነት የኢቲኤልን ሂደት ከባዶ የሚፈጥር እና የሚጀምር፡ እነሱ በSSIS እና በመዳፊት እና እኔ ከአየር ፍሰት ጋር...እናም የጥገናን ቀላልነት እናነጻጽራለን ዋው፣ በሁሉም አቅጣጫ እንደምደበድባቸው የምትስማሙ ይመስለኛል!

ትንሽ የበለጠ በቁም ነገር ከሆነ, ከዚያም Apache Airflow - ሂደቶችን በፕሮግራም ኮድ መልክ በመግለጽ - ሥራዬን ሠራሁ. ብዙ የበለጠ ምቹ እና አስደሳች።

በውስጡ ያልተገደበ extensibility, ሁለቱም ተሰኪዎች እና scalability ያለውን ዝንባሌ አንፃር, በማንኛውም አካባቢ ማለት ይቻላል የአየር ፍሰት ለመጠቀም እድል ይሰጥዎታል: እንኳን, የመሰብሰብ, የማዘጋጀት እና ውሂብ ሂደት ሙሉ ዑደት ውስጥ, ሮኬቶችን (ወደ ማርስ, የ ኮርስ)።

የመጨረሻው ክፍል, ማጣቀሻ እና መረጃ

ለአንተ የሰበሰብንልህን መሰቅሰቂያ

  • start_date. አዎ፣ ይህ አስቀድሞ የአካባቢ ትውስታ ነው። በዶግ ዋና ክርክር start_date ሁሉም ያልፋል። በአጭሩ፣ ከገለጹ start_date የአሁኑ ቀን, እና schedule_interval - አንድ ቀን, ከዚያም DAG ነገ ምንም ቀደም ብሎ ይጀምራል.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    እና ምንም ተጨማሪ ችግሮች የሉም.

    ከእሱ ጋር የተያያዘ ሌላ የአሂድ ጊዜ ስህተት አለ፡- Task is missing the start_date parameter, ይህም ብዙውን ጊዜ ከዳግ ኦፕሬተር ጋር ማያያዝን እንደረሱ ያመለክታል.

  • ሁሉም በአንድ ማሽን ላይ. አዎ ፣ እና መሰረቶች (የአየር ፍሰት እራሱ እና የእኛ ሽፋን) ፣ እና የድር አገልጋይ ፣ እና የጊዜ ሰሌዳ አውጪ ፣ እና ሰራተኞች። እና እንዲያውም ሰርቷል. ነገር ግን ከጊዜ በኋላ የአገልግሎቶች ብዛት እየጨመረ እና PostgreSQL ከ 20 ms ይልቅ በ 5 ሰከንድ ውስጥ ለጠቋሚው ምላሽ መስጠት ሲጀምር, ወስደን ወሰድነው.
  • የአካባቢ አስፈፃሚ. አዎን ገና በላዩ ላይ ተቀምጠናል, እና ቀድሞውኑ ወደ ገደል ጫፍ ደርሰናል. የአካባቢ አስፈፃሚ እስካሁን በቂ ሆኖልናል፣ አሁን ግን ቢያንስ ከአንድ ሰራተኛ ጋር የምንሰፋበት ጊዜ ነው፣ እና ወደ CeleryExecutor ለመሄድ ጠንክረን መስራት አለብን። እና በአንድ ማሽን ላይ ከእሱ ጋር አብሮ መስራት ከመቻልዎ አንጻር ሴሊሪን በአገልጋዩ ላይ እንኳን ከመጠቀም የሚያግድዎት ነገር የለም ፣ ይህም “በእርግጥ ፣ በእውነቱ ፣ በጭራሽ ወደ ምርት አይሄድም!”
  • ያለመጠቀም አብሮገነብ መሳሪያዎች:
    • ግንኙነቶች የአገልግሎት ምስክርነቶችን ለማከማቸት ፣
    • SLA ናፈቀ በሰዓቱ ላልተሠሩ ሥራዎች ምላሽ መስጠት ፣
    • xcom ለሜታዳታ ልውውጥ (አልኩት ሜታውሂብ!) በዳግ ተግባራት መካከል።
  • የደብዳቤ አላግባብ መጠቀም። ደህና, ምን ማለት እችላለሁ? ለሁሉም የወደቁ ተግባራት ድግግሞሾች ማንቂያዎች ተዘጋጅተዋል። አሁን የእኔ ሾል Gmail>90k ኢሜይሎች ከአየር ፍሰት አለው፣ እና የድር ሜይል አፈሙዝ በአንድ ጊዜ ከ100 በላይ ለማንሳት እና ለመሰረዝ ፈቃደኛ አይሆንም።

ተጨማሪ ወጥመዶች: Apache የአየር ፍሰት ጉድለቶች

ተጨማሪ አውቶማቲክ መሳሪያዎች

በእጃችን ሳይሆን በጭንቅላታችን እንድንሰራ የአየር ፍሰት ይህንን አዘጋጅቶልናል፡-

  • የ REST ኤ ፒ አይ - እሱ አሁንም የሙከራ ደረጃ አለው, እሱም እንዳይሠራ አያግደውም. በእሱ አማካኝነት ሾለ ዳግስ እና ተግባራት መረጃን ብቻ ሳይሆን ማቆምም / መጀመር, DAG Run ወይም ገንዳ መፍጠር ይችላሉ.
  • CLI - ብዙ መሳሪያዎች በትዕዛዝ መሾመሊ በኩል በWebUI ለመጠቀም የማይመቹ ብቻ ሳይሆን በአጠቃላይ የማይገኙ ናቸው። ለምሳሌ:
    • backfill የተግባር ሁኔታዎችን እንደገና ለማስጀመር ያስፈልጋል።
      ለምሳሌ፣ ተንታኞች መጥተው እንዲህ አሉ፡- “እና አንተ ጓደኛ፣ ከጃንዋሪ 1 እስከ 13 ባለው መረጃ ላይ ትርጉም የለሽ ነገር አለህ! አስተካክል፣ አስተካክል፣ አስተካክል፣ አስተካክል! እና እርስዎ እንደዚህ አይነት ሆብ ነዎት:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • የመሠረት አገልግሎት; initdb, resetdb, upgradedb, checkdb.
    • run, ይህም አንድ ምሳሌ ተግባርን እንዲያካሂዱ እና በሁሉም ጥገኝነቶች ላይ እንኳን ውጤት እንዲያመጡ ያስችልዎታል. በተጨማሪም, በ በኩል ማሄድ ይችላሉ LocalExecutorየሴሊየም ክላስተር ቢኖርዎትም.
    • በጣም ተመሳሳይ ነገር ያደርጋል test, ብቻ ደግሞ ቤዝ ውስጥ ምንም አይጽፍም.
    • connections ከቅርፊቱ ብዙ ግንኙነቶችን ለመፍጠር ያስችላል።
  • ፓይቶን ኤ.ፒ.አይ. - ለፕለጊኖች የታሰበ እና በትንሽ እጆች የማይዋዥቅ የሐርድኮር የግንኙነት መንገድ። ግን እንዳንሄድ ማን ከለከለን። /home/airflow/dags፣ ሩጡ ipython እና መዘባረቅ ጀምር? ለምሳሌ ሁሉንም ግንኙነቶች በሚከተለው ኮድ ወደ ውጭ መላክ ይችላሉ፡
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • ከአየር ፍሰት ሜታዳታቤዝ ጋር በመገናኘት ላይ። እንዲጽፍለት አልመክርም ነገር ግን ለተለያዩ ልዩ መለኪያዎች የተግባር ሁኔታዎችን ማግኘት ከማንኛውም ኤፒአይዎች የበለጠ ፈጣን እና ቀላል ሊሆን ይችላል።

    እንበል ሁሉም ተግባሮቻችን ኃይለኛ አይደሉም ፣ ግን አንዳንድ ጊዜ ሊወድቁ ይችላሉ ፣ እና ይህ የተለመደ ነው። ነገር ግን ጥቂት እገዳዎች ቀድሞውኑ አጠራጣሪ ናቸው, እና መፈተሽ አስፈላጊ ይሆናል.

    ተጠንቀቅ SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

ማጣቀሻዎች

እና በእርግጥ፣ ከ Google መውጣት የመጀመሪያዎቹ አስር አገናኞች የአየር ፍሰት አቃፊው ይዘቶች ከዕልባቶች ናቸው።

እና በአንቀጹ ውስጥ ጥቅም ላይ የዋሉ አገናኞች-

ምንጭ: hab.com