வணக்கம், நான் டிமிட்ரி லோக்வினென்கோ - Vezet குழும நிறுவனங்களின் பகுப்பாய்வுத் துறையின் தரவுப் பொறியாளர்.
ETL செயல்முறைகளை உருவாக்குவதற்கான ஒரு அற்புதமான கருவியைப் பற்றி நான் உங்களுக்கு சொல்கிறேன் - Apache Airflow. ஆனால் காற்றோட்டம் மிகவும் பல்துறை மற்றும் பன்முகத்தன்மை கொண்டது, நீங்கள் தரவு ஓட்டங்களில் ஈடுபடாவிட்டாலும் அதை உன்னிப்பாகப் பார்க்க வேண்டும், ஆனால் அவ்வப்போது எந்த செயல்முறையையும் துவக்கி அவற்றின் செயல்பாட்டைக் கண்காணிக்க வேண்டும்.
ஆம், நான் சொல்லுவது மட்டுமல்லாமல், காண்பிப்பேன்: நிரலில் நிறைய குறியீடு, ஸ்கிரீன் ஷாட்கள் மற்றும் பரிந்துரைகள் உள்ளன.

ஏர்ஃப்ளோ / விக்கிமீடியா காமன்ஸ் என்ற வார்த்தையை கூகிள் செய்யும் போது நீங்கள் வழக்கமாகப் பார்ப்பது
உள்ளடக்க அட்டவணை
அறிமுகம்
அப்பாச்சி காற்றோட்டம் ஜாங்கோவைப் போன்றது:
- மலைப்பாம்பில் எழுதப்பட்டது
- ஒரு சிறந்த நிர்வாக குழு உள்ளது,
- காலவரையின்றி விரிவாக்கக்கூடியது
- மட்டுமே சிறந்தது, மேலும் இது முற்றிலும் மாறுபட்ட நோக்கங்களுக்காக உருவாக்கப்பட்டது, அதாவது (கேட் முன் எழுதப்பட்டதைப் போல):
- வரம்பற்ற இயந்திரங்களில் பணிகளை இயக்குதல் மற்றும் கண்காணித்தல் (பல செலரி / குபெர்னெட்ஸ் மற்றும் உங்கள் மனசாட்சி உங்களை அனுமதிக்கும்)
- பைதான் குறியீட்டை எழுதுவதற்கும் புரிந்துகொள்வதற்கும் மிகவும் எளிதாக இருந்து மாறும் பணிப்பாய்வு உருவாக்கத்துடன்
- மற்றும் ஆயத்த கூறுகள் மற்றும் வீட்டில் தயாரிக்கப்பட்ட செருகுநிரல்கள் (இது மிகவும் எளிமையானது) இரண்டையும் பயன்படுத்தி எந்த தரவுத்தளங்களையும் API களையும் ஒன்றோடொன்று இணைக்கும் திறன்.
நாங்கள் இப்படி Apache Airflow பயன்படுத்துகிறோம்:
- DWH மற்றும் ODS இல் (எங்களிடம் Vertica மற்றும் Clickhouse உள்ளது) பல்வேறு மூலங்களிலிருந்து (பல SQL சர்வர் மற்றும் PostgreSQL நிகழ்வுகள், பயன்பாட்டு அளவீடுகளுடன் கூடிய பல்வேறு APIகள், 1C கூட) தரவைச் சேகரிக்கிறோம்.
- எவ்வளவு முன்னேறியது
cron, இது ODS இல் தரவு ஒருங்கிணைப்பு செயல்முறைகளைத் தொடங்குகிறது, மேலும் அவற்றின் பராமரிப்பையும் கண்காணிக்கிறது.
சமீப காலம் வரை, 32 கோர்கள் மற்றும் 50 ஜிபி ரேம் கொண்ட ஒரு சிறிய சர்வரால் எங்கள் தேவைகள் பூர்த்தி செய்யப்பட்டன. காற்றோட்டத்தில், இது வேலை செய்கிறது:
- மேலும் 200 டேக்ஸ் (உண்மையில் பணிப்பாய்வுகள், இதில் நாங்கள் பணிகளை அடைத்துள்ளோம்)
- ஒவ்வொன்றிலும் சராசரியாக 70 பணிகள்,
- இந்த நன்மை தொடங்குகிறது (சராசரியாகவும்) ஒரு மணி நேரத்திற்கு ஒரு முறை.
நாங்கள் எவ்வாறு விரிவாக்கினோம் என்பது பற்றி, நான் கீழே எழுதுவேன், ஆனால் இப்போது நாம் தீர்க்கும் über-பிரச்சினையை வரையறுப்போம்:
மூன்று மூல SQL சேவையகங்கள் உள்ளன, ஒவ்வொன்றும் 50 தரவுத்தளங்களைக் கொண்டவை - முறையே ஒரு திட்டத்தின் நிகழ்வுகள், அவை ஒரே அமைப்பைக் கொண்டுள்ளன (கிட்டத்தட்ட எல்லா இடங்களிலும், mua-ha-ha), அதாவது ஒவ்வொன்றும் ஒரு ஆர்டர் அட்டவணையைக் கொண்டுள்ளது (அதிர்ஷ்டவசமாக, அதனுடன் ஒரு அட்டவணை எந்த வியாபாரத்திலும் பெயர் தள்ளப்படலாம்). சேவை புலங்களை (மூல சேவையகம், மூல தரவுத்தளம், ETL பணி ஐடி) சேர்ப்பதன் மூலம் நாங்கள் தரவை எடுத்து, வெர்டிகாவில் அப்பாவியாக வீசுகிறோம்.
போகலாம்!
முக்கிய பகுதி, நடைமுறை (மற்றும் ஒரு சிறிய கோட்பாட்டு)
நாங்கள் ஏன் (நீங்களும்)
மரங்கள் பெரியதாகவும் நான் எளிமையாகவும் இருந்தபோது SQL-ஸ்கிக் ஒரு ரஷ்ய சில்லறை விற்பனையில், எங்களிடம் உள்ள இரண்டு கருவிகளைப் பயன்படுத்தி ETL செயல்முறைகளை நாங்கள் மோசடி செய்தோம்:
- தகவல் சக்தி மையம் - மிகவும் பரவலான அமைப்பு, மிகவும் உற்பத்தித் திறன், அதன் சொந்த வன்பொருள், அதன் சொந்த பதிப்பு. நான் கடவுள் தடை 1% அதன் திறன்களை பயன்படுத்தினார். ஏன்? சரி, முதலில், இந்த இடைமுகம், எங்காவது 380 களில் இருந்து, மனரீதியாக எங்களுக்கு அழுத்தம் கொடுத்தது. இரண்டாவதாக, இந்த முரண்பாடானது மிகவும் ஆடம்பரமான செயல்முறைகள், கோபமான கூறுகளை மறுபயன்பாடு மற்றும் பிற மிக முக்கியமான-நிறுவன-தந்திரங்களுக்கு வடிவமைக்கப்பட்டுள்ளது. ஆண்டுக்கு ஏர்பஸ் ஏ XNUMX இன் இறக்கை போன்ற அதன் விலையைப் பற்றி நாங்கள் எதுவும் சொல்ல மாட்டோம்.
ஜாக்கிரதை, ஸ்கிரீன்ஷாட் 30 வயதிற்குட்பட்டவர்களை கொஞ்சம் காயப்படுத்தும்

- SQL சர்வர் ஒருங்கிணைப்பு சேவையகம் - இந்த தோழரை எங்கள் உள் திட்ட ஓட்டங்களில் பயன்படுத்தினோம். சரி, உண்மையில்: நாங்கள் ஏற்கனவே SQL சேவையகத்தைப் பயன்படுத்துகிறோம், மேலும் அதன் ETL கருவிகளைப் பயன்படுத்தாமல் இருப்பது எப்படியோ நியாயமற்றது. அதில் உள்ள அனைத்தும் நன்றாக உள்ளன: இடைமுகம் அழகாக இருக்கிறது, மற்றும் முன்னேற்ற அறிக்கைகள் ... ஆனால் இதற்காக நாங்கள் மென்பொருள் தயாரிப்புகளை விரும்புவதில்லை, ஓ, இதற்காக அல்ல. அதை பதிப்பு
dtsx(சேமித்ததில் கணுக்கள் மாற்றப்பட்ட எக்ஸ்எம்எல்) நம்மால் முடியும், ஆனால் என்ன பயன்? நூற்றுக்கணக்கான டேபிள்களை ஒரு சர்வரில் இருந்து மற்றொன்றுக்கு இழுக்கும் பணித் தொகுப்பை உருவாக்குவது எப்படி? ஆம், என்ன நூறு, உங்கள் ஆள்காட்டி விரல் இருபது துண்டுகளிலிருந்து விழும், மவுஸ் பொத்தானைக் கிளிக் செய்க. ஆனால் அது நிச்சயமாக மிகவும் நாகரீகமாக தெரிகிறது:
நாங்கள் நிச்சயமாக வழிகளைத் தேடினோம். வழக்கு கூட கிட்டத்தட்ட சுயமாக எழுதப்பட்ட SSIS தொகுப்பு ஜெனரேட்டருக்கு வந்தது ...
பின்னர் ஒரு புதிய வேலை எனக்கு கிடைத்தது. அப்பாச்சி ஏர்ஃப்ளோ என்னை முந்தியது.
ETL செயல்முறை விளக்கங்கள் எளிமையான பைதான் குறியீடு என்பதை நான் கண்டறிந்ததும், நான் மகிழ்ச்சிக்காக நடனமாடவில்லை. இப்படித்தான் டேட்டா ஸ்ட்ரீம்கள் பதிப்பு செய்யப்பட்டு வேறுபட்டது, மேலும் நூற்றுக்கணக்கான தரவுத்தளங்களிலிருந்து ஒரே கட்டமைப்பைக் கொண்ட டேபிள்களை ஒரு இலக்கில் ஊற்றுவது ஒன்றரை அல்லது இரண்டு 13 ”திரைகளில் பைதான் குறியீட்டின் விஷயமாக மாறியது.
கிளஸ்டரை அசெம்பிள் செய்தல்
முற்றிலும் மழலையர் பள்ளியை ஏற்பாடு செய்ய வேண்டாம், ஏர்ஃப்ளோ, நீங்கள் தேர்ந்தெடுத்த தரவுத்தளம், செலரி மற்றும் கப்பல்துறைகளில் விவரிக்கப்பட்டுள்ள பிற நிகழ்வுகளை நிறுவுதல் போன்ற முற்றிலும் வெளிப்படையான விஷயங்களைப் பற்றி இங்கு பேச வேண்டாம்.
நாம் உடனடியாக சோதனைகளைத் தொடங்கலாம் என்று, நான் வரைந்தேன் docker-compose.yml இதில்:
- உண்மையில் உயர்த்துவோம் காற்றோட்டம்: திட்டமிடுபவர், வெப்சர்வர். செலரி பணிகளைக் கண்காணிக்க பூவும் அங்கு சுழன்று கொண்டிருக்கும் (ஏனென்றால் அது ஏற்கனவே தள்ளப்பட்டுள்ளது
apache/airflow:1.10.10-python3.7, ஆனால் நாங்கள் கவலைப்படவில்லை) - போஸ்ட்கெரே, இதில் Airflow அதன் சேவைத் தகவலை எழுதும் (திட்டமிடல் தரவு, செயல்படுத்தல் புள்ளிவிவரங்கள், முதலியன), மற்றும் செலரி முடிக்கப்பட்ட பணிகளைக் குறிக்கும்;
- Redis, இது செலரிக்கு பணி தரகராக செயல்படும்;
- செலரி தொழிலாளி, இது பணிகளை நேரடியாக நிறைவேற்றுவதில் ஈடுபடும்.
- கோப்புறைக்கு
./dagsடாக்ஸின் விளக்கத்துடன் எங்கள் கோப்புகளைச் சேர்ப்போம். அவை பறக்கும்போது எடுக்கப்படும், எனவே ஒவ்வொரு தும்மலுக்குப் பிறகும் முழு அடுக்கையும் ஏமாற்ற வேண்டிய அவசியமில்லை.
சில இடங்களில், எடுத்துக்காட்டுகளில் உள்ள குறியீடு முழுமையாகக் காட்டப்படவில்லை (உரையை ஒழுங்கீனம் செய்யாமல் இருக்க), ஆனால் எங்காவது அது செயல்பாட்டில் மாற்றியமைக்கப்படுகிறது. முழுமையான வேலை குறியீடு எடுத்துக்காட்டுகளை களஞ்சியத்தில் காணலாம் .
கூலியாள்-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerகருத்துக்கள்:
- கலவையின் சட்டசபையில், நான் பெரும்பாலும் நன்கு அறியப்பட்ட படத்தை நம்பியிருந்தேன் - அதை சரிபார்க்கவும். ஒருவேளை உங்கள் வாழ்க்கையில் உங்களுக்கு வேறு எதுவும் தேவையில்லை.
- அனைத்து ஏர்ஃப்ளோ அமைப்புகளும் மூலம் மட்டும் கிடைக்காது
airflow.cfg, ஆனால் சுற்றுச்சூழல் மாறிகள் மூலமாகவும் (டெவலப்பர்களுக்கு நன்றி), நான் தீங்கிழைக்கும் வகையில் பயன்படுத்திக்கொண்டேன். - இயற்கையாகவே, இது உற்பத்திக்கு தயாராக இல்லை: நான் வேண்டுமென்றே கொள்கலன்களில் இதயத் துடிப்பை வைக்கவில்லை, பாதுகாப்பில் நான் கவலைப்படவில்லை. ஆனால் எங்கள் பரிசோதனையாளர்களுக்கு குறைந்தபட்சம் பொருத்தமானதை நான் செய்தேன்.
- குறிப்பு:
- டேக் கோப்புறையை திட்டமிடுபவர் மற்றும் தொழிலாளர்கள் இருவரும் அணுகக்கூடியதாக இருக்க வேண்டும்.
- அனைத்து மூன்றாம் தரப்பு நூலகங்களுக்கும் இது பொருந்தும் - அவை அனைத்தும் திட்டமிடுபவர் மற்றும் பணியாளர்களைக் கொண்ட இயந்திரங்களில் நிறுவப்பட வேண்டும்.
சரி, இப்போது இது எளிது:
$ docker-compose up --scale worker=3எல்லாம் உயர்ந்த பிறகு, நீங்கள் இணைய இடைமுகங்களைப் பார்க்கலாம்:
- காற்றோட்டம்:
- பூ:
அடிப்படை கருத்துக்கள்
இந்த எல்லா "டாக்களிலும்" உங்களுக்கு எதுவும் புரியவில்லை என்றால், இங்கே ஒரு சிறிய அகராதி உள்ளது:
- திட்டமிடுதல் - ஏர்ஃப்ளோவில் மிக முக்கியமான மாமா, ரோபோக்கள் கடினமாக உழைப்பதைக் கட்டுப்படுத்துகிறார், ஆனால் ஒரு நபர் அல்ல: அட்டவணையை கண்காணிக்கிறது, டேக்களைப் புதுப்பிக்கிறது, பணிகளைத் தொடங்குகிறது.
பொதுவாக, பழைய பதிப்புகளில், அவருக்கு நினைவகத்தில் சிக்கல்கள் இருந்தன (இல்லை, மறதி அல்ல, ஆனால் கசிவுகள்) மற்றும் மரபு அளவுரு கட்டமைப்புகளில் கூட இருந்தது.
run_duration- அதன் மறுதொடக்கம் இடைவெளி. ஆனால் இப்போது எல்லாம் சரியாகிவிட்டது. - DAG உத்தியோகத்தர் (aka "dag") - "இயக்கிய அசைக்ளிக் வரைபடம்", ஆனால் அத்தகைய வரையறை சில நபர்களுக்குச் சொல்லும், ஆனால் உண்மையில் இது ஒருவருக்கொருவர் தொடர்பு கொள்ளும் பணிகளுக்கான கொள்கலன் (கீழே காண்க) அல்லது SSIS இல் தொகுப்பு மற்றும் இன்ஃபர்மேட்டிகாவில் பணிப்பாய்வு ஆகியவற்றின் அனலாக் ஆகும். .
டாக்ஸைத் தவிர, இன்னும் சப்டேக்குகள் இருக்கலாம், ஆனால் நாம் பெரும்பாலும் அவற்றைப் பெற மாட்டோம்.
- DAG ரன் - துவக்கப்பட்ட டாக், இது அதன் சொந்தமாக ஒதுக்கப்பட்டுள்ளது
execution_date. அதே டாக்கின் டாக்ரான்கள் இணையாக வேலை செய்யலாம் (உங்கள் பணிகளை நீங்கள் திறமையற்றதாக செய்திருந்தால், நிச்சயமாக). - ஆபரேட்டர் ஒரு குறிப்பிட்ட செயலைச் செய்வதற்குப் பொறுப்பான குறியீட்டுத் துண்டுகள். மூன்று வகையான ஆபரேட்டர்கள் உள்ளனர்:
- நடவடிக்கைநமக்கு பிடித்தது போல்
PythonOperator, எந்த (செல்லுபடியாகும்) பைதான் குறியீட்டையும் இயக்க முடியும்; - பரிமாற்ற, எந்த இடத்திலிருந்து இடத்திற்கு தரவை கொண்டு செல்கிறது, சொல்லுங்கள்,
MsSqlToHiveTransfer; - சென்சார் மறுபுறம், இது ஒரு நிகழ்வு நிகழும் வரை டாக்கை மேலும் செயல்படுத்துவதை எதிர்வினை செய்ய அல்லது மெதுவாக்க உங்களை அனுமதிக்கும்.
HttpSensorகுறிப்பிட்ட இறுதிப் புள்ளியை இழுக்க முடியும், மேலும் விரும்பிய பதில் காத்திருக்கும் போது, பரிமாற்றத்தைத் தொடங்கவும்GoogleCloudStorageToS3Operator. ஒரு ஆர்வமுள்ள மனம் கேட்கும்: "ஏன்? எல்லாவற்றிற்கும் மேலாக, நீங்கள் ஆபரேட்டரில் மீண்டும் மீண்டும் செய்யலாம்!" பின்னர், இடைநிறுத்தப்பட்ட ஆபரேட்டர்களுடன் பணிகளின் குளத்தை அடைக்கக்கூடாது என்பதற்காக. அடுத்த முயற்சிக்கு முன் சென்சார் தொடங்குகிறது, சரிபார்க்கிறது மற்றும் இறக்கிறது.
- நடவடிக்கைநமக்கு பிடித்தது போல்
- டாஸ்க் - அறிவிக்கப்பட்ட ஆபரேட்டர்கள், வகையைப் பொருட்படுத்தாமல், மற்றும் டாக் உடன் இணைக்கப்பட்டவர்கள் பணியின் தரத்திற்கு உயர்த்தப்படுகிறார்கள்.
- பணி நிகழ்வு - பொதுத் திட்டமிடுபவர், நடிகர்-தொழிலாளர்களுக்குப் போருக்குப் பணிகளை அனுப்புவதற்கான நேரம் இது என்று முடிவு செய்தபோது (அந்த இடத்திலேயே, நாங்கள் பயன்படுத்தினால்.
LocalExecutorஅல்லது வழக்கில் தொலை முனைக்குCeleryExecutor), இது அவர்களுக்கு ஒரு சூழலை ஒதுக்குகிறது (அதாவது, மாறிகளின் தொகுப்பு - செயல்படுத்தும் அளவுருக்கள்), கட்டளை அல்லது வினவல் வார்ப்புருக்களை விரிவுபடுத்துகிறது, மேலும் அவற்றைக் கூட்டுகிறது.
நாங்கள் பணிகளை உருவாக்குகிறோம்
முதலில், எங்கள் டக்கின் பொதுவான திட்டத்தை கோடிட்டுக் காட்டுவோம், பின்னர் மேலும் மேலும் விவரங்களுக்கு முழுக்கு போடுவோம், ஏனென்றால் நாங்கள் சில அற்பமான தீர்வுகளைப் பயன்படுத்துகிறோம்.
எனவே, அதன் எளிய வடிவத்தில், அத்தகைய டாக் இப்படி இருக்கும்:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)அதைக் கண்டுபிடிப்போம்:
- முதலில், தேவையான லிப்களை இறக்குமதி செய்கிறோம் வேறு ஏதாவது;
sql_server_ds- அதுList[namedtuple[str, str]]ஏர்ஃப்ளோ இணைப்புகளில் இருந்து இணைப்புகளின் பெயர்கள் மற்றும் தரவுத்தளங்களில் இருந்து நாங்கள் எங்கள் தட்டை எடுக்கிறோம்;dag- எங்கள் டக் அறிவிப்பு, இது அவசியம் இருக்க வேண்டும்globals(), இல்லையெனில் Airflow அதைக் கண்டுபிடிக்காது. டக் மேலும் சொல்ல வேண்டும்:- அவன் பெயர் என்ன
orders- இந்த பெயர் பின்னர் இணைய இடைமுகத்தில் தோன்றும், - அவர் ஜூலை எட்டாம் தேதி நள்ளிரவு முதல் வேலை செய்வார் என்று,
- மேலும் இது தோராயமாக ஒவ்வொரு 6 மணி நேரத்திற்கும் ஒருமுறை இயங்க வேண்டும் (இங்கு கடினமான நபர்களுக்கு பதிலாக
timedelta()அனுமதிக்கப்பட்டதுcron-வரி0 0 0/6 ? * * *, குறைந்த குளிர் - போன்ற ஒரு வெளிப்பாடு@daily);
- அவன் பெயர் என்ன
workflow()முக்கிய வேலையைச் செய்வார், ஆனால் இப்போது இல்லை. இப்போதைக்கு, எங்கள் சூழலை பதிவில் போடுவோம்.- இப்போது பணிகளை உருவாக்கும் எளிய மந்திரம்:
- நாங்கள் எங்கள் ஆதாரங்கள் மூலம் ஓடுகிறோம்;
- துவக்க
PythonOperator, இது எங்கள் டம்மியை செயல்படுத்தும்workflow(). பணியின் ஒரு தனித்துவமான (டாக்கிற்குள்) பெயரைக் குறிப்பிட மறந்துவிடாதீர்கள் மற்றும் டாக்கையே கட்டவும். கொடிprovide_contextஇதையொட்டி, செயல்பாட்டில் கூடுதல் வாதங்களை ஊற்றுவோம், அதை நாங்கள் கவனமாகப் பயன்படுத்தி சேகரிப்போம்**context.
இப்போதைக்கு அவ்வளவுதான். எங்களுக்கு கிடைத்தது:
- வலை இடைமுகத்தில் புதிய டாக்,
- ஒன்றரை நூறு பணிகள் இணையாக செயல்படுத்தப்படும் (காற்றோட்டம், செலரி அமைப்புகள் மற்றும் சேவையக திறன் அனுமதித்தால்).
சரி, கிட்டத்தட்ட கிடைத்தது.

சார்புகளை யார் நிறுவுவார்கள்?
இந்த முழு விஷயத்தையும் எளிமைப்படுத்த, நான் திருகினேன் docker-compose.yml செயலாக்கம் requirements.txt அனைத்து முனைகளிலும்.
இப்போது அது போய்விட்டது:

சாம்பல் சதுரங்கள் என்பது அட்டவணையாளரால் செயலாக்கப்பட்ட பணி நிகழ்வுகள்.
நாங்கள் சிறிது காத்திருக்கிறோம், வேலைகள் தொழிலாளர்களால் முறியடிக்கப்படுகின்றன:

பசுமையானவை, நிச்சயமாக, தங்கள் வேலையை வெற்றிகரமாக முடித்துவிட்டன. சிவப்பு மிகவும் வெற்றிகரமாக இல்லை.
மூலம், எங்கள் தயாரிப்பில் எந்த கோப்புறையும் இல்லை
./dags, இயந்திரங்களுக்கிடையில் ஒத்திசைவு இல்லை - அனைத்து டாக்களும் உள்ளனgitஎங்கள் Gitlab இல், மற்றும் Gitlab CI இணைக்கும்போது இயந்திரங்களுக்கு புதுப்பிப்புகளை விநியோகிக்கிறதுmaster.
பூவைப் பற்றி கொஞ்சம்
வேலையாட்கள் நமது அமைதிகாட்டிகளை அடித்து நொறுக்கும்போது, நமக்கு எதையாவது காட்டக்கூடிய மற்றொரு கருவியை நினைவில் கொள்வோம் - மலர்.
தொழிலாளர் முனைகள் பற்றிய சுருக்கமான தகவலுடன் முதல் பக்கம்:

வேலைக்குச் சென்ற பணிகளைக் கொண்ட மிகவும் தீவிரமான பக்கம்:

எங்கள் தரகரின் நிலையைக் கொண்ட மிகவும் சலிப்பான பக்கம்:

பணி நிலை வரைபடங்கள் மற்றும் அவற்றின் செயலாக்க நேரத்துடன் கூடிய பிரகாசமான பக்கம்:

குறைந்த சுமைகளை நாங்கள் ஏற்றுகிறோம்
எனவே, அனைத்து பணிகளும் முடிந்துவிட்டன, நீங்கள் காயமடைந்தவர்களை எடுத்துச் செல்லலாம்.

மற்றும் பலர் காயமடைந்தனர் - ஒரு காரணத்திற்காக அல்லது இன்னொரு காரணத்திற்காக. காற்றோட்டத்தின் சரியான பயன்பாட்டின் விஷயத்தில், இந்த சதுரங்கள் தரவு நிச்சயமாக வரவில்லை என்பதைக் குறிக்கிறது.
நீங்கள் பதிவைப் பார்த்து, விழுந்த பணி நிகழ்வுகளை மறுதொடக்கம் செய்ய வேண்டும்.
எந்தச் சதுரத்திலும் கிளிக் செய்வதன் மூலம், நமக்குக் கிடைக்கும் செயல்களைக் காண்போம்:

நீங்கள் எடுத்து கீழே விழுந்ததை அழிக்கலாம். அதாவது, அங்கு ஏதோ தோல்வியுற்றதை நாம் மறந்துவிடுகிறோம், அதே நிகழ்வின் பணி திட்டமிடுபவருக்குச் செல்லும்.

அனைத்து சிவப்பு சதுரங்களுடனும் சுட்டியைக் கொண்டு இதைச் செய்வது மிகவும் மனிதாபிமானம் அல்ல என்பது தெளிவாகிறது - இது ஏர்ஃப்ளோவிலிருந்து நாம் எதிர்பார்ப்பது அல்ல. இயற்கையாகவே, எங்களிடம் பேரழிவு ஆயுதங்கள் உள்ளன: Browse/Task Instances

எல்லாவற்றையும் ஒரே நேரத்தில் தேர்ந்தெடுத்து பூஜ்ஜியத்திற்கு மீட்டமைப்போம், சரியான உருப்படியைக் கிளிக் செய்யவும்:

சுத்தம் செய்த பிறகு, எங்கள் டாக்சிகள் இப்படி இருக்கும் (அவை ஏற்கனவே திட்டமிடுபவருக்காகக் காத்திருக்கின்றன):

இணைப்புகள், கொக்கிகள் மற்றும் பிற மாறிகள்
அடுத்த டிஏஜியைப் பார்க்க வேண்டிய நேரம் இது, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]எல்லோரும் எப்போதாவது அறிக்கை புதுப்பித்தலைச் செய்திருக்கிறார்களா? இது மீண்டும் அவளே: தரவைப் பெறுவதற்கான ஆதாரங்களின் பட்டியல் உள்ளது; எங்கு வைக்க வேண்டும் என்று ஒரு பட்டியல் உள்ளது; எல்லாம் நடந்தபோது அல்லது உடைந்தபோது ஒலிக்க மறக்காதீர்கள் (சரி, இது எங்களைப் பற்றியது அல்ல, இல்லை).
மீண்டும் கோப்பினைச் சென்று புதிய தெளிவற்ற விஷயங்களைப் பார்ப்போம்:
from commons.operators import TelegramBotSendMessage- தடை நீக்கப்பட்டவர்களுக்கு செய்திகளை அனுப்புவதற்கு ஒரு சிறிய ரேப்பரை உருவாக்குவதன் மூலம் எங்கள் சொந்த ஆபரேட்டர்களை உருவாக்குவதிலிருந்து எதுவும் எங்களைத் தடுக்காது. (இந்த ஆபரேட்டரைப் பற்றி கீழே பேசுவோம்);default_args={}- dag அதன் அனைத்து ஆபரேட்டர்களுக்கும் ஒரே மாதிரியான வாதங்களை விநியோகிக்க முடியும்;to='{{ var.value.all_the_kings_men }}'- புலம்toஎங்களிடம் ஹார்ட்கோட் இல்லை, ஆனால் ஜின்ஜாவைப் பயன்படுத்தி மாறும் வகையில் உருவாக்கப்படும் மற்றும் மின்னஞ்சல்களின் பட்டியலுடன் ஒரு மாறி, நான் கவனமாக உள்ளிடுவேன்Admin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- ஆபரேட்டரைத் தொடங்குவதற்கான நிபந்தனை. எங்கள் விஷயத்தில், அனைத்து சார்புகளும் செயல்பட்டால் மட்டுமே முதலாளிகளுக்கு கடிதம் பறக்கும் வெற்றிகரமாக;tg_bot_conn_id='tg_main'- வாதங்கள்conn_idநாங்கள் உருவாக்கும் இணைப்பு ஐடிகளை ஏற்கவும்Admin/Connections;trigger_rule=TriggerRule.ONE_FAILED- விழுந்த பணிகள் இருந்தால் மட்டுமே டெலிகிராமில் உள்ள செய்திகள் பறந்து செல்லும்;task_concurrency=1- ஒரு பணியின் பல பணி நிகழ்வுகளை ஒரே நேரத்தில் தொடங்குவதை நாங்கள் தடைசெய்கிறோம். இல்லையெனில், பலவற்றை ஒரே நேரத்தில் தொடங்குவோம்VerticaOperator(ஒரு மேசையைப் பார்த்து);report_update >> [email, tg]- அனைத்துVerticaOperatorகடிதங்கள் மற்றும் செய்திகளை அனுப்புவதில் ஒன்றிணைந்து, இது போன்றது:

ஆனால் அறிவிப்பாளர் ஆபரேட்டர்கள் வெவ்வேறு வெளியீட்டு நிலைமைகளைக் கொண்டிருப்பதால், ஒன்று மட்டுமே வேலை செய்யும். ட்ரீ வியூவில், எல்லாம் கொஞ்சம் குறைவாகவே காட்சியளிக்கிறது:

பற்றி சில வார்த்தைகள் கூறுவேன் மேக்ரோக்கள் மற்றும் அவர்களின் நண்பர்கள் - மாறிகள்.
மேக்ரோக்கள் ஜின்ஜா பிளேஸ்ஹோல்டர்கள் ஆகும், அவை பல்வேறு பயனுள்ள தகவல்களை ஆபரேட்டர் வாதங்களில் மாற்றும். உதாரணமாக, இது போன்றது:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} சூழல் மாறியின் உள்ளடக்கங்களுக்கு விரிவடையும் execution_date வடிவமைப்பில் YYYY-MM-DD: 2020-07-14. சிறந்த அம்சம் என்னவென்றால், சூழல் மாறிகள் ஒரு குறிப்பிட்ட பணி நிகழ்வில் (மரக் காட்சியில் ஒரு சதுரம்) இணைக்கப்படுகின்றன, மேலும் மறுதொடக்கம் செய்யும்போது, ஒதுக்கிடங்கள் அதே மதிப்புகளுக்கு விரிவடையும்.
ஒவ்வொரு பணி நிகழ்விலும் ரெண்டர் செய்யப்பட்ட பொத்தானைப் பயன்படுத்தி ஒதுக்கப்பட்ட மதிப்புகளைப் பார்க்கலாம். ஒரு கடிதத்தை அனுப்பும் பணி இப்படித்தான்:

எனவே ஒரு செய்தியை அனுப்பும் பணியில்:

சமீபத்திய கிடைக்கக்கூடிய பதிப்பிற்கான உள்ளமைக்கப்பட்ட மேக்ரோக்களின் முழுமையான பட்டியல் இங்கே கிடைக்கிறது:
மேலும், செருகுநிரல்களின் உதவியுடன், எங்கள் சொந்த மேக்ரோக்களை அறிவிக்க முடியும், ஆனால் அது மற்றொரு கதை.
முன் வரையறுக்கப்பட்ட விஷயங்களுக்கு கூடுதலாக, நமது மாறிகளின் மதிப்புகளை மாற்றலாம் (மேலே உள்ள குறியீட்டில் இதை நான் ஏற்கனவே பயன்படுத்தினேன்). உள்ளே உருவாக்குவோம் Admin/Variables ஓரிரு விஷயங்கள்:

நீங்கள் பயன்படுத்தக்கூடிய அனைத்தும்:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')மதிப்பு ஒரு அளவுகோலாக இருக்கலாம் அல்லது JSON ஆகவும் இருக்கலாம். JSON விஷயத்தில்:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}விரும்பிய விசைக்கான பாதையைப் பயன்படுத்தவும்: {{ var.json.bot_config.bot.token }}.
நான் ஒரு வார்த்தை சொல்லிவிட்டு ஒரு ஸ்கிரீன்ஷாட்டைக் காட்டுவேன் соединения. எல்லாம் இங்கே அடிப்படை: பக்கத்தில் Admin/Connections நாங்கள் ஒரு இணைப்பை உருவாக்குகிறோம், எங்கள் உள்நுழைவுகள் / கடவுச்சொற்கள் மற்றும் மேலும் குறிப்பிட்ட அளவுருக்களை அங்கு சேர்க்கிறோம். இது போன்ற:

கடவுச்சொற்கள் குறியாக்கம் செய்யப்படலாம் (இயல்புநிலையை விட முழுமையாக), அல்லது நீங்கள் இணைப்பு வகையை விட்டுவிடலாம் (நான் செய்தது போல் tg_main) - உண்மை என்னவென்றால், வகைகளின் பட்டியல் ஏர்ஃப்ளோ மாடல்களில் கடினமாக உள்ளது மற்றும் மூலக் குறியீடுகளுக்குள் வராமல் விரிவாக்க முடியாது (திடீரென்று நான் எதையாவது கூகிள் செய்யவில்லை என்றால், தயவுசெய்து என்னைத் திருத்தவும்), ஆனால் எதுவும் கிரெடிட்களைப் பெறுவதைத் தடுக்காது. பெயர்.
நீங்கள் ஒரே பெயரில் பல இணைப்புகளை உருவாக்கலாம்: இந்த விஷயத்தில், முறை BaseHook.get_connection(), இது எங்களுக்கு பெயர் மூலம் இணைப்புகளை பெறுகிறது, கொடுக்கும் சீரற்ற பல பெயர்களில் இருந்து (ரவுண்ட் ராபினை உருவாக்குவது மிகவும் தர்க்கரீதியானதாக இருக்கும், ஆனால் அதை ஏர்ஃப்ளோ டெவலப்பர்களின் மனசாட்சியில் விட்டுவிடுவோம்).
மாறிகள் மற்றும் இணைப்புகள் நிச்சயமாக சிறந்த கருவிகள், ஆனால் சமநிலையை இழக்காமல் இருப்பது முக்கியம்: உங்கள் பாய்வின் எந்தப் பகுதிகளை நீங்கள் குறியீட்டிலேயே சேமித்து வைத்திருக்கிறீர்கள், மற்றும் சேமிப்பிற்காக ஏர்ஃப்ளோவுக்கு எந்தப் பகுதிகளை வழங்குகிறீர்கள். ஒருபுறம், மதிப்பை விரைவாக மாற்றுவது வசதியாக இருக்கும், எடுத்துக்காட்டாக, UI மூலம் அஞ்சல் பெட்டி. மறுபுறம், இது இன்னும் மவுஸ் கிளிக்கிற்கு திரும்பும், அதில் இருந்து நாங்கள் (நான்) விடுபட விரும்பினோம்.
இணைப்புகளுடன் பணிபுரிவது பணிகளில் ஒன்றாகும் கொக்கிகள். பொதுவாக, ஏர்ஃப்ளோ ஹூக்குகள் மூன்றாம் தரப்பு சேவைகள் மற்றும் நூலகங்களுடன் இணைக்கும் புள்ளிகள். எ.கா. JiraHook ஜிராவுடன் தொடர்புகொள்வதற்கு ஒரு கிளையண்டைத் திறக்கும் (நீங்கள் பணிகளை முன்னும் பின்னுமாக நகர்த்தலாம்), மேலும் SambaHook நீங்கள் ஒரு உள்ளூர் கோப்பை அழுத்தலாம் smb-புள்ளி.
தனிப்பயன் ஆபரேட்டரை பாகுபடுத்துகிறது
அது எப்படி தயாரிக்கப்படுகிறது என்று பார்க்க நெருங்கிவிட்டோம் TelegramBotSendMessage
குறியீடு commons/operators.py உண்மையான ஆபரேட்டருடன்:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)இங்கே, காற்றோட்டத்தில் உள்ள எல்லாவற்றையும் போலவே, எல்லாம் மிகவும் எளிமையானது:
- மரபுரிமையாக இருந்து வந்தது
BaseOperator, இது சில காற்றோட்டம் சார்ந்த விஷயங்களைச் செயல்படுத்துகிறது (உங்கள் ஓய்வு நேரத்தைப் பாருங்கள்) - அறிவிக்கப்பட்ட புலங்கள்
template_fields, இதில் ஜின்ஜா செயலாக்க மேக்ரோக்களை தேடும். - சரியான வாதங்களை ஏற்பாடு செய்தார்
__init__(), தேவையான இடங்களில் இயல்புநிலைகளை அமைக்கவும். - மூதாதையரின் துவக்கத்தையும் நாங்கள் மறந்துவிடவில்லை.
- தொடர்புடைய கொக்கியைத் திறந்தார்
TelegramBotHookஅதிலிருந்து ஒரு வாடிக்கையாளர் பொருளைப் பெற்றார். - மேலெழுதப்பட்ட (மறுவரையறை) முறை
BaseOperator.execute(), ஆபரேட்டரைத் தொடங்குவதற்கான நேரம் வரும்போது எந்த ஏர்ஃபோவ் இழுக்கும் - அதில் உள்நுழைய மறந்துவிட்டு முக்கிய செயலைச் செயல்படுத்துவோம். (நாங்கள் உள்நுழைகிறோம்stdoutиstderr- காற்றோட்டம் எல்லாவற்றையும் இடைமறித்து, அழகாக போர்த்தி, தேவையான இடங்களில் சிதைக்கும்.)
நம்மிடம் இருப்பதைப் பார்ப்போம் commons/hooks.py. கோப்பின் முதல் பகுதி, ஹூக்குடன்:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientஇங்கே என்ன விளக்குவது என்று எனக்குத் தெரியவில்லை, முக்கியமான விஷயங்களை நான் கவனிக்கிறேன்:
- நாங்கள் மரபுரிமை பெறுகிறோம், வாதங்களைப் பற்றி சிந்தியுங்கள் - பெரும்பாலான சந்தர்ப்பங்களில் இது ஒன்றாக இருக்கும்:
conn_id; - நிலையான முறைகளை மீறுதல்: நானே வரம்பிடினேன்
get_conn(), இதில் நான் இணைப்பு அளவுருக்களை பெயரால் பெறுகிறேன் மற்றும் பிரிவைப் பெறுகிறேன்extra(இது ஒரு JSON புலம்), இதில் நான் (எனது சொந்த அறிவுறுத்தல்களின்படி!) டெலிகிராம் போட் டோக்கனை வைத்தேன்:{"bot_token": "YOuRAwEsomeBOtToKen"}. - நான் எங்களின் உதாரணத்தை உருவாக்குகிறேன்
TelegramBot, அதற்கு ஒரு குறிப்பிட்ட டோக்கன் கொடுக்கிறது.
அவ்வளவுதான். நீங்கள் பயன்படுத்தி ஒரு கொக்கி இருந்து ஒரு வாடிக்கையாளர் பெற முடியும் TelegramBotHook().clent அல்லது TelegramBotHook().get_conn().
கோப்பின் இரண்டாவது பகுதி, அதில் டெலிகிராம் ரெஸ்ட் ஏபிஐக்கு மைக்ரோ ரேப்பரை உருவாக்குகிறேன், அதை இழுக்கக்கூடாது ஒரு முறைக்கு sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))எல்லாவற்றையும் சேர்ப்பதே சரியான வழி:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- செருகுநிரலில், ஒரு பொது களஞ்சியத்தில் வைத்து, அதை திறந்த மூலத்திற்கு கொடுங்கள்.
இதையெல்லாம் நாங்கள் படித்துக்கொண்டிருந்தபோது, எங்கள் அறிக்கை புதுப்பிப்புகள் வெற்றிகரமாக தோல்வியடைந்து சேனலில் எனக்கு ஒரு பிழைச் செய்தியை அனுப்பியது. தவறா என்று பார்க்கிறேன்...

எங்கள் நாய்களில் ஏதோ உடைந்தது! நாம் எதிர்பார்த்தது அல்லவா? சரியாக!
ஊற்றப் போகிறாயா?
நான் எதையாவது தவறவிட்டதாக உணர்கிறீர்களா? SQL சர்வரில் இருந்து வெர்டிகாவிற்கு தரவை மாற்றுவதாக அவர் உறுதியளித்தார் என்று தெரிகிறது, பின்னர் அவர் அதை எடுத்து தலைப்பை நகர்த்தினார், அயோக்கியன்!
இந்த அட்டூழியமானது வேண்டுமென்றே செய்யப்பட்டது, உங்களுக்காக சில சொற்களை நான் புரிந்து கொள்ள வேண்டியிருந்தது. இப்போது நீங்கள் மேலும் செல்லலாம்.
எங்கள் திட்டம் இதுதான்:
- டாக் செய்யுங்கள்
- பணிகளை உருவாக்குங்கள்
- எல்லாம் எவ்வளவு அழகாக இருக்கிறது என்று பாருங்கள்
- நிரப்புவதற்கு அமர்வு எண்களை ஒதுக்கவும்
- SQL சேவையகத்திலிருந்து தரவைப் பெறுங்கள்
- தரவை வெர்டிகாவில் வைக்கவும்
- புள்ளிவிவரங்களை சேகரிக்கவும்
எனவே, இவை அனைத்தையும் இயக்க, நான் எங்களிடம் ஒரு சிறிய சேர்த்தல் செய்தேன் docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyஅங்கு நாம் எழுப்புகிறோம்:
- தொகுப்பாளராக வெர்டிகா
dwhமிகவும் இயல்புநிலை அமைப்புகளுடன், - SQL சேவையகத்தின் மூன்று நிகழ்வுகள்,
- பிந்தைய தரவுத்தளங்களை சில தரவுகளுடன் நிரப்புகிறோம் (எந்த விஷயத்திலும் பார்க்க வேண்டாம்
mssql_init.py!)
கடந்த காலத்தை விட சற்று சிக்கலான கட்டளையின் உதவியுடன் அனைத்து நன்மைகளையும் நாங்கள் தொடங்குகிறோம்:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3எங்கள் அதிசய ரேண்டமைசர் என்ன உருவாக்கியது, நீங்கள் உருப்படியைப் பயன்படுத்தலாம் Data Profiling/Ad Hoc Query:

முக்கிய விஷயம் அதை ஆய்வாளர்களுக்குக் காட்டக்கூடாது
விளக்கமாய் கூறுக ETL அமர்வுகள் நான் செய்ய மாட்டேன், எல்லாம் அங்கு அற்பமானது: நாங்கள் ஒரு தளத்தை உருவாக்குகிறோம், அதில் ஒரு அடையாளம் உள்ளது, எல்லாவற்றையும் ஒரு சூழல் மேலாளருடன் போர்த்திவிடுகிறோம், இப்போது இதைச் செய்கிறோம்:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15அமர்வு.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passநேரம் வந்துவிட்டது எங்கள் தரவு சேகரிக்க எங்கள் ஒன்றரை நூறு மேசைகளிலிருந்து. மிகவும் எளிமையான வரிகளின் உதவியுடன் இதைச் செய்வோம்:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- ஒரு கொக்கி உதவியுடன் நாம் காற்றோட்டத்திலிருந்து பெறுகிறோம்
pymssql- இணைக்கவும் - கோரிக்கையில் தேதி வடிவத்தில் ஒரு கட்டுப்பாட்டை மாற்றுவோம் - இது டெம்ப்ளேட் இயந்திரத்தால் செயல்பாட்டில் வீசப்படும்.
- எங்கள் கோரிக்கைக்கு உணவளிக்கிறது
pandasயார் நம்மைப் பெறுவார்கள்DataFrame- இது எதிர்காலத்தில் நமக்கு பயனுள்ளதாக இருக்கும்.
நான் மாற்றீட்டைப் பயன்படுத்துகிறேன்
{dt}கோரிக்கை அளவுருவிற்கு பதிலாக%sநான் ஒரு தீய பினோச்சியோ என்பதனால் அல்ல, ஏனென்றால்pandasகையாள முடியாதுpymssqlமற்றும் கடைசியாக நழுவுகிறதுparams: Listஅவர் உண்மையில் விரும்புகிறார் என்றாலும்tuple.
டெவலப்பர் என்பதையும் கவனத்தில் கொள்ளவும்pymssqlஅவரை இனி ஆதரிக்க வேண்டாம் என்று முடிவு செய்து, வெளியேற வேண்டிய நேரம் இதுpyodbc.
எங்கள் செயல்பாடுகளின் வாதங்களை ஏர்ஃப்ளோ எதைக் கொண்டு அடைத்தது என்பதைப் பார்ப்போம்:

தரவு இல்லை என்றால், தொடர்வதில் எந்த அர்த்தமும் இல்லை. ஆனால் நிரப்புதல் வெற்றிகரமாக இருப்பதாகக் கருதுவதும் விசித்திரமானது. ஆனால் இது தவறல்ல. ஆ-ஆ, என்ன செய்வது?! மற்றும் இங்கே என்ன:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException ஏர்ஃப்ளோவிடம் பிழைகள் இல்லை என்று கூறுகிறது, ஆனால் நாங்கள் பணியைத் தவிர்க்கிறோம். இடைமுகத்தில் பச்சை அல்லது சிவப்பு சதுரம் இருக்காது, ஆனால் இளஞ்சிவப்பு.
எங்கள் தரவை தூக்கி எறிவோம் பல நெடுவரிசைகள்:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])அவை பின்வருமாறு:
- நாங்கள் ஆர்டர்களை எடுத்த தரவுத்தளத்தில்,
- எங்களின் வெள்ளப்பெருக்கு அமர்வின் ஐடி (இது வித்தியாசமாக இருக்கும் ஒவ்வொரு பணிக்கும்),
- மூல மற்றும் ஆர்டர் ஐடியிலிருந்து ஒரு ஹாஷ் - இறுதி தரவுத்தளத்தில் (எல்லாவற்றையும் ஒரு அட்டவணையில் ஊற்றினால்) எங்களிடம் ஒரு தனித்துவமான ஆர்டர் ஐடி உள்ளது.
இறுதி கட்டம் உள்ளது: எல்லாவற்றையும் வெர்டிகாவில் ஊற்றவும். மேலும், விந்தை போதும், இதைச் செய்வதற்கான மிக அற்புதமான மற்றும் திறமையான வழிகளில் ஒன்று CSV ஆகும்!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- நாங்கள் ஒரு சிறப்பு ரிசீவரை உருவாக்குகிறோம்
StringIO. pandasதயவுசெய்து எங்கள் வைப்பார்கள்DataFrameவடிவத்தில்CSV-கோடுகள்.- கொக்கி மூலம் நமக்குப் பிடித்த வெர்டிகாவுடன் இணைப்பைத் திறப்போம்.
- இப்போது உதவியுடன்
copy()எங்கள் தரவை நேரடியாக வெர்டிகாவிற்கு அனுப்புங்கள்!
எத்தனை வரிகள் நிரப்பப்பட்டன என்பதை டிரைவரிடமிருந்து எடுத்து, எல்லாம் சரி என்று அமர்வு மேலாளரிடம் கூறுவோம்:
session.loaded_rows = cursor.rowcount
session.successful = Trueஅவ்வளவுதான்.
விற்பனையில், இலக்குத் தகட்டை கைமுறையாக உருவாக்குகிறோம். இங்கே நான் ஒரு சிறிய இயந்திரத்தை அனுமதித்தேன்:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)நான் பயன்படுத்தி வருகிறேன்
VerticaOperator()நான் ஒரு தரவுத்தள ஸ்கீமா மற்றும் அட்டவணையை உருவாக்குகிறேன் (அவை ஏற்கனவே இல்லை என்றால், நிச்சயமாக). முக்கிய விஷயம் சார்புகளை சரியாக ஏற்பாடு செய்வது:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadசுருக்கமாக
- சரி, - சிறிய சுட்டி, - அது இல்லை, இப்போது
நான் காட்டில் மிகவும் பயங்கரமான விலங்கு என்று நீங்கள் உறுதியாக நம்புகிறீர்களா?
ஜூலியா டொனால்ட்சன், தி க்ரூஃபாலோ
எனக்கும் எனது சகாக்களுக்கும் ஒரு போட்டி இருந்தால்: புதிதாக ஒரு ETL செயல்முறையை யார் விரைவாக உருவாக்குவார்கள் மற்றும் தொடங்குவார்கள் என்று நான் நினைக்கிறேன்: அவர்கள் தங்கள் SSIS மற்றும் ஒரு மவுஸ் மற்றும் நான் காற்றோட்டத்துடன் ... பின்னர் நாங்கள் பராமரிப்பின் எளிமையையும் ஒப்பிடுவோம் ... ஆஹா, நான் அவர்களை எல்லா முனைகளிலும் வெல்வேன் என்பதை நீங்கள் ஒப்புக்கொள்வீர்கள் என்று நினைக்கிறேன்!
இன்னும் கொஞ்சம் தீவிரமாக இருந்தால், அப்பாச்சி ஏர்ஃப்ளோ - நிரல் குறியீட்டின் வடிவத்தில் செயல்முறைகளை விவரிப்பதன் மூலம் - எனது வேலையைச் செய்தேன். மேலும் மிகவும் வசதியான மற்றும் மகிழ்ச்சியான.
அதன் வரம்பற்ற நீட்டிப்பு, செருகுநிரல்கள் மற்றும் அளவிடுதலுக்கான முன்கணிப்பு ஆகிய இரண்டிலும், ஏறக்குறைய எந்தப் பகுதியிலும் காற்றோட்டத்தைப் பயன்படுத்துவதற்கான வாய்ப்பை உங்களுக்கு வழங்குகிறது: தரவுகளைச் சேகரித்தல், தயாரித்தல் மற்றும் செயலாக்குதல் ஆகியவற்றின் முழு சுழற்சியிலும், ராக்கெட்டுகளை ஏவுவதில் கூட (செவ்வாய், நிச்சயமாக).
பகுதி இறுதி, குறிப்பு மற்றும் தகவல்
உங்களுக்காக நாங்கள் சேகரித்த ரேக்
start_date. ஆம், இது ஏற்கனவே உள்ளூர் நினைவு. டக்கின் முக்கிய வாதம் வழியாகstart_dateஅனைத்து பாஸ். சுருக்கமாக, நீங்கள் குறிப்பிட்டால்start_dateதற்போதைய தேதி, மற்றும்schedule_interval- ஒரு நாள், பின்னர் டிஏஜி நாளை தொடங்கும்.start_date = datetime(2020, 7, 7, 0, 1, 2)மேலும் எந்த பிரச்சனையும் இல்லை.
அதனுடன் தொடர்புடைய மற்றொரு இயக்க நேரப் பிழை உள்ளது:
Task is missing the start_date parameter, இது பெரும்பாலும் நீங்கள் டாக் ஆபரேட்டருடன் பிணைக்க மறந்துவிட்டீர்கள் என்பதைக் குறிக்கிறது.- அனைத்தும் ஒரே இயந்திரத்தில். ஆம், மற்றும் தளங்கள் (காற்றோட்டம் மற்றும் எங்கள் பூச்சு), மற்றும் ஒரு வலை சேவையகம், மற்றும் ஒரு திட்டமிடுபவர் மற்றும் தொழிலாளர்கள். அது கூட வேலை செய்தது. ஆனால் காலப்போக்கில், சேவைகளுக்கான பணிகளின் எண்ணிக்கை அதிகரித்தது, மேலும் PostgreSQL குறியீட்டிற்கு 20 msக்குப் பதிலாக 5 வினாடிகளில் பதிலளிக்கத் தொடங்கியபோது, நாங்கள் அதை எடுத்துச் சென்றோம்.
- உள்ளூர் நிர்வாகி. ஆம், நாங்கள் இன்னும் அதில் அமர்ந்திருக்கிறோம், நாங்கள் ஏற்கனவே பள்ளத்தின் விளிம்பிற்கு வந்துவிட்டோம். LocalExecutor இதுவரை எங்களுக்கு போதுமானதாக உள்ளது, ஆனால் இப்போது குறைந்தபட்சம் ஒரு தொழிலாளியுடன் விரிவாக்க வேண்டிய நேரம் வந்துவிட்டது, மேலும் CeleryExecutor க்கு செல்ல நாங்கள் கடினமாக உழைக்க வேண்டும். நீங்கள் ஒரு கணினியில் அதனுடன் வேலை செய்ய முடியும் என்ற உண்மையைக் கருத்தில் கொண்டு, ஒரு சேவையகத்தில் கூட செலரியைப் பயன்படுத்துவதை எதுவும் தடுக்காது, இது "நிச்சயமாக, நேர்மையாக உற்பத்திக்கு செல்லாது!"
- பயன்படுத்தாதது உள்ளமைக்கப்பட்ட கருவிகள்:
- இணைப்புகள் சேவை சான்றுகளை சேமிக்க,
- SLA மிஸ்கள் சரியான நேரத்தில் வேலை செய்யாத பணிகளுக்கு பதிலளிக்க,
- xcom மெட்டாடேட்டா பரிமாற்றத்திற்காக (நான் சொன்னேன் மெட்டாதரவு!) டாக் பணிகளுக்கு இடையில்.
- அஞ்சல் முறைகேடு. சரி, நான் என்ன சொல்ல முடியும்? விழுந்த பணிகளின் அனைத்து மறுநிகழ்வுகளுக்கும் விழிப்பூட்டல்கள் அமைக்கப்பட்டன. இப்போது எனது பணியான ஜிமெயிலில் ஏர்ஃப்ளோவில் இருந்து 90k மின்னஞ்சல்கள் உள்ளன, மேலும் வலை அஞ்சல் முகவாய் ஒரே நேரத்தில் 100க்கும் மேற்பட்டவற்றை எடுக்கவும் நீக்கவும் மறுக்கிறது.
மேலும் ஆபத்துகள்:
மேலும் ஆட்டோமேஷன் கருவிகள்
நம் கைகளால் அல்லாமல் நம் தலையால் இன்னும் அதிகமாக வேலை செய்ய, ஏர்ஃப்ளோ எங்களுக்காக இதை தயார் செய்துள்ளது:
- - அவருக்கு இன்னும் பரிசோதனை அந்தஸ்து உள்ளது, இது அவரை வேலை செய்வதைத் தடுக்காது. இதன் மூலம், நீங்கள் டேக்ஸ் மற்றும் பணிகளைப் பற்றிய தகவல்களைப் பெறுவது மட்டுமல்லாமல், ஒரு டாக்கை நிறுத்தவும் / தொடங்கவும், ஒரு DAG ரன் அல்லது ஒரு குளத்தை உருவாக்கவும்.
- - பல கருவிகள் கட்டளை வரியின் மூலம் கிடைக்கின்றன, அவை WebUI மூலம் பயன்படுத்த சிரமமாக இல்லை, ஆனால் பொதுவாக இல்லை. உதாரணத்திற்கு:
backfillபணி நிகழ்வுகளை மறுதொடக்கம் செய்ய வேண்டும்.
உதாரணமாக, ஆய்வாளர்கள் வந்து சொன்னார்கள்: “தோழரே, ஜனவரி 1 முதல் 13 வரையிலான தரவுகளில் உங்களுக்கு முட்டாள்தனம் இருக்கிறது! சரி, சரி, சரி, சரி, சரி!" நீங்கள் ஒரு ஹாப்:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- அடிப்படை சேவை:
initdb,resetdb,upgradedb,checkdb. run, இது ஒரு நிகழ்வு பணியை இயக்க உங்களை அனுமதிக்கிறது, மேலும் அனைத்து சார்புகளிலும் கூட மதிப்பெண் பெறலாம். மேலும், நீங்கள் அதை இயக்கலாம்LocalExecutor, நீங்கள் செலரி கிளஸ்டர் வைத்திருந்தாலும் கூட.- கிட்டத்தட்ட அதையே செய்கிறது
test, அடிப்படையிலும் மட்டும் எதுவும் எழுதுவதில்லை. connectionsஷெல்லில் இருந்து இணைப்புகளை பெருமளவில் உருவாக்க அனுமதிக்கிறது.
- - தொடர்புகொள்வதற்கான ஒரு கடினமான வழி, இது செருகுநிரல்களுக்காக வடிவமைக்கப்பட்டுள்ளது, மேலும் சிறிய கைகளால் அதில் திரள்வது அல்ல. ஆனால் எங்களை போகவிடாமல் தடுப்பது யார்
/home/airflow/dagsஓடுipythonமற்றும் குழப்பத்தை தொடங்கவா? எடுத்துக்காட்டாக, பின்வரும் குறியீட்டைக் கொண்டு அனைத்து இணைப்புகளையும் ஏற்றுமதி செய்யலாம்:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - ஏர்ஃப்ளோ மெட்டாடேட்டாபேஸுடன் இணைக்கிறது. இதற்கு எழுதுவதை நான் பரிந்துரைக்கவில்லை, ஆனால் பல்வேறு குறிப்பிட்ட அளவீடுகளுக்கான பணி நிலைகளைப் பெறுவது எந்த API களைப் பயன்படுத்துவதை விடவும் மிக வேகமாகவும் எளிதாகவும் இருக்கும்.
நமது பணிகள் அனைத்தும் செயலற்றவை அல்ல, ஆனால் அவை சில சமயங்களில் வீழ்ச்சியடையலாம், இது சாதாரணமானது என்று சொல்லலாம். ஆனால் ஒரு சில தடைகள் ஏற்கனவே சந்தேகத்திற்குரியவை, மேலும் சரிபார்க்க வேண்டியது அவசியம்.
ஜாக்கிரதை SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
குறிப்புகள்
நிச்சயமாக, Google வழங்கும் முதல் பத்து இணைப்புகள் எனது புக்மார்க்குகளில் இருந்து Airflow கோப்புறையின் உள்ளடக்கங்களாகும்.
- - நிச்சயமாக, நாம் அலுவலகத்திலிருந்து தொடங்க வேண்டும். ஆவணங்கள், ஆனால் வழிமுறைகளை யார் படிக்கிறார்கள்?
- - சரி, குறைந்தபட்சம் படைப்பாளர்களின் பரிந்துரைகளைப் படிக்கவும்.
- - ஆரம்பம்: படங்களில் பயனர் இடைமுகம்
- - (திடீரென்று!) நீங்கள் என்னிடமிருந்து ஏதாவது புரிந்து கொள்ளவில்லை என்றால், அடிப்படைக் கருத்துக்கள் நன்கு விவரிக்கப்பட்டுள்ளன.
- - ஏர்ஃப்ளோ கிளஸ்டரை அமைப்பதற்கான ஒரு குறுகிய வழிகாட்டி.
- - கிட்டத்தட்ட அதே சுவாரஸ்யமான கட்டுரை, ஒருவேளை அதிக சம்பிரதாயம் மற்றும் குறைவான எடுத்துக்காட்டுகளைத் தவிர.
- - செலரியுடன் இணைந்து பணியாற்றுவது பற்றி.
- - பணிகளின் திறமையின்மை, தேதிக்கு பதிலாக ஐடி மூலம் ஏற்றுதல், மாற்றம், கோப்பு அமைப்பு மற்றும் பிற சுவாரஸ்யமான விஷயங்கள்.
- - பணிகளின் சார்புகள் மற்றும் தூண்டுதல் விதி, நான் கடந்து செல்லும் போது மட்டுமே குறிப்பிட்டேன்.
- - ஷெட்யூலரில் சில "உத்தேசிக்கப்பட்ட வேலைகளை" எப்படி சமாளிப்பது, இழந்த தரவை ஏற்றுவது மற்றும் பணிகளுக்கு முன்னுரிமை அளிப்பது எப்படி.
- - ஏர்ஃப்ளோ மெட்டாடேட்டாவுக்கு பயனுள்ள SQL வினவல்கள்.
- - தனிப்பயன் சென்சார் உருவாக்குவது பற்றி பயனுள்ள பகுதி உள்ளது.
- — தரவு அறிவியலுக்கான AWS இல் உள்கட்டமைப்பை உருவாக்குவது பற்றிய ஒரு சுவாரஸ்யமான சிறு குறிப்பு.
- - பொதுவான தவறுகள் (யாராவது இன்னும் வழிமுறைகளைப் படிக்காதபோது).
- - நீங்கள் இணைப்புகளைப் பயன்படுத்தினாலும், கடவுச்சொற்களை மக்கள் எப்படி ஊன்றுகோலாகச் சேமிப்பார்கள் என்று புன்னகைக்கவும்.
- - மறைமுகமான DAG பகிர்தல், செயல்பாடுகளில் சூழல் வீசுதல், மீண்டும் சார்புகள் மற்றும் பணி துவக்கங்களைத் தவிர்ப்பது.
- - பயன்பாடு பற்றி
default argumentsиparamsவார்ப்புருக்கள், அத்துடன் மாறிகள் மற்றும் இணைப்புகளில். - - ஏர்ஃப்ளோ 2.0க்கு திட்டமிடுபவர் எவ்வாறு தயாராகிறார் என்பது பற்றிய கதை.
- - எங்கள் கிளஸ்டரை வரிசைப்படுத்துவது பற்றிய சற்று காலாவதியான கட்டுரை
docker-compose. - - வார்ப்புருக்கள் மற்றும் சூழல் பகிர்தல் ஆகியவற்றைப் பயன்படுத்தி மாறும் பணிகள்.
- - அஞ்சல் மற்றும் ஸ்லாக் மூலம் நிலையான மற்றும் தனிப்பயன் அறிவிப்புகள்.
- - கிளை பணிகள், மேக்ரோக்கள் மற்றும் XCom.
மற்றும் கட்டுரையில் பயன்படுத்தப்படும் இணைப்புகள்:
- - வார்ப்புருக்களில் பயன்படுத்துவதற்கு ஒதுக்கிடங்கள் உள்ளன.
- - டாக்ஸை உருவாக்கும் போது பொதுவான தவறுகள்.
- -
docker-composeபரிசோதனை, பிழைத்திருத்தம் மற்றும் பலவற்றிற்கு. - - டெலிகிராம் ரெஸ்ட் ஏபிஐக்கான பைதான் ரேப்பர்.
ஆதாரம்: www.habr.com




