ሰላም፣ እኔ ዲሚትሪ ሎግቪንኮ ነኝ - የVezet የኩባንያዎች ቡድን የትንታኔ ክፍል የውሂብ መሐንዲስ።
የ ETL ሂደቶችን ለማዘጋጀት ስለ አንድ አስደናቂ መሣሪያ እነግርዎታለሁ - Apache Airflow. ነገር ግን የአየር ፍሰት በጣም ሁለገብ እና ዘርፈ-ብዙ ስለሆነ በመረጃ ፍሰቶች ውስጥ ባይሳተፉም በጥንቃቄ ሊመለከቱት ይገባል ነገር ግን ማንኛውንም ሂደቶችን በየጊዜው መጀመር እና አፈፃፀማቸውን መከታተል ያስፈልግዎታል።
እና አዎ, እኔ ብቻ አልነግርም, ግን ደግሞ አሳይ: ፕሮግራሙ ብዙ ኮድ, ቅጽበታዊ ገጽ እይታዎች እና ምክሮች አሉት.

አየር ፍሰት/ዊኪሚዲያ ኮመንስ የሚለውን ቃል ጎግል ሲያደርጉ ብዙውን ጊዜ የሚያዩት ነገር
ማውጫ
መግቢያ
Apache የአየር ፍሰት ልክ እንደ ጃንጎ ነው።
- በ Python ተፃፈ
- ጥሩ የአስተዳዳሪ ፓነል አለ ፣
- ላልተወሰነ ጊዜ ሊሰፋ የሚችል
- የተሻለ ብቻ እና የተሰራው ሙሉ ለሙሉ ለተለያዩ ዓላማዎች ማለትም (ከካት በፊት እንደተጻፈው) ነው።
- ገደብ በሌለው ማሽኖች ላይ ስራዎችን ማሄድ እና መቆጣጠር (ብዙ ሴሊሪ / ኩበርኔትስ እና ህሊናዎ ይፈቅድልዎታል)
- የ Python ኮድ ለመጻፍ እና ለመረዳት በጣም ቀላል በሆነ ተለዋዋጭ የስራ ፍሰት ትውልድ
- እና ሁለቱንም ዝግጁ አካላት እና በቤት ውስጥ የተሰሩ ፕለጊኖችን በመጠቀም ማንኛውንም የውሂብ ጎታ እና ኤፒአይዎችን እርስ በእርስ የማገናኘት ችሎታ (ይህም በጣም ቀላል ነው)።
Apache የአየር ፍሰትን እንደሚከተለው እንጠቀማለን-
- መረጃን ከተለያዩ ምንጮች እንሰበስባለን (ብዙ የSQL Server እና PostgreSQL ምሳሌዎች፣ የተለያዩ ኤፒአይዎች ከአፕሊኬሽን ሜትሪክስ፣ 1C እንኳን) በDWH እና ODS (Vertica እና Clickhouse አለን)።
- ምን ያህል የላቀ
cronበ ODS ላይ የውሂብ ማጠናከሪያ ሂደቶችን የሚጀምረው እና ጥገናቸውንም የሚከታተል.
እስከ ቅርብ ጊዜ ድረስ፣ ፍላጎታችን 32 ኮር እና 50 ጂቢ ራም ባለው አንድ ትንሽ አገልጋይ ተሸፍኗል። በአየር ፍሰት ውስጥ ይህ ይሰራል፡-
- ከ 200 ዶግ (በእውነቱ ስራዎችን የሞላንበት የስራ ፍሰቶች)
- በእያንዳንዱ በአማካይ 70 ተግባራት,
- ይህ መልካምነት ይጀምራል (በተጨማሪም በአማካይ) በሰዓት አንድ ጊዜ.
እና እንዴት እንደሰፋን ፣ ከዚህ በታች እጽፋለሁ ፣ ግን አሁን የምንፈታውን የ über-ችግርን እንገልፃለን-
ሶስት ምንጭ SQL ሰርቨሮች አሉ እያንዳንዳቸው 50 የውሂብ ጎታዎች - የአንድ ፕሮጀክት ምሳሌዎች እንደቅደም ተከተላቸው, ተመሳሳይ መዋቅር አላቸው (በሁሉም ቦታ ማለት ይቻላል, mua-ha-ha), ይህም ማለት እያንዳንዳቸው የትእዛዝ ሠንጠረዥ አላቸው (እንደ እድል ሆኖ, ከዚህ ጋር ሰንጠረዥ) ስም ወደ ማንኛውም ንግድ ሊገባ ይችላል). እኛ የአገልግሎት መስኮችን (ምንጭ አገልጋይ፣ የምንጭ ዳታቤዝ፣ የኢቲኤል ተግባር መታወቂያ) በማከል ውሂቡን ወስደን በዋህነት ወደ ቨርቲካ እንወረውራለን።
እንሂድ!
ዋናው ክፍል ፣ ተግባራዊ (እና ትንሽ ቲዎሪ)
ለምንድነው ለእኛ (እና ለእርስዎ)
ዛፎቹ ትልቅ ሲሆኑ እና እኔ ቀላል ነበርኩ SQL-schik በአንድ የሩሲያ ችርቻሮ ውስጥ ፣እኛ የሚገኙትን ሁለት መሳሪያዎችን በመጠቀም የኢቲኤል ሂደቶችን አጭበርብተናል።
- Informatica የኃይል ማዕከል - እጅግ በጣም የተስፋፋ ስርዓት ፣ እጅግ በጣም ውጤታማ ፣ በራሱ ሃርድዌር ፣ የራሱ ስሪት። 1% አቅሙን እግዚአብሔር ይከለክለው ተጠቀምኩ። ለምን? ደህና ፣ በመጀመሪያ ፣ ይህ በይነገጽ ፣ ከ 380 ዎቹ ጀምሮ ፣ በአእምሯችን ላይ ጫና ፈጥሯል። በሁለተኛ ደረጃ፣ ይህ ክልከላ የተነደፈው እጅግ በጣም ቆንጆ ለሆኑ ሂደቶች፣ ቁጡ አካል መልሶ ጥቅም ላይ ለማዋል እና ሌሎች በጣም አስፈላጊ ለሆኑ-የድርጅት-ማታለያዎች ነው። ስለ ወጪው ፣ እንደ ኤርባስ AXNUMX / ዓመት ክንፍ ፣ ምንም አንልም ።
ይጠንቀቁ፣ ቅጽበታዊ ገጽ እይታ ከ30 ዓመት በታች የሆኑ ሰዎችን በትንሹ ሊጎዳ ይችላል።

- SQL አገልጋይ ውህደት አገልጋይ - ይህንን ጓዳችንን በውስጣችን በፕሮጀክት ፍሰቶች ውስጥ ተጠቅመንበታል። ደህና፣ በእውነቱ፡ አስቀድመን SQL Server እንጠቀማለን፣ እና በሆነ መልኩ የኢቲኤል መሳሪያዎቹን አለመጠቀም ምክንያታዊ አይሆንም። በውስጡ ያለው ነገር ሁሉ ጥሩ ነው: ሁለቱም በይነገጹ ውብ ነው, እና የሂደቱ ዘገባዎች ... ግን ለዚህ አይደለም የሶፍትዌር ምርቶችን የምንወደው, ኦህ, ለዚህ አይደለም. ስሪት ያድርጉት
dtsx(ይህም ኤክስኤምኤል ነው አንጓዎች በ save ላይ የተዘበራረቁ) እኛ እንችላለን፣ ግን ነጥቡ ምንድን ነው? በመቶዎች የሚቆጠሩ ሰንጠረዦችን ከአንድ አገልጋይ ወደ ሌላ የሚጎትት የተግባር ፓኬጅ መስራትስ? አዎ ፣ ስንት መቶ ፣ አመልካች ጣትዎ የመዳፊት አዝራሩን ጠቅ በማድረግ ከሃያ ቁርጥራጮች ይወድቃል። ግን በእርግጠኝነት የበለጠ ፋሽን ይመስላል
በእርግጥ መውጫ መንገዶችን ፈልገን ነበር። ጉዳይ እንኳን ያህል በራሱ ወደ ተጻፈ የSSIS ጥቅል ጀነሬተር መጣ...
... እና ከዚያ አዲስ ሥራ አገኘኝ. እና Apache የአየር ፍሰት በላዩ ላይ ደረሰኝ።
የኢቲኤል ሂደት መግለጫዎች ቀላል የፓይዘን ኮድ መሆናቸውን ሳውቅ ለደስታ አልጨፈርኩም። የመረጃ ዥረቶች የተቀየሩት እና የተከፋፈሉት በዚህ መንገድ ነው እና አንድ ነጠላ መዋቅር ያላቸውን ሰንጠረዦች በመቶዎች ከሚቆጠሩ የውሂብ ጎታዎች ወደ አንድ ኢላማ ማፍሰስ በአንድ እና ተኩል ወይም ሁለት 13 ”ስክሪኖች ውስጥ የፓይዘን ኮድ ጉዳይ ሆኗል።
ክላስተር መሰብሰብ
ሙሉ ለሙሉ ሙአለህፃናት አናዘጋጅ፣ እና እዚህ ሙሉ ለሙሉ ግልፅ የሆኑ ነገሮች እንዳንነጋገር፣ ለምሳሌ የአየር ፍሰት፣ የመረጡት ዳታቤዝ፣ ሴሊሪ እና ሌሎች በመትከያዎች ውስጥ የተገለጹ ጉዳዮች።
ወዲያውኑ ሙከራዎችን እንጀምራለን, እኔ ንድፍ አወጣሁ docker-compose.yml የትኛው ውስጥ:
- በትክክል እናነሳ የአየር እንቅስቃሴ: መርሐግብር, Webserver. የሴልሪ ስራዎችን ለመከታተል አበባው እዚያ ይሽከረከራል (ምክንያቱም አስቀድሞ ወደ ውስጥ ስለተገፋ ነው።
apache/airflow:1.10.10-python3.7ግን ምንም አይደለንም) - PostgreSQLየአየር ፍሰት የአገልግሎት መረጃውን (የመርሐግብር አወጣጥ መረጃ ፣ የአፈፃፀም ስታቲስቲክስ ፣ ወዘተ) የሚጽፍበት እና ሴሊሪ የተጠናቀቁ ተግባራትን ምልክት ያደርጋል ።
- Redisለሴሊየሪ ተግባር ደላላ ሆኖ የሚያገለግል;
- የሰሊጥ ሰራተኛ, ይህም ተግባራትን በቀጥታ አፈፃፀም ላይ የሚሰማራ.
- ወደ አቃፊ
./dagsፋይሎቻችንን ከዳግስ መግለጫ ጋር እንጨምራለን ። በበረራ ላይ ይወሰዳሉ, ስለዚህ ከእያንዳንዱ ማስነጠስ በኋላ ሙሉውን ቁልል ማዞር አያስፈልግም.
በአንዳንድ ቦታዎች, በምሳሌዎቹ ውስጥ ያለው ኮድ ሙሉ በሙሉ አይታይም (ጽሑፉን ላለማጨናነቅ), ነገር ግን የሆነ ቦታ በሂደቱ ውስጥ ተስተካክሏል. የተሟሉ የስራ ኮድ ምሳሌዎች በማጠራቀሚያው ውስጥ ይገኛሉ .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerማስታወሻዎች
- በቅንጅቱ ስብሰባ ውስጥ, በአብዛኛው በታዋቂው ምስል ላይ ተመስርቻለሁ - እሱን ማረጋገጥዎን እርግጠኛ ይሁኑ። ምናልባት በህይወትዎ ውስጥ ሌላ ምንም ነገር አያስፈልጎትም.
- ሁሉም የአየር ፍሰት ቅንጅቶች የሚገኙት በ በኩል ብቻ አይደለም።
airflow.cfg, ነገር ግን በአካባቢ ተለዋዋጮች (ለገንቢዎች ምስጋና ይግባው) እኔ በተንኮል ተጠቅሜያለሁ. - በተፈጥሮ ፣ ለምርት ዝግጁ አይደለም-ሆን ብዬ የልብ ምቶች በእቃ መያዣዎች ላይ አላስቀመጥኩም ፣ ለደህንነት አላስቸገረኝም። ነገር ግን ለሞካሪዎቻችን የሚስማማውን አነስተኛውን አደረግሁ።
- አስታውስ አትርሳ:
- የዳግ አቃፊው ለሁለቱም መርሐግብር አውጪው እና ለሠራተኞቹ ተደራሽ መሆን አለበት።
- በሁሉም የሶስተኛ ወገን ቤተ-መጻሕፍት ላይም ተመሳሳይ ነው - ሁሉም በጊዜ መርሐግብር እና በሠራተኞች ማሽኖች ላይ መጫን አለባቸው።
ደህና፣ አሁን ቀላል ነው፡-
$ docker-compose up --scale worker=3ሁሉም ነገር ከተነሳ በኋላ የድር በይነገጽን ማየት ይችላሉ-
- የአየር እንቅስቃሴ:
- አበባ፡
መሰረታዊ ጽንሰ-ሀሳቦች
በእነዚህ ሁሉ “ዳግስ” ውስጥ ምንም ነገር ካልተረዳህ፣ አጭር መዝገበ ቃላት እዚህ አለ፡-
- መርሐግብር - በአየር ፍሰት ውስጥ በጣም አስፈላጊው አጎት ፣ ሮቦቶች ጠንክረው እንደሚሠሩ ፣ እና ሰው አለመሆኑን በመቆጣጠር: የጊዜ ሰሌዳውን ይቆጣጠራል ፣ ዝማኔዎችን ያሻሽላል ፣ ተግባራትን ይጀምራል።
በአጠቃላይ ፣ በአሮጌ ስሪቶች ፣ እሱ የማስታወስ ችግር ነበረበት (አይ ፣ የመርሳት ችግር አይደለም ፣ ግን መፍሰስ) እና የቅርስ መለኪያው በውቅሮች ውስጥ እንኳን ቆይቷል።
run_duration- እንደገና የሚጀምርበት ጊዜ። አሁን ግን ሁሉም ነገር ደህና ነው። - DAG (በሚለው "ዳግ") - "ቀጥተኛ አሲክሊክ ግራፍ", ነገር ግን እንዲህ ዓይነቱ ፍቺ ለጥቂት ሰዎች ይነግራል, ነገር ግን በእውነቱ እርስ በርስ መስተጋብር የሚፈጥሩ ተግባራት መያዣ ነው (ከዚህ በታች ይመልከቱ) ወይም በ SSIS ውስጥ ያለው የጥቅል አናሎግ እና የስራ ፍሰት Informatica .
ከዳግስ በተጨማሪ፣ አሁንም ንዑስ ዳጎች ሊኖሩ ይችላሉ፣ ነገር ግን እኛ በአብዛኛው አንደርስባቸውም።
- DAG ሩጫ - የመነሻ ዳግ, እሱም የራሱ የተመደበ
execution_date. ተመሳሳዩ ዳግራንስ በትይዩ ሊሰሩ ይችላሉ (ተግባሮችዎን ጠንካራ ካደረጉት)። - ስልከኛ አንድ የተወሰነ ተግባር ለማከናወን ኃላፊነት ያላቸው የኮድ ቁርጥራጮች ናቸው። ሶስት አይነት ኦፕሬተሮች አሉ፡-
- እርምጃእንደ የእኛ ተወዳጅ
PythonOperatorማንኛውንም (የሚሰራ) የፓይዘን ኮድ ሊፈጽም የሚችል; - ዝውውርመረጃን ከቦታ ወደ ቦታ የሚያጓጉዝ፣
MsSqlToHiveTransfer; - ዳሳሽ በሌላ በኩል, አንድ ክስተት እስኪከሰት ድረስ ምላሽ እንዲሰጡ ወይም ተጨማሪውን የዳግም አፈፃፀም እንዲቀንሱ ያስችልዎታል.
HttpSensorየተገለጸውን የመጨረሻ ነጥብ መሳብ ይችላል, እና የሚፈለገው ምላሽ ሲጠብቅ, ዝውውሩን ይጀምሩGoogleCloudStorageToS3Operator. ጠያቂ አእምሮ፡- “ለምን? ደግሞም በኦፕሬተሩ ውስጥ ድግግሞሾችን ማድረግ ይችላሉ! እና ከዚያ ፣ ከተንጠለጠሉ ኦፕሬተሮች ጋር የተግባር ገንዳውን ላለመዝጋት። ዳሳሹ ከሚቀጥለው ሙከራ በፊት ይጀምራል፣ ይፈትሻል እና ይሞታል።
- እርምጃእንደ የእኛ ተወዳጅ
- ተግባር - የታወጁ ኦፕሬተሮች ምንም ዓይነት ዓይነት ሳይሆኑ እና ከዳግ ጋር ተያይዘው ወደ ተግባር ደረጃ ከፍ ብለዋል ።
- የተግባር ምሳሌ - አጠቃላይ እቅድ አውጪው በአፈፃፀም-ሠራተኞች ላይ ተግባራትን ወደ ጦርነት ለመላክ ጊዜው እንደሆነ ሲወስን (በቦታው ላይ ፣ ከተጠቀምንበት)
LocalExecutorወይም በጉዳዩ ላይ ወደ ሩቅ መስቀለኛ መንገድCeleryExecutor) ለእነሱ አውድ ይመድባል (ማለትም የተለዋዋጮች ስብስብ - የአፈፃፀም መለኪያዎች) ፣ የትዕዛዝ ወይም የጥያቄ አብነቶችን ያሰፋል እና ያጠራቅማቸዋል።
ስራዎችን እንፈጥራለን
በመጀመሪያ፣ የዶሻችንን አጠቃላይ እቅድ እንዘርዝር፣ ከዚያም ወደ ዝርዝሮቹ የበለጠ እናበዛለን፣ ምክንያቱም አንዳንድ ቀላል ያልሆኑ መፍትሄዎችን እንተገብራለን።
ስለዚህ ፣ በጣም ቀላል በሆነ መልኩ ፣ እንዲህ ዓይነቱ ዳጋ እንደዚህ ይመስላል
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)እስቲ እንመልከት
- በመጀመሪያ, አስፈላጊ የሆኑትን ሊቢስ እናስመጣለን ሌላ ነገር;
sql_server_dsነውList[namedtuple[str, str]]ከአየር ፍሰት ግንኙነቶች ግንኙነቶች ስሞች እና ሳህናችንን ከምንወስድባቸው የውሂብ ጎታዎች ጋር;dag- የግድ ውስጥ መሆን ያለበት የኛ ዳግ ማስታወቂያglobals()አለበለዚያ የአየር ፍሰት አያገኘውም። ዳግ እንዲህ ማለት አለበት፡-- ስሙ ማን ነው
orders- ይህ ስም በድር በይነገጽ ውስጥ ይታያል ፣ - ከሐምሌ ስምንት ቀን እኩለ ሌሊት ጀምሮ እንደሚሠራ ፣
- እና በየ 6 ሰዓቱ መሮጥ አለበት (በዚህ ፈንታ ለጠንካራ ሰዎች)
timedelta()ተቀባይነት ያለውcron- መስመር0 0 0/6 ? * * *, ለትንሽ አሪፍ - እንደ አገላለጽ@daily);
- ስሙ ማን ነው
workflow()ዋናውን ስራ ይሰራል, አሁን ግን አይደለም. ለአሁን፣ አውዳችንን ወደ ምዝግብ ማስታወሻው ውስጥ እናስገባዋለን።- እና አሁን ተግባሮችን የመፍጠር ቀላል አስማት-
- በምንጮቻችን እንሮጣለን;
- ማስጀመር
PythonOperator, ይህም የእኛን dummy ያስፈጽማልworkflow(). የተግባሩን ልዩ (በዳግ ውስጥ) ስም መጥቀስ እና ድራሹን እራሱ ማሰር አይርሱ። ባንዲራprovide_contextበምላሹ, ተጨማሪ ክርክሮችን ወደ ተግባር ያፈሳሉ, በጥንቃቄ ተጠቅመን እንሰበስባለን**context.
ለአሁን፣ ያ ብቻ ነው። ያገኘነው፡-
- በድር በይነገጽ ውስጥ አዲስ ዳግ ፣
- በትይዩ የሚከናወኑ አንድ መቶ ተኩል ተግባራት (የአየር ፍሰት ፣ የሴልሪ ቅንጅቶች እና የአገልጋይ አቅም ከፈቀደ)።
ደህና ፣ ገባኝ ማለት ይቻላል።

ጥገኞቹን ማን ይጭነዋል?
ይህን ሁሉ ነገር ለማቃለል ውስጤ ገባሁ docker-compose.yml ማቀነባበር requirements.txt በሁሉም አንጓዎች ላይ.
አሁን ጠፍቷል፡-

ግራጫ ካሬዎች በጊዜ ሰሌዳው የሚከናወኑ የተግባር ምሳሌዎች ናቸው።
ትንሽ እንጠብቃለን፣ ተግባሮቹ በሰራተኞች ተወስደዋል፡-

አረንጓዴዎቹ እርግጥ ነው, ሥራቸውን በተሳካ ሁኔታ አጠናቀዋል. ቀይ ቀለም በጣም ስኬታማ አይደለም.
በነገራችን ላይ በምርታችን ላይ ምንም አቃፊ የለም።
./dags, በማሽኖች መካከል ምንም ማመሳሰል የለም - ሁሉም ዳጎች ተኝተዋልgitበእኛ Gitlab ላይ፣ እና Gitlab CI ሲዋሃዱ ዝማኔዎችን ወደ ማሽኖች ያሰራጫል።master.
ስለ አበባ ትንሽ
ሰራተኞቹ የእኛን ፓክፋፋየር እየደቆሱ ሳለ፣ አንድ ነገር ሊያሳየን የሚችል ሌላ መሳሪያ እናስታውስ - አበባ።
በሠራተኛ አንጓዎች ላይ ማጠቃለያ መረጃ ያለው የመጀመሪያው ገጽ፡-

ወደ ሥራ ከሄዱ ተግባራት ጋር በጣም ኃይለኛው ገጽ፡-

ከደላላችን ሁኔታ ጋር በጣም አሰልቺ የሆነው ገጽ፡-

በጣም ብሩህ ገጽ የተግባር ሁኔታ ግራፎች እና የአፈፃፀም ጊዜያቸው ነው፡-

የተጫነውን እንጭነዋለን
ስለዚህ, ሁሉም ተግባራት ተከናውነዋል, የቆሰሉትን መውሰድ ይችላሉ.

እና ብዙ ቆስለዋል - በአንድ ወይም በሌላ ምክንያት። የአየር ፍሰት ትክክለኛ አጠቃቀምን በተመለከተ እነዚህ ካሬዎች መረጃው በእርግጠኝነት እንዳልደረሰ ያመለክታሉ።
ምዝግብ ማስታወሻውን መመልከት እና የወደቁትን የተግባር ምሳሌዎችን እንደገና ማስጀመር ያስፈልግዎታል።
በማንኛውም ካሬ ላይ ጠቅ በማድረግ ለእኛ የሚገኙትን ድርጊቶች እናያለን፡-

ወስደህ የወደቀውን አጽዳ ማድረግ ትችላለህ። ያም ማለት አንድ ነገር እዚያ እንዳልተሳካ እንረሳዋለን, እና ተመሳሳይ ምሳሌ ተግባር ወደ መርሐግብር አውጪው ይሄዳል.

በሁሉም ቀይ ካሬዎች በመዳፊት ይህን ማድረግ በጣም ሰብአዊ እንዳልሆነ ግልጽ ነው - ይህ ከአየር ፍሰት የምንጠብቀው አይደለም. በተፈጥሮ፣ የጅምላ ጨራሽ መሣሪያዎች አሉን፡- Browse/Task Instances

ሁሉንም ነገር በአንድ ጊዜ እንመርጥና ወደ ዜሮ እንደገና እናስጀምር፣ ትክክለኛውን ንጥል ጠቅ ያድርጉ፡-

ካጸዱ በኋላ ታክሲዎቻችን ይህን ይመስላል (የጊዜ መርሐግብር አስማሚውን አስቀድመው እየጠበቁ ናቸው)

ግንኙነቶች, መንጠቆዎች እና ሌሎች ተለዋዋጮች
የሚቀጥለውን DAG ለመመልከት ጊዜው አሁን ነው, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]ሁሉም ሰው የሪፖርት ማሻሻያ አድርጓል? ይህ እንደገና እሷ ናት: መረጃውን ከየት ማግኘት እንደሚችሉ ምንጮች ዝርዝር አለ; የሚቀመጥበት ዝርዝር አለ; ሁሉም ነገር ሲከሰት ወይም ሲሰበር ማሰማትዎን አይርሱ (ደህና ፣ ይህ ስለ እኛ አይደለም ፣ አይሆንም)።
እንደገና ወደ ፋይሉ እንሂድ እና አዲስ ግልጽ ያልሆኑ ነገሮችን እንይ፡-
from commons.operators import TelegramBotSendMessage- ወደ ያልተከለከሉ መልዕክቶች ለመላክ ትንሽ መጠቅለያ በመስራት የራሳችንን ኦፕሬተሮች እንዳንሠራ የሚከለክለን ምንም ነገር የለም። (ስለዚህ ኦፕሬተር ከዚህ በታች የበለጠ እንነጋገራለን);default_args={}- ዳግ ተመሳሳይ ክርክሮችን ለሁሉም ኦፕሬተሮች ማሰራጨት ይችላል ፣to='{{ var.value.all_the_kings_men }}'- መስክtoእኛ ሃርድ ኮድ አይኖረንም፣ ነገር ግን በተለዋዋጭ የመነጨ ጂንጃ እና የኢሜይሎች ዝርዝር ያለው ተለዋዋጭ ነው፣ ይህም በጥንቃቄ ያስገባሁትAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- ኦፕሬተሩን ለመጀመር ሁኔታ. በእኛ ሁኔታ, ደብዳቤው ወደ አለቆቹ የሚበርው ሁሉም ጥገኞች ከሰሩ ብቻ ነው በተሳካ ሁኔታ;tg_bot_conn_id='tg_main'- ክርክሮችconn_idየምንፈጥራቸውን የግንኙነት መታወቂያዎችን ተቀበልAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- በቴሌግራም ውስጥ ያሉ መልእክቶች የሚበሩት የወደቁ ተግባራት ካሉ ብቻ ነው ።task_concurrency=1- የአንድ ተግባር በርካታ የተግባር ምሳሌዎችን በአንድ ጊዜ ማስጀመርን እንከለክላለን። አለበለዚያ የብዙዎችን በአንድ ጊዜ ማስጀመር እናገኛለንVerticaOperator(በአንድ ጠረጴዛ ላይ መመልከት);report_update >> [email, tg]- ሁሉምVerticaOperatorደብዳቤዎችን እና መልዕክቶችን ለመላክ ይተባበሩ ፣

ነገር ግን አሳዋቂ ኦፕሬተሮች የተለያዩ የማስጀመሪያ ሁኔታዎች ስላሏቸው አንድ ብቻ ነው የሚሰራው። በዛፍ እይታ ፣ ሁሉም ነገር በትንሹ ምስላዊ ይመስላል

ስለ ጥቂት ቃላት እናገራለሁ ማክሮዎች እና ጓደኞቻቸው - ተለዋዋጮች.
ማክሮዎች የተለያዩ ጠቃሚ መረጃዎችን ወደ ኦፕሬተር ክርክሮች የሚተኩ የጂንጃ ቦታ ያዢዎች ናቸው። ለምሳሌ፣ እንደዚህ፡-
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} ወደ የአውድ ተለዋዋጭ ይዘቶች ይሰፋል execution_date በቅጹ ላይ YYYY-MM-DD: 2020-07-14. በጣም ጥሩው ክፍል የአውድ ተለዋዋጮች በአንድ የተወሰነ ተግባር ምሳሌ ላይ ተቸንክረዋል (በዛፍ እይታ ውስጥ ያለ ካሬ) እና እንደገና ሲጀመር ቦታ ያዢዎቹ ወደ ተመሳሳይ እሴቶች ይሰፋሉ።
የተመደቡት እሴቶች በእያንዳንዱ የተግባር ምሳሌ ላይ የተሰራውን ቁልፍ በመጠቀም ሊታዩ ይችላሉ። ደብዳቤ የመላክ ተግባር እንደዚህ ነው፡-

እና ስለዚህ መልእክት በመላክ ተግባር ላይ-

ለቅርብ ጊዜው ስሪት አብሮ የተሰሩ ማክሮዎች ሙሉ ዝርዝር እዚህ ይገኛል።
ከዚህም በላይ በተሰኪዎች እገዛ የራሳችንን ማክሮዎች ማወጅ እንችላለን፣ ግን ያ ሌላ ታሪክ ነው።
አስቀድሞ ከተገለጹት ነገሮች በተጨማሪ የእኛን ተለዋዋጮች እሴቶችን መተካት እንችላለን (ይህን ከዚህ በላይ ባለው ኮድ ውስጥ አስቀድሜ ተጠቀምኩበት)። ውስጥ እንፍጠር Admin/Variables ሁለት ነገሮች፡-

መጠቀም የሚችሉት ሁሉም ነገር:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')እሴቱ scalar ወይም JSON ሊሆን ይችላል። በJSON ጉዳይ፡-
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}ወደሚፈለገው ቁልፍ የሚወስደውን መንገድ ብቻ ይጠቀሙ፡- {{ var.json.bot_config.bot.token }}.
በጥሬው አንድ ቃል እናገራለሁ እና ስለ አንድ ቅጽበታዊ ገጽ እይታ አሳይሻለሁ። ግንኙነቶች. ሁሉም ነገር እዚህ የመጀመሪያ ደረጃ ነው: በገጹ ላይ Admin/Connections ግንኙነት እንፈጥራለን ፣ የመግቢያ / የይለፍ ቃሎቻችንን እና የበለጠ የተወሰኑ መለኪያዎችን እዚያ እንጨምራለን ። ልክ እንደዚህ:

የይለፍ ቃሎች መመስጠር ይችላሉ (ከነባሪው በበለጠ በደንብ) ፣ ወይም የግንኙነት አይነትን መተው ይችላሉ (እኔ እንዳደረግኩት) tg_main) - እውነታው ግን የዓይነቶቹ ዝርዝር በኤር ፍሰት ሞዴሎች ውስጥ የተጠናከረ እና ወደ ምንጭ ኮዶች ውስጥ ሳይገባ ሊሰፋ አይችልም (በድንገት የሆነ ነገር ጎግል ካላደረግኩ እባክዎን ያርሙኝ) ግን ክሬዲቶችን እንዳናገኝ የሚያግደን ምንም ነገር የለም ። ስም.
በተመሳሳይ ስም ብዙ ግንኙነቶችን ማድረግ ይችላሉ-በዚህ ሁኔታ, ዘዴው BaseHook.get_connection()በስም የሚያገናኘን, ይሰጠናል በዘፈቀደ ከበርካታ ስሞች (ሮውንድ ሮቢንን መስራት የበለጠ ምክንያታዊ ይሆናል, ነገር ግን በአየር ፍሰት ገንቢዎች ህሊና ላይ እንተወው).
ተለዋዋጮች እና ግንኙነቶች በእርግጠኝነት ጥሩ መሳሪያዎች ናቸው ፣ ግን ሚዛኑን ላለማጣት አስፈላጊ ነው-የትኞቹን የፍሰቶችዎን ክፍሎች በኮዱ ውስጥ ያከማቻሉ ፣ እና የትኞቹ ክፍሎች ለአየር ፍሰት ለማከማቻ እንደሚሰጡ። በአንድ በኩል እሴቱን በፍጥነት ለመለወጥ ምቹ ሊሆን ይችላል, ለምሳሌ, የፖስታ ሳጥን, በ UI. በሌላ በኩል, ይህ አሁንም ወደ የመዳፊት ጠቅታ መመለስ ነው, እኛ (እኔ) ልናስወግደው ፈለግን.
ከግንኙነቶች ጋር መስራት አንዱ ተግባር ነው። መንጠቆዎች. በአጠቃላይ የአየር ፍሰት መንጠቆዎች ከሶስተኛ ወገን አገልግሎቶች እና ቤተ-መጻሕፍት ጋር ለማገናኘት ነጥቦች ናቸው። ለምሳሌ፣ JiraHook ከጂራ ጋር እንድንገናኝ ደንበኛን ይከፍታል (ተግባራትን ወደ ኋላ እና ወደ ፊት ማንቀሳቀስ ይችላሉ) እና በ እገዛ SambaHook የአካባቢ ፋይልን ወደ ላይ መጫን ይችላሉ። smb- ነጥብ.
ብጁ ኦፕሬተርን መተንተን
እና እንዴት እንደተሰራ ለማየት ተቃርበናል። TelegramBotSendMessage
ኮድ commons/operators.py ከእውነተኛው ኦፕሬተር ጋር;
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)እዚህ፣ ልክ በአየር ፍሰት ውስጥ እንዳለ ሁሉም ነገር፣ ሁሉም ነገር በጣም ቀላል ነው።
- የተወረሰው
BaseOperatorጥቂት የአየር ፍሰት-ተኮር ነገሮችን የሚተገበር (የእረፍት ጊዜዎን ይመልከቱ) - የታወጁ መስኮች
template_fieldsጂንጃ ለማቀነባበር ማክሮዎችን የሚፈልግበት። - ትክክለኛ ክርክሮችን አዘጋጅቷል።
__init__(), አስፈላጊ ሆኖ ሲገኝ ነባሪዎችን ያዘጋጁ. - ስለ ቅድመ አያት አጀማመርም አልረሳንም።
- ተጓዳኝ መንጠቆውን ከፍቷል።
TelegramBotHookከእሱ የደንበኛ ነገር ተቀብሏል. - የተሻረ (የተገለጸ) ዘዴ
BaseOperator.execute()ኦፕሬተሩን ለመጀመር ጊዜው ሲደርስ የትኛው ኤርፎው ይንቀጠቀጣል - በእሱ ውስጥ መግባትን በመርሳት ዋናውን ተግባር እንተገብራለን. (በነገራችን ላይ በትክክል ገብተናልstdoutиstderr- የአየር ፍሰት ሁሉንም ነገር ያጠፋል ፣ በሚያምር ሁኔታ ይጠቀለላል ፣ አስፈላጊ በሚሆንበት ጊዜ ያበላሸዋል።)
ያለንን እንይ commons/hooks.py. የፋይሉ የመጀመሪያ ክፍል፣ መንጠቆው ራሱ፡-
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientእዚህ ምን ማብራራት እንዳለብኝ እንኳን አላውቅም፣ አስፈላጊዎቹን ነጥቦች ብቻ አስተውያለሁ፡-
- እኛ እንወርሳለን ፣ ስለ ክርክሮቹ አስቡ - በአብዛኛዎቹ ጉዳዮች አንድ ይሆናል
conn_id; - መደበኛ ዘዴዎችን ማለፍ፡ እራሴን ገድቤአለሁ።
get_conn(), በዚህ ውስጥ የግንኙነት መለኪያዎችን በስም አገኛለሁ እና ክፍሉን ብቻ አገኛለሁextra(ይህ የJSON መስክ ነው)፣ እኔ (በራሴ መመሪያ መሰረት!) የቴሌግራም ቦት ማስመሰያ አስቀመጥኩ፡{"bot_token": "YOuRAwEsomeBOtToKen"}. - የእኛን ምሳሌ እፈጥራለሁ
TelegramBot, የተወሰነ ምልክት በመስጠት.
ይኼው ነው. በመጠቀም ደንበኛን ከመንጠቆ ማግኘት ይችላሉ። TelegramBotHook().clent ወይም TelegramBotHook().get_conn().
እና የፋይሉ ሁለተኛ ክፍል ፣ ለቴሌግራም REST ኤፒአይ የማይክሮ መጠቅለያ የምሰራበት ፣ ተመሳሳይ ላለመጎተት ለአንድ ዘዴ sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))ትክክለኛው መንገድ ሁሉንም ማከል ነው-
TelegramBotSendMessage,TelegramBotHook,TelegramBot- በተሰኪው ውስጥ ፣ የህዝብ ማከማቻ ውስጥ ያስገቡ እና ለክፍት ምንጭ ይስጡት።
ይህን ሁሉ እያጠናን ሳለ የኛ ዘገባ ማሻሻያ በተሳካ ሁኔታ ሳይሳካ ቀርቷል እና በሰርጡ ላይ የስህተት መልእክት ላኩኝ። ስህተት መሆኑን ለማየት እሞክራለሁ...

በእኛ ዶጅ ውስጥ የሆነ ነገር ተሰበረ! ስንጠብቀው የነበረው ነገር አልነበረም? በትክክል!
ልትፈስ ነው?
የሆነ ነገር እንደናፈቀኝ ይሰማዎታል? ከSQL ሰርቨር ወደ ቬርቲካ ለማዛወር ቃል የገባ ይመስላል ከዛም ወስዶ ከርዕሱ ወጣ ወራዳ!
ይህ አረመኔያዊ ድርጊት ሆን ተብሎ የተፈፀመ ነበር፣ በቀላሉ አንዳንድ የቃላት አገባቦችን መፍታት ነበረብኝ። አሁን የበለጠ መሄድ ይችላሉ.
እቅዳችን ይህ ነበር፡-
- ዳግ አድርግ
- ተግባራትን መፍጠር
- ሁሉም ነገር እንዴት ቆንጆ እንደሆነ ይመልከቱ
- ለመሙላት የክፍለ-ጊዜ ቁጥሮችን ይመድቡ
- ከSQL አገልጋይ ውሂብ ያግኙ
- ውሂብን ወደ Vertica ያስገቡ
- ስታቲስቲክስን ይሰብስቡ
ስለዚህ፣ ይህንን ሁሉ ለማስኬድ፣ በእኛ ላይ ትንሽ ጭማሪ አድርጌያለሁ docker-compose.yml:
ዶከር-አቀናብር.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyእዚያ እናነሳለን-
- Vertica እንደ አስተናጋጅ
dwhበጣም በነባሪ ቅንጅቶች ፣ - ሶስት አጋጣሚዎች የ SQL አገልጋይ ፣
- በኋለኛው ውስጥ የውሂብ ጎታዎችን በአንዳንድ መረጃዎች እንሞላለን (በምንም ሁኔታ አይመልከቱ
mssql_init.py!)
ከባለፈው ጊዜ በበለጠ ትንሽ በተወሳሰበ ትእዛዝ በመታገዝ ሁሉንም መልካም ነገሮች እናስጀምራለን።
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3የእኛ ተአምር ራዶሚዘር የፈጠረው ፣ ንጥሉን መጠቀም ይችላሉ። Data Profiling/Ad Hoc Query:

ዋናው ነገር ለተንታኞች ማሳየት አይደለም
ላይ አብራራ የኢቲኤል ክፍለ ጊዜዎች አልፈልግም ፣ እዚያ ሁሉም ነገር ቀላል ነው-መሠረቱን እንሰራለን ፣ በእሱ ውስጥ ምልክት አለ ፣ ሁሉንም ነገር ከአውድ አስተዳዳሪ ጋር እናጠቃልላለን ፣ እና አሁን ይህንን እናደርጋለን
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15ክፍለ ጊዜ.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passጊዜው ደርሷል የእኛን ውሂብ ሰብስብ ከአንድ መቶ ተኩል ጠረጴዛዎቻችን. በጣም ትርጓሜ በሌላቸው መስመሮች እርዳታ ይህንን እናድርገው-
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- በመንጠቆ እርዳታ ከአየር ፍሰት እናገኛለን
pymssql- መገናኘት - በጥያቄው ውስጥ በቀን መልክ እገዳን እንተካ - በአብነት ሞተሩ ወደ ተግባሩ ይጣላል።
- ጥያቄያችንን መመገብ
pandasማን ያግዘናልDataFrame- ለወደፊቱ ይጠቅመናል.
ምትክ እየተጠቀምኩ ነው።
{dt}ከጥያቄ መለኪያ ይልቅ%sእኔ ክፉ ፒኖቺዮ ስለሆንኩ ሳይሆን ስለpandasመቋቋም አይችልምpymssqlእና የመጨረሻውን ያንሸራትታልparams: Listእሱ በእውነት ቢፈልግም።tuple.
እንዲሁም ገንቢውን ልብ ይበሉpymssqlእሱን ላለመደገፍ ወሰነ እና ለመልቀቅ ጊዜው አሁን ነው።pyodbc.
የአየር ፍሰት የተግባራችንን ክርክሮች በምን እንደሞላው እንመልከት፡-

ምንም ውሂብ ከሌለ, ከዚያ ለመቀጠል ምንም ፋይዳ የለውም. ነገር ግን መሙላትን በተሳካ ሁኔታ መቁጠር እንግዳ ነገር ነው. ግን ይህ ስህተት አይደለም. አ-አህ-አህ፣ ምን ይደረግ?! እና ምን እንደሆነ እነሆ፡-
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException ምንም ስህተቶች እንደሌሉ ለአየር ፍሰት ይነግረናል ፣ ግን ተግባሩን እንዘለዋለን። በይነገጹ አረንጓዴ ወይም ቀይ ካሬ አይኖረውም, ግን ሮዝ.
ውሂባችንን እንወረውር በርካታ ዓምዶች:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])የሚታወቀው-
- ትእዛዙን የወሰድንበት ዳታቤዝ፣
- የጎርፍ ክፍለ ጊዜያችን መታወቂያ (የተለየ ይሆናል። ለእያንዳንዱ ተግባር),
- ከምንጩ እና የትዕዛዝ መታወቂያ ሀሽ - በመጨረሻው የውሂብ ጎታ (ሁሉም ነገር በአንድ ጠረጴዛ ውስጥ የሚፈስበት) ልዩ የትዕዛዝ መታወቂያ አለን ።
የመጨረሻው ደረጃ ይቀራል: ሁሉንም ነገር ወደ ቬርቲካ ያፈስሱ. እና፣ በሚገርም ሁኔታ፣ ይህን ለማድረግ በጣም አስደናቂ እና ቀልጣፋ ከሆኑ መንገዶች አንዱ በCSV በኩል ነው!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- ልዩ ተቀባይ እየሰራን ነው።
StringIO. pandasበደግነት የእኛን ያስቀምጣል።DataFrameበCSV- መስመሮች.- ከተወዳጅ ቬርቲካ ጋር ግንኙነትን በመንጠቆ እንክፈት።
- እና አሁን በእርዳታ
copy()ውሂባችንን በቀጥታ ወደ ቨርቲካ ይላኩ!
ምን ያህል መስመሮች እንደተሞሉ ከሾፌሩ እንወስዳለን እና ለክፍለ-ጊዜው አስተዳዳሪ ሁሉም ነገር ደህና እንደሆነ እንነግራለን።
session.loaded_rows = cursor.rowcount
session.successful = Trueይኼው ነው.
በሽያጭ ላይ, የታለመውን ሰሃን በእጅ እንፈጥራለን. እዚህ ለራሴ ትንሽ ማሽን ፈቀድኩኝ፡-
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)እየተጠቀምኩ ነው።
VerticaOperator()የውሂብ ጎታ ንድፍ እና ሠንጠረዥ እፈጥራለሁ (በእርግጥ እነሱ ከሌሉ). ዋናው ነገር ጥገኛዎቹን በትክክል ማዘጋጀት ነው-
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadማጠቃለል
- ደህና, - ትንሹ አይጥ አለ, - አይደለም, አሁን
እኔ በጫካ ውስጥ በጣም አስፈሪ እንስሳ እንደሆንኩ እርግጠኛ ነዎት?
ጁሊያ ዶናልድሰን ፣ ግሩፋሎ
እኔ እንደማስበው እኔና ባልደረቦቼ ውድድር ቢያጋጥመን፡ ማን በፍጥነት የኢቲኤልን ሂደት ከባዶ የሚፈጥር እና የሚጀምር፡ እነሱ በSSIS እና በመዳፊት እና እኔ ከአየር ፍሰት ጋር...እናም የጥገናን ቀላልነት እናነጻጽራለን ዋው፣ በሁሉም አቅጣጫ እንደምደበድባቸው የምትስማሙ ይመስለኛል!
ትንሽ የበለጠ በቁም ነገር ከሆነ, ከዚያም Apache Airflow - ሂደቶችን በፕሮግራም ኮድ መልክ በመግለጽ - ሥራዬን ሠራሁ. ብዙ የበለጠ ምቹ እና አስደሳች።
በውስጡ ያልተገደበ extensibility, ሁለቱም ተሰኪዎች እና scalability ያለውን ዝንባሌ አንፃር, በማንኛውም አካባቢ ማለት ይቻላል የአየር ፍሰት ለመጠቀም እድል ይሰጥዎታል: እንኳን, የመሰብሰብ, የማዘጋጀት እና ውሂብ ሂደት ሙሉ ዑደት ውስጥ, ሮኬቶችን (ወደ ማርስ, የ ኮርስ)።
የመጨረሻው ክፍል, ማጣቀሻ እና መረጃ
ለአንተ የሰበሰብንልህን መሰቅሰቂያ
start_date. አዎ፣ ይህ አስቀድሞ የአካባቢ ትውስታ ነው። በዶግ ዋና ክርክርstart_dateሁሉም ያልፋል። በአጭሩ፣ ከገለጹstart_dateየአሁኑ ቀን, እናschedule_interval- አንድ ቀን, ከዚያም DAG ነገ ምንም ቀደም ብሎ ይጀምራል.start_date = datetime(2020, 7, 7, 0, 1, 2)እና ምንም ተጨማሪ ችግሮች የሉም.
ከእሱ ጋር የተያያዘ ሌላ የአሂድ ጊዜ ስህተት አለ፡-
Task is missing the start_date parameter, ይህም ብዙውን ጊዜ ከዳግ ኦፕሬተር ጋር ማያያዝን እንደረሱ ያመለክታል.- ሁሉም በአንድ ማሽን ላይ. አዎ ፣ እና መሰረቶች (የአየር ፍሰት እራሱ እና የእኛ ሽፋን) ፣ እና የድር አገልጋይ ፣ እና የጊዜ ሰሌዳ አውጪ ፣ እና ሰራተኞች። እና እንዲያውም ሰርቷል. ነገር ግን ከጊዜ በኋላ የአገልግሎቶች ብዛት እየጨመረ እና PostgreSQL ከ 20 ms ይልቅ በ 5 ሰከንድ ውስጥ ለጠቋሚው ምላሽ መስጠት ሲጀምር, ወስደን ወሰድነው.
- የአካባቢ አስፈፃሚ. አዎን ገና በላዩ ላይ ተቀምጠናል, እና ቀድሞውኑ ወደ ገደል ጫፍ ደርሰናል. የአካባቢ አስፈፃሚ እስካሁን በቂ ሆኖልናል፣ አሁን ግን ቢያንስ ከአንድ ሰራተኛ ጋር የምንሰፋበት ጊዜ ነው፣ እና ወደ CeleryExecutor ለመሄድ ጠንክረን መስራት አለብን። እና በአንድ ማሽን ላይ ከእሱ ጋር አብሮ መስራት ከመቻልዎ አንጻር ሴሊሪን በአገልጋዩ ላይ እንኳን ከመጠቀም የሚያግድዎት ነገር የለም ፣ ይህም “በእርግጥ ፣ በእውነቱ ፣ በጭራሽ ወደ ምርት አይሄድም!”
- ያለመጠቀም አብሮገነብ መሳሪያዎች:
- ግንኙነቶች የአገልግሎት ምስክርነቶችን ለማከማቸት ፣
- SLA ናፈቀ በሰዓቱ ላልተሠሩ ሥራዎች ምላሽ መስጠት ፣
- xcom ለሜታዳታ ልውውጥ (አልኩት ሜታውሂብ!) በዳግ ተግባራት መካከል።
- የደብዳቤ አላግባብ መጠቀም። ደህና, ምን ማለት እችላለሁ? ለሁሉም የወደቁ ተግባራት ድግግሞሾች ማንቂያዎች ተዘጋጅተዋል። አሁን የእኔ ስራ Gmail>90k ኢሜይሎች ከአየር ፍሰት አለው፣ እና የድር ሜይል አፈሙዝ በአንድ ጊዜ ከ100 በላይ ለማንሳት እና ለመሰረዝ ፈቃደኛ አይሆንም።
ተጨማሪ ወጥመዶች:
ተጨማሪ አውቶማቲክ መሳሪያዎች
በእጃችን ሳይሆን በጭንቅላታችን እንድንሰራ የአየር ፍሰት ይህንን አዘጋጅቶልናል፡-
- - እሱ አሁንም የሙከራ ደረጃ አለው, እሱም እንዳይሠራ አያግደውም. በእሱ አማካኝነት ስለ ዳግስ እና ተግባራት መረጃን ብቻ ሳይሆን ማቆምም / መጀመር, DAG Run ወይም ገንዳ መፍጠር ይችላሉ.
- - ብዙ መሳሪያዎች በትዕዛዝ መስመሩ በኩል በWebUI ለመጠቀም የማይመቹ ብቻ ሳይሆን በአጠቃላይ የማይገኙ ናቸው። ለምሳሌ:
backfillየተግባር ሁኔታዎችን እንደገና ለማስጀመር ያስፈልጋል።
ለምሳሌ፣ ተንታኞች መጥተው እንዲህ አሉ፡- “እና አንተ ጓደኛ፣ ከጃንዋሪ 1 እስከ 13 ባለው መረጃ ላይ ትርጉም የለሽ ነገር አለህ! አስተካክል፣ አስተካክል፣ አስተካክል፣ አስተካክል! እና እርስዎ እንደዚህ አይነት ሆብ ነዎት:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- የመሠረት አገልግሎት;
initdb,resetdb,upgradedb,checkdb. run, ይህም አንድ ምሳሌ ተግባርን እንዲያካሂዱ እና በሁሉም ጥገኝነቶች ላይ እንኳን ውጤት እንዲያመጡ ያስችልዎታል. በተጨማሪም, በ በኩል ማሄድ ይችላሉLocalExecutorየሴሊየም ክላስተር ቢኖርዎትም.- በጣም ተመሳሳይ ነገር ያደርጋል
test, ብቻ ደግሞ ቤዝ ውስጥ ምንም አይጽፍም. connectionsከቅርፊቱ ብዙ ግንኙነቶችን ለመፍጠር ያስችላል።
- - ለፕለጊኖች የታሰበ እና በትንሽ እጆች የማይዋዥቅ የሐርድኮር የግንኙነት መንገድ። ግን እንዳንሄድ ማን ከለከለን።
/home/airflow/dags፣ ሩጡipythonእና መዘባረቅ ጀምር? ለምሳሌ ሁሉንም ግንኙነቶች በሚከተለው ኮድ ወደ ውጭ መላክ ይችላሉ፡from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - ከአየር ፍሰት ሜታዳታቤዝ ጋር በመገናኘት ላይ። እንዲጽፍለት አልመክርም ነገር ግን ለተለያዩ ልዩ መለኪያዎች የተግባር ሁኔታዎችን ማግኘት ከማንኛውም ኤፒአይዎች የበለጠ ፈጣን እና ቀላል ሊሆን ይችላል።
እንበል ሁሉም ተግባሮቻችን ኃይለኛ አይደሉም ፣ ግን አንዳንድ ጊዜ ሊወድቁ ይችላሉ ፣ እና ይህ የተለመደ ነው። ነገር ግን ጥቂት እገዳዎች ቀድሞውኑ አጠራጣሪ ናቸው, እና መፈተሽ አስፈላጊ ይሆናል.
ተጠንቀቅ SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
ማጣቀሻዎች
እና በእርግጥ፣ ከ Google መውጣት የመጀመሪያዎቹ አስር አገናኞች የአየር ፍሰት አቃፊው ይዘቶች ከዕልባቶች ናቸው።
- - እርግጥ ነው, ከቢሮው መጀመር አለብን. ሰነዶች ፣ ግን መመሪያዎቹን ማን ያነባቸዋል?
- - ደህና, ቢያንስ ከፈጣሪዎች የተሰጡትን ምክሮች ያንብቡ.
- - በጣም ጅምር-በስዕሎች ውስጥ የተጠቃሚ በይነገጽ
- - መሰረታዊ ፅንሰ-ሀሳቦች በደንብ ተብራርተዋል, ከሆነ (በድንገት!) ከእኔ የሆነ ነገር አልገባህም.
- - የአየር ፍሰት ክላስተር ለማቋቋም አጭር መመሪያ።
- - ከሞላ ጎደል ተመሳሳይ ሳቢ ጽሑፍ, ምናልባት ተጨማሪ formalism, እና ጥቂት ምሳሌዎች በስተቀር.
- - ከሴሊየሪ ጋር በመተባበር ስለመሥራት.
- - ስለ ተግባራት አቅም ፣ ከቀን ይልቅ መታወቂያ መጫን ፣ መለወጥ ፣ የፋይል አወቃቀር እና ሌሎች አስደሳች ነገሮች።
- - በማለፍ ላይ ብቻ የጠቀስኩት የተግባሮች ጥገኝነት እና ቀስቃሽ ደንብ።
- - በጊዜ መርሐግብር ውስጥ አንዳንድ "እንደታሰበው ይሰራል" እንዴት ማሸነፍ እንደሚቻል, የጠፋውን ውሂብ መጫን እና ቅድሚያ መስጠት.
- - ጠቃሚ የSQL ጥያቄዎች ለአየር ፍሰት ሜታዳታ።
- - ብጁ ዳሳሽ ስለመፍጠር ጠቃሚ ክፍል አለ።
- - በAWS ለዳታ ሳይንስ መሠረተ ልማት ስለመገንባት አስደሳች አጭር ማስታወሻ።
- - የተለመዱ ስህተቶች (አንድ ሰው አሁንም መመሪያውን ሳያነብ ሲቀር).
- - ምንም እንኳን ግንኙነቶችን ብቻ መጠቀም የምትችል ቢሆንም ሰዎች የይለፍ ቃሎችን እንዴት እንደሚያከማቻሉ ፈገግ ይበሉ።
- - ስውር የDAG ማስተላለፍ፣ ዐውደ-ጽሑፍ ተግባራትን መጣል፣ እንደገና ስለ ጥገኞች እና እንዲሁም የተግባር ጅምርን ስለ መዝለል።
- - ስለ አጠቃቀሙ
default argumentsиparamsበአብነት, እንዲሁም ተለዋዋጮች እና ግንኙነቶች. - እቅድ አውጪው ለአየር ፍሰት 2.0 እንዴት እየተዘጋጀ እንዳለ ታሪክ።
- - የእኛን ዘለላ ወደ ውስጥ ስለማሰማራት ትንሽ ጊዜ ያለፈበት መጣጥፍ
docker-compose. - - አብነቶችን እና አውድ ማስተላለፍን በመጠቀም ተለዋዋጭ ተግባራት።
- - መደበኛ እና ብጁ ማሳወቂያዎች በፖስታ እና በ Slack።
- - የቅርንጫፍ ስራዎች, ማክሮዎች እና XCom.
እና በአንቀጹ ውስጥ ጥቅም ላይ የዋሉ አገናኞች-
- - በአብነት ውስጥ ጥቅም ላይ የሚውሉ ቦታዎች ያዢዎች።
- - ዳጎችን ሲፈጥሩ የተለመዱ ስህተቶች.
- -
docker-composeለሙከራ, ለማረም እና ለሌሎችም. - - የፓይዘን መጠቅለያ ለቴሌግራም REST API።
ምንጭ: hab.com




