హాయ్, నేను డిమిత్రి లాగ్వినెంకో - వెజెట్ గ్రూప్ ఆఫ్ కంపెనీల అనలిటిక్స్ విభాగానికి చెందిన డేటా ఇంజనీర్.
ETL ప్రక్రియలను అభివృద్ధి చేయడానికి అద్భుతమైన సాధనం గురించి నేను మీకు చెప్తాను - Apache Airflow. కానీ ఎయిర్ఫ్లో చాలా బహుముఖంగా మరియు బహుముఖంగా ఉంటుంది, మీరు డేటా ఫ్లోలలో పాల్గొనకపోయినా, మీరు దానిని నిశితంగా పరిశీలించాలి, అయితే క్రమానుగతంగా ఏదైనా ప్రక్రియలను ప్రారంభించడం మరియు వాటి అమలును పర్యవేక్షించడం అవసరం.
మరియు అవును, నేను చెప్పడమే కాదు, చూపిస్తాను: ప్రోగ్రామ్లో చాలా కోడ్, స్క్రీన్షాట్లు మరియు సిఫార్సులు ఉన్నాయి.
మీరు ఎయిర్ఫ్లో / వికీమీడియా కామన్స్ అనే పదాన్ని గూగుల్ చేసినప్పుడు మీరు సాధారణంగా చూసేది
- మాత్రమే మంచిది, మరియు ఇది పూర్తిగా భిన్నమైన ప్రయోజనాల కోసం తయారు చేయబడింది, అవి (కాట్ ముందు వ్రాసినట్లు):
అపరిమిత సంఖ్యలో మెషీన్లలో పనులను అమలు చేయడం మరియు పర్యవేక్షించడం (అనేక సెలెరీ / కుబెర్నెట్స్ మరియు మీ మనస్సాక్షి మిమ్మల్ని అనుమతించే విధంగా)
పైథాన్ కోడ్ను వ్రాయడం మరియు అర్థం చేసుకోవడం చాలా సులభం నుండి డైనమిక్ వర్క్ఫ్లో జనరేషన్తో
మరియు రెడీమేడ్ భాగాలు మరియు హోమ్-మేడ్ ప్లగిన్లు (ఇది చాలా సులభం) రెండింటినీ ఉపయోగించి ఏదైనా డేటాబేస్లు మరియు APIలను ఒకదానితో ఒకటి కనెక్ట్ చేయగల సామర్థ్యం.
మేము అపాచీ ఎయిర్ఫ్లోను ఇలా ఉపయోగిస్తాము:
మేము DWH మరియు ODSలో (మాకు వెర్టికా మరియు క్లిక్హౌస్ ఉన్నాయి) వివిధ మూలాధారాల (అనేక SQL సర్వర్ మరియు PostgreSQL ఉదంతాలు, అప్లికేషన్ మెట్రిక్లతో కూడిన వివిధ APIలు, 1C కూడా) నుండి డేటాను సేకరిస్తాము.
ఎంత అభివృద్ధి చెందింది cron, ఇది ODSలో డేటా కన్సాలిడేషన్ ప్రక్రియలను ప్రారంభిస్తుంది మరియు వాటి నిర్వహణను కూడా పర్యవేక్షిస్తుంది.
ఇటీవలి వరకు, మా అవసరాలు 32 కోర్లు మరియు 50 GB RAMతో ఒక చిన్న సర్వర్ ద్వారా కవర్ చేయబడ్డాయి. గాలి ప్రవాహంలో, ఇది పనిచేస్తుంది:
మరింత 200 డాగ్స్ (వాస్తవానికి వర్క్ఫ్లోలు, దీనిలో మేము టాస్క్లను నింపాము)
ప్రతిదానిలో సగటున 70 పనులు,
ఈ మంచితనం మొదలవుతుంది (సగటున కూడా) గంటకు ఒకసారి.
మరియు మేము ఎలా విస్తరించాము అనే దాని గురించి, నేను క్రింద వ్రాస్తాను, కానీ ఇప్పుడు మనం పరిష్కరించే ఉబెర్-సమస్యను నిర్వచిద్దాం:
మూడు మూలాధార SQL సర్వర్లు ఉన్నాయి, ఒక్కొక్కటి 50 డేటాబేస్లతో ఉంటాయి - వరుసగా ఒక ప్రాజెక్ట్కి సంబంధించిన సందర్భాలు, అవి ఒకే నిర్మాణాన్ని కలిగి ఉంటాయి (దాదాపు ప్రతిచోటా, మువా-హ-హ), అంటే ప్రతి ఒక్కటి ఆర్డర్ల పట్టికను కలిగి ఉంటుంది (అదృష్టవశాత్తూ, దానితో ఒక టేబుల్ పేరు ఏదైనా వ్యాపారంలోకి నెట్టవచ్చు). మేము సేవా ఫీల్డ్లను (సోర్స్ సర్వర్, సోర్స్ డేటాబేస్, ETL టాస్క్ ID) జోడించడం ద్వారా డేటాను తీసుకుంటాము మరియు వాటిని అమాయకంగా వెర్టికాలో వేస్తాము.
లెట్ యొక్క వెళ్ళి!
ప్రధాన భాగం, ఆచరణాత్మక (మరియు కొద్దిగా సైద్ధాంతిక)
ఎందుకు మేము (మరియు మీరు)
చెట్లు పెద్దవిగా ఉన్నప్పుడు మరియు నేను సాదాసీదాగా ఉండేవాడిని SQL-స్కిక్ ఒక రష్యన్ రిటైల్లో, మేము మాకు అందుబాటులో ఉన్న రెండు సాధనాలను ఉపయోగించి ETL ప్రక్రియలను అకా డేటా ఫ్లోలను స్కామ్ చేసాము:
ఇన్ఫర్మేటికా పవర్ సెంటర్ - దాని స్వంత హార్డ్వేర్తో, దాని స్వంత వెర్షన్తో అత్యంత వ్యాప్తి చెందుతున్న వ్యవస్థ, అత్యంత ఉత్పాదకత. నేను దాని సామర్థ్యాలలో 1% నిషేధించడాన్ని దేవుడు ఉపయోగించాను. ఎందుకు? సరే, మొదటగా, ఈ ఇంటర్ఫేస్, 380ల నుండి ఎక్కడో మానసికంగా మనపై ఒత్తిడి తెచ్చింది. రెండవది, ఈ కాంట్రాప్షన్ చాలా ఫాన్సీ ప్రాసెస్లు, ఫ్యూరియస్ కాంపోనెంట్ రీయూజ్ మరియు ఇతర చాలా ముఖ్యమైన-ఎంటర్ప్రైజ్-ట్రిక్ల కోసం రూపొందించబడింది. సంవత్సరానికి ఎయిర్బస్ AXNUMX యొక్క వింగ్ వంటి దాని ఖరీదు గురించి, మేము ఏమీ చెప్పము.
జాగ్రత్త, స్క్రీన్షాట్ 30 ఏళ్లలోపు వారిని కొద్దిగా బాధపెడుతుంది
SQL సర్వర్ ఇంటిగ్రేషన్ సర్వర్ - మేము మా ఇంట్రా-ప్రాజెక్ట్ ఫ్లోలలో ఈ సహచరుడిని ఉపయోగించాము. బాగా, నిజానికి: మేము ఇప్పటికే SQL సర్వర్ని ఉపయోగిస్తున్నాము మరియు దాని ETL సాధనాలను ఉపయోగించకపోవడం ఏదో ఒకవిధంగా అసమంజసమైనది. దానిలోని ప్రతిదీ బాగుంది: ఇంటర్ఫేస్ రెండూ అందంగా ఉన్నాయి మరియు ప్రోగ్రెస్ రిపోర్ట్లు ఉన్నాయి ... కానీ దీనివల్ల మేము సాఫ్ట్వేర్ ఉత్పత్తులను ఇష్టపడతాము, ఓహ్, దీని కోసం కాదు. దానిని వెర్షన్ చేయండి dtsx (సేవ్లో షఫుల్ చేయబడిన నోడ్లతో XML అంటే) మనం చేయగలం, అయితే ప్రయోజనం ఏమిటి? వందలాది టేబుల్లను ఒక సర్వర్ నుండి మరొక సర్వర్కి లాగే టాస్క్ ప్యాకేజీని ఎలా తయారు చేయాలి? అవును, ఏ వంద, మీ చూపుడు వేలు ఇరవై ముక్కలు నుండి వస్తాయి, మౌస్ బటన్పై క్లిక్ చేయండి. కానీ ఇది ఖచ్చితంగా మరింత నాగరికంగా కనిపిస్తుంది:
మేము ఖచ్చితంగా మార్గాల కోసం వెతుకుతున్నాము. కేసు కూడా దాదాపు స్వీయ-వ్రాతపూర్వక SSIS ప్యాకేజీ జనరేటర్కి వచ్చింది ...
… ఆపై ఒక కొత్త ఉద్యోగం నాకు దొరికింది. మరియు అపాచీ ఎయిర్ఫ్లో నన్ను అధిగమించింది.
ETL ప్రక్రియ వివరణలు సాధారణ పైథాన్ కోడ్ అని తెలుసుకున్నప్పుడు, నేను ఆనందం కోసం నృత్యం చేయలేదు. డేటా స్ట్రీమ్లు ఈ విధంగా వెర్షన్ చేయబడ్డాయి మరియు విభిన్నంగా ఉంటాయి మరియు వందలాది డేటాబేస్ల నుండి ఒకే స్ట్రక్చర్తో టేబుల్లను ఒక టార్గెట్లోకి పోయడం ఒకటిన్నర లేదా రెండు 13 ”స్క్రీన్లలో పైథాన్ కోడ్కి సంబంధించిన అంశంగా మారింది.
క్లస్టర్ను అసెంబ్లింగ్ చేస్తోంది
మేము పూర్తిగా కిండర్ గార్టెన్ని ఏర్పాటు చేయము మరియు ఎయిర్ఫ్లోను ఇన్స్టాల్ చేయడం, మీరు ఎంచుకున్న డేటాబేస్, సెలెరీ మరియు డాక్స్లో వివరించిన ఇతర కేసుల వంటి పూర్తిగా స్పష్టమైన విషయాల గురించి ఇక్కడ మాట్లాడకూడదు.
మేము వెంటనే ప్రయోగాలు ప్రారంభించవచ్చు కాబట్టి, నేను గీసాను docker-compose.yml దీనిలో:
నిజానికి పెంచుదాం గాలి ప్రవాహం: షెడ్యూలర్, వెబ్సర్వర్. సెలెరీ పనులను పర్యవేక్షించడానికి ఫ్లవర్ కూడా అక్కడ తిరుగుతుంది (ఎందుకంటే ఇది ఇప్పటికే నెట్టబడింది apache/airflow:1.10.10-python3.7, కానీ మాకు అభ్యంతరం లేదు)
PostgreSQL, దీనిలో ఎయిర్ఫ్లో తన సేవా సమాచారాన్ని (షెడ్యూలర్ డేటా, ఎగ్జిక్యూషన్ స్టాటిస్టిక్స్, మొదలైనవి) వ్రాస్తుంది మరియు సెలెరీ పూర్తయిన పనులను గుర్తు చేస్తుంది;
Redis, ఇది సెలెరీకి టాస్క్ బ్రోకర్గా పనిచేస్తుంది;
సెలెరీ కార్మికుడు, ఇది పనుల ప్రత్యక్ష అమలులో నిమగ్నమై ఉంటుంది.
ఫోల్డర్కి ./dags మేము డాగ్ల వివరణతో మా ఫైల్లను జోడిస్తాము. అవి ఎగిరినప్పుడు తీయబడతాయి, కాబట్టి ప్రతి తుమ్ము తర్వాత మొత్తం స్టాక్ను మోసగించాల్సిన అవసరం లేదు.
కొన్ని ప్రదేశాలలో, ఉదాహరణలలోని కోడ్ పూర్తిగా చూపబడలేదు (వచనాన్ని చిందరవందర చేయకుండా), కానీ ఎక్కడో అది ప్రక్రియలో సవరించబడింది. పూర్తి వర్కింగ్ కోడ్ ఉదాహరణలు రిపోజిటరీలో చూడవచ్చు https://github.com/dm-logv/airflow-tutorial.
కూర్పు యొక్క అసెంబ్లీలో, నేను ఎక్కువగా బాగా తెలిసిన చిత్రంపై ఆధారపడి ఉన్నాను పుకెల్/డాకర్-ఎయిర్ ఫ్లో - తప్పకుండా తనిఖీ చేయండి. బహుశా మీ జీవితంలో ఇంకేమీ అవసరం లేదు.
అన్ని ఎయిర్ఫ్లో సెట్టింగ్లు మాత్రమే అందుబాటులో ఉంటాయి airflow.cfg, కానీ ఎన్విరాన్మెంట్ వేరియబుల్స్ (డెవలపర్లకు ధన్యవాదాలు) ద్వారా కూడా నేను దురుద్దేశపూర్వకంగా ప్రయోజనం పొందాను.
సహజంగానే, ఇది ఉత్పత్తికి సిద్ధంగా లేదు: నేను ఉద్దేశపూర్వకంగా కంటైనర్లపై హృదయ స్పందనలను ఉంచలేదు, నేను భద్రతతో బాధపడలేదు. కానీ నేను మా ప్రయోగాత్మకులకు తగిన కనీసము చేసాను.
గమనించండి:
డాగ్ ఫోల్డర్ తప్పనిసరిగా షెడ్యూలర్ మరియు కార్మికులు ఇద్దరికీ అందుబాటులో ఉండాలి.
ఇది అన్ని థర్డ్-పార్టీ లైబ్రరీలకు వర్తిస్తుంది - అవన్నీ తప్పనిసరిగా షెడ్యూలర్ మరియు వర్కర్లు ఉన్న మెషీన్లలో ఇన్స్టాల్ చేయబడాలి.
బాగా, ఇప్పుడు ఇది సులభం:
$ docker-compose up --scale worker=3
ప్రతిదీ పెరిగిన తర్వాత, మీరు వెబ్ ఇంటర్ఫేస్లను చూడవచ్చు:
ఈ "డాగ్స్"లో మీకు ఏమీ అర్థం కాకపోతే, ఇక్కడ ఒక చిన్న నిఘంటువు ఉంది:
షెడ్యూలర్ - ఎయిర్ఫ్లోలో అత్యంత ముఖ్యమైన అంకుల్, రోబోట్లు కష్టపడి పనిచేస్తాయని నియంత్రిస్తారు మరియు ఒక వ్యక్తి కాదు: షెడ్యూల్ను పర్యవేక్షిస్తుంది, డాగ్లను అప్డేట్ చేస్తుంది, టాస్క్లను ప్రారంభిస్తుంది.
సాధారణంగా, పాత సంస్కరణల్లో, అతనికి మెమరీలో సమస్యలు ఉన్నాయి (లేదు, స్మృతి కాదు, కానీ లీక్లు) మరియు లెగసీ పరామితి కూడా కాన్ఫిగర్లలోనే ఉంది. run_duration - దాని పునఃప్రారంభ విరామం. అయితే ఇప్పుడు అంతా బాగానే ఉంది.
dag (అకా "డాగ్") - "డైరెక్ట్ ఎసిక్లిక్ గ్రాఫ్", కానీ అలాంటి నిర్వచనం కొంతమందికి తెలియజేస్తుంది, కానీ వాస్తవానికి ఇది ఒకదానితో ఒకటి పరస్పర చర్య చేసే పనుల కోసం ఒక కంటైనర్ (క్రింద చూడండి) లేదా SSISలో ప్యాకేజీ మరియు ఇన్ఫర్మేటికాలో వర్క్ఫ్లో యొక్క అనలాగ్ .
డాగ్లతో పాటు, సబ్డ్యాగ్లు ఇప్పటికీ ఉండవచ్చు, కానీ మనం వాటిని పొందలేము.
DAG రన్ - ప్రారంభించబడిన డాగ్, దాని స్వంతంగా కేటాయించబడింది execution_date. అదే డాగ్కు చెందిన డాగ్రాన్లు సమాంతరంగా పని చేయవచ్చు (మీరు మీ పనులను అసంపూర్తిగా చేసి ఉంటే).
ఆపరేటర్ నిర్దిష్ట చర్యను నిర్వహించడానికి బాధ్యత వహించే కోడ్ ముక్కలు. మూడు రకాల ఆపరేటర్లు ఉన్నాయి:
చర్యమా ఇష్టం PythonOperator, ఇది ఏదైనా (చెల్లుబాటు అయ్యే) పైథాన్ కోడ్ని అమలు చేయగలదు;
బదిలీ, ఇది స్థలం నుండి మరొక ప్రదేశానికి డేటాను రవాణా చేస్తుంది, చెప్పండి, MsSqlToHiveTransfer;
నమోదు చేయు పరికరము మరోవైపు, ఇది ఒక సంఘటన జరిగే వరకు ప్రతిస్పందించడానికి లేదా డాగ్ యొక్క తదుపరి అమలును నెమ్మదించడానికి మిమ్మల్ని అనుమతిస్తుంది. HttpSensor పేర్కొన్న ముగింపు బిందువును లాగవచ్చు మరియు కావలసిన ప్రతిస్పందన వేచి ఉన్నప్పుడు, బదిలీని ప్రారంభించండి GoogleCloudStorageToS3Operator. పరిశోధనాత్మక మనస్సు ఇలా అడుగుతుంది: “ఎందుకు? అన్నింటికంటే, మీరు ఆపరేటర్లోనే పునరావృత్తులు చేయవచ్చు! ” ఆపై, సస్పెండ్ చేయబడిన ఆపరేటర్లతో పనుల పూల్ మూసుకుపోకుండా ఉండటానికి. సెన్సార్ ప్రారంభమవుతుంది, తదుపరి ప్రయత్నానికి ముందు తనిఖీ చేయబడుతుంది మరియు మరణిస్తుంది.
టాస్క్ - డిక్లేర్డ్ ఆపరేటర్లు, రకంతో సంబంధం లేకుండా, మరియు డాగ్తో జతచేయబడిన వారు టాస్క్ ర్యాంక్కు పదోన్నతి పొందుతారు.
పని ఉదాహరణ - ప్రదర్శకుడి-కార్మికులపై యుద్ధానికి టాస్క్లను పంపాల్సిన సమయం ఆసన్నమైందని సాధారణ ప్లానర్ నిర్ణయించినప్పుడు (అక్కడికక్కడే, మనం ఉపయోగిస్తే). LocalExecutor లేదా విషయంలో రిమోట్ నోడ్కి CeleryExecutor), ఇది వారికి ఒక సందర్భాన్ని కేటాయిస్తుంది (అనగా, వేరియబుల్స్ సెట్ - ఎగ్జిక్యూషన్ పారామితులు), కమాండ్ లేదా క్వెరీ టెంప్లేట్లను విస్తరిస్తుంది మరియు వాటిని పూల్ చేస్తుంది.
మేము పనులను రూపొందిస్తాము
ముందుగా, మన డౌగ్ యొక్క సాధారణ స్కీమ్ను రూపుమాపుదాం, ఆపై మేము మరింత ఎక్కువగా వివరాలలోకి ప్రవేశిస్తాము, ఎందుకంటే మేము కొన్ని చిన్నవిషయం కాని పరిష్కారాలను వర్తింపజేస్తాము.
కాబట్టి, దాని సరళమైన రూపంలో, అటువంటి డాగ్ ఇలా కనిపిస్తుంది:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
దీన్ని గుర్తించండి:
మొదట, మేము అవసరమైన లిబ్లను దిగుమతి చేస్తాము మరియు ఇంకేదో;
sql_server_ds అది - List[namedtuple[str, str]] ఎయిర్ఫ్లో కనెక్షన్ల నుండి కనెక్షన్ల పేర్లతో మరియు మేము మా ప్లేట్ను తీసుకునే డేటాబేస్లతో;
dag - మా డాగ్ యొక్క ప్రకటన, ఇది తప్పనిసరిగా ఉండాలి globals(), లేకుంటే ఎయిర్ఫ్లో దానిని కనుగొనదు. డగ్ కూడా చెప్పాలి:
అతని పేరు ఏమిటి orders - ఈ పేరు వెబ్ ఇంటర్ఫేస్లో కనిపిస్తుంది,
అతను జూలై ఎనిమిదవ తేదీ అర్ధరాత్రి నుండి పని చేస్తానని,
మరియు ఇది దాదాపు ప్రతి 6 గంటలకు నడుస్తుంది (కఠినమైన వ్యక్తుల కోసం ఇక్కడ బదులుగా timedelta() ఆమోదయోగ్యమైనది cron- లైన్ 0 0 0/6 ? * * *, తక్కువ కూల్ కోసం - వంటి వ్యక్తీకరణ @daily);
workflow() ప్రధాన పని చేస్తుంది, కానీ ఇప్పుడు కాదు. ప్రస్తుతానికి, మేము మా సందర్భాన్ని లాగ్లోకి పంపుతాము.
మరియు ఇప్పుడు పనులను సృష్టించే సాధారణ మేజిక్:
మేము మా మూలాల ద్వారా అమలు చేస్తాము;
ప్రారంభించు PythonOperator, ఇది మా డమ్మీని అమలు చేస్తుంది workflow(). టాస్క్ యొక్క ప్రత్యేకమైన (డాగ్ లోపల) పేరును పేర్కొనడం మరియు డాగ్ను కట్టడం మర్చిపోవద్దు. జెండా provide_context ప్రతిగా, ఫంక్షన్లో అదనపు ఆర్గ్యుమెంట్లను కురిపిస్తుంది, వీటిని మేము జాగ్రత్తగా సేకరిస్తాము **context.
ప్రస్తుతానికి, అంతే. మనకు లభించినవి:
వెబ్ ఇంటర్ఫేస్లో కొత్త డాగ్,
ఒకటిన్నర వందల పనులు సమాంతరంగా అమలు చేయబడతాయి (వాయు ప్రవాహం, సెలెరీ సెట్టింగ్లు మరియు సర్వర్ సామర్థ్యం అనుమతిస్తే).
బాగా, దాదాపు వచ్చింది.
డిపెండెన్సీలను ఎవరు ఇన్స్టాల్ చేస్తారు?
ఈ మొత్తం విషయాన్ని సరళీకృతం చేయడానికి, నేను చిక్కుకున్నాను docker-compose.yml ప్రాసెసింగ్ requirements.txt అన్ని నోడ్లపై.
ఇప్పుడు అది పోయింది:
గ్రే స్క్వేర్లు షెడ్యూలర్ ద్వారా ప్రాసెస్ చేయబడిన టాస్క్ ఇన్స్టాన్స్లు.
మేము కొంచెం వేచి ఉన్నాము, కార్మికులు పనిని ముగించారు:
ఆకుపచ్చ రంగులు తమ పనిని విజయవంతంగా పూర్తి చేశాయి. రెడ్స్ చాలా విజయవంతం కాలేదు.
మార్గం ద్వారా, మా ఉత్పత్తిలో ఫోల్డర్ లేదు ./dags, యంత్రాల మధ్య సమకాలీకరణ లేదు - అన్ని డాగ్లు ఉన్నాయి git మా Gitlabలో, మరియు Gitlab CI విలీనం అయినప్పుడు యంత్రాలకు నవీకరణలను పంపిణీ చేస్తుంది master.
ఫ్లవర్ గురించి కొంచెం
పనివాళ్ళు మన పసిమొగ్గలు కొడుతుండగా, మనకి ఏదో ఒకటి చూపించగల మరో సాధనం గుర్తుకు తెచ్చుకుందాం - పువ్వు.
వర్కర్ నోడ్లపై సారాంశ సమాచారంతో మొదటి పేజీ:
పని చేయడానికి వెళ్ళిన టాస్క్లతో అత్యంత తీవ్రమైన పేజీ:
మా బ్రోకర్ స్థితితో అత్యంత బోరింగ్ పేజీ:
ప్రకాశవంతమైన పేజీ టాస్క్ స్థితి గ్రాఫ్లు మరియు వాటి అమలు సమయంతో ఉంటుంది:
మేము అండర్లోడ్ చేసిన వాటిని లోడ్ చేస్తాము
కాబట్టి, అన్ని పనులు పని చేశాయి, మీరు గాయపడినవారిని తీసుకెళ్లవచ్చు.
మరియు చాలా మంది గాయపడ్డారు - ఒక కారణం లేదా మరొకటి. ఎయిర్ఫ్లో సరైన ఉపయోగం విషయంలో, డేటా ఖచ్చితంగా రాలేదని ఈ చతురస్రాలు సూచిస్తున్నాయి.
మీరు లాగ్ను చూడాలి మరియు పడిపోయిన టాస్క్ ఇన్స్టాన్స్లను రీస్టార్ట్ చేయాలి.
ఏదైనా స్క్వేర్పై క్లిక్ చేయడం ద్వారా, మాకు అందుబాటులో ఉన్న చర్యలను మేము చూస్తాము:
మీరు తీసుకొని పడిపోయిన వాటిని క్లియర్ చేయవచ్చు. అంటే, అక్కడ ఏదో విఫలమైందని మనం మరచిపోతాము మరియు అదే ఉదాహరణ పని షెడ్యూలర్కి వెళ్తుంది.
అన్ని ఎరుపు చతురస్రాలతో మౌస్తో దీన్ని చేయడం చాలా మానవత్వం కాదని స్పష్టమైంది - ఇది మేము ఎయిర్ఫ్లో నుండి ఆశించేది కాదు. సహజంగానే, మన దగ్గర సామూహిక విధ్వంసం చేసే ఆయుధాలు ఉన్నాయి: Browse/Task Instances
అన్నింటినీ ఒకేసారి ఎంచుకుని, సున్నాకి రీసెట్ చేద్దాం, సరైన అంశాన్ని క్లిక్ చేయండి:
శుభ్రపరిచిన తర్వాత, మా టాక్సీలు ఇలా కనిపిస్తాయి (షెడ్యూలర్ వాటిని షెడ్యూల్ చేయడానికి అవి ఇప్పటికే వేచి ఉన్నాయి):
కనెక్షన్లు, హుక్స్ మరియు ఇతర వేరియబుల్స్
ఇది తదుపరి DAGని చూసే సమయం, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
ప్రతి ఒక్కరూ ఎప్పుడైనా రిపోర్ట్ అప్డేట్ చేశారా? ఇది మళ్లీ ఆమె: డేటాను ఎక్కడ నుండి పొందాలో మూలాల జాబితా ఉంది; ఎక్కడ ఉంచాలో జాబితా ఉంది; ప్రతిదీ జరిగినప్పుడు లేదా విరిగిపోయినప్పుడు హాంక్ చేయడం మర్చిపోవద్దు (అలాగే, ఇది మన గురించి కాదు, లేదు).
మళ్లీ ఫైల్ని పరిశీలిద్దాం మరియు కొత్త అస్పష్టమైన అంశాలను చూద్దాం:
from commons.operators import TelegramBotSendMessage - అన్బ్లాక్ చేయబడిన వారికి సందేశాలను పంపడం కోసం చిన్న రేపర్ని తయారు చేయడం ద్వారా మేము మా స్వంత ఆపరేటర్లను తయారు చేయకుండా ఏమీ నిరోధించలేదు. (మేము దిగువ ఈ ఆపరేటర్ గురించి మరింత మాట్లాడుతాము);
default_args={} - dag దాని ఆపరేటర్లందరికీ ఒకే వాదనలను పంపిణీ చేయగలదు;
to='{{ var.value.all_the_kings_men }}' - ఫీల్డ్ to మేము హార్డ్కోడ్ చేయము, కానీ జింజా మరియు ఇమెయిల్ల జాబితాతో వేరియబుల్ని ఉపయోగించి డైనమిక్గా రూపొందించాము, నేను జాగ్రత్తగా ఉంచాను Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - ఆపరేటర్ను ప్రారంభించడానికి షరతు. మా విషయంలో, అన్ని డిపెండెన్సీలు పనిచేసినప్పుడు మాత్రమే లేఖ ఉన్నతాధికారులకు ఎగురుతుంది విజయవంతంగా;
tg_bot_conn_id='tg_main' - వాదనలు conn_id మేము సృష్టించిన కనెక్షన్ IDలను అంగీకరించండి Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - పడిపోయిన పనులు ఉంటే మాత్రమే టెలిగ్రామ్లోని సందేశాలు ఎగిరిపోతాయి;
task_concurrency=1 - మేము ఒక టాస్క్ యొక్క అనేక టాస్క్ ఇన్స్టాన్స్లను ఏకకాలంలో ప్రారంభించడాన్ని నిషేధిస్తాము. లేకపోతే, మేము అనేక ఏకకాల ప్రయోగాన్ని పొందుతాము VerticaOperator (ఒక టేబుల్ వైపు చూడటం);
report_update >> [email, tg] - అన్నీ VerticaOperator ఇలా లేఖలు మరియు సందేశాలను పంపడంలో కలుస్తాయి:
కానీ నోటిఫైయర్ ఆపరేటర్లు వేర్వేరు ప్రయోగ పరిస్థితులను కలిగి ఉన్నందున, ఒకటి మాత్రమే పని చేస్తుంది. ట్రీ వ్యూలో, ప్రతిదీ కొద్దిగా తక్కువ దృశ్యమానంగా కనిపిస్తుంది:
నేను గురించి కొన్ని మాటలు చెబుతాను మాక్రోలు మరియు వారి స్నేహితులు - వేరియబుల్స్.
మాక్రోలు జింజా ప్లేస్హోల్డర్లు, ఇవి వివిధ ఉపయోగకరమైన సమాచారాన్ని ఆపరేటర్ ఆర్గ్యుమెంట్లుగా మార్చగలవు. ఉదాహరణకు, ఇలా:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} సందర్భ వేరియబుల్ యొక్క కంటెంట్లకు విస్తరిస్తుంది execution_date ఆకృతిలో YYYY-MM-DD: 2020-07-14. మంచి భాగం ఏమిటంటే, కాంటెక్స్ట్ వేరియబుల్స్ నిర్దిష్ట టాస్క్ ఇన్స్టాన్స్కి (ట్రీ వ్యూలో ఒక చతురస్రం) నైల్ చేయబడి ఉంటాయి మరియు పునఃప్రారంభించినప్పుడు, ప్లేస్హోల్డర్లు అదే విలువలకు విస్తరిస్తాయి.
ప్రతి టాస్క్ ఇన్స్టాన్స్లో రెండర్ చేయబడిన బటన్ను ఉపయోగించి కేటాయించిన విలువలను వీక్షించవచ్చు. లేఖను పంపే పని ఇలా ఉంది:
కాబట్టి సందేశాన్ని పంపే పనిలో:
అందుబాటులో ఉన్న తాజా వెర్షన్ కోసం అంతర్నిర్మిత మాక్రోల పూర్తి జాబితా ఇక్కడ అందుబాటులో ఉంది: స్థూల సూచన
అంతేకాకుండా, ప్లగిన్ల సహాయంతో, మన స్వంత మాక్రోలను ప్రకటించవచ్చు, కానీ అది మరొక కథ.
ముందే నిర్వచించిన విషయాలతో పాటు, మన వేరియబుల్స్ యొక్క విలువలను ప్రత్యామ్నాయం చేయవచ్చు (నేను ఇప్పటికే పై కోడ్లో దీనిని ఉపయోగించాను). లో సృష్టిద్దాం Admin/Variables కొన్ని విషయాలు:
కావలసిన కీకి మార్గాన్ని ఉపయోగించండి: {{ var.json.bot_config.bot.token }}.
నేను అక్షరాలా ఒక పదం చెబుతాను మరియు దాని గురించి ఒక స్క్రీన్షాట్ చూపిస్తాను కనెక్షన్లు. ఇక్కడ ప్రతిదీ ప్రాథమికమైనది: పేజీలో Admin/Connections మేము కనెక్షన్ని సృష్టిస్తాము, అక్కడ మా లాగిన్లు / పాస్వర్డ్లు మరియు మరిన్ని నిర్దిష్ట పారామితులను జోడించండి. ఇలా:
పాస్వర్డ్లను గుప్తీకరించవచ్చు (డిఫాల్ట్ కంటే మరింత క్షుణ్ణంగా), లేదా మీరు కనెక్షన్ రకాన్ని వదిలివేయవచ్చు (నేను చేసినట్లుగా tg_main) - వాస్తవం ఏమిటంటే, రకాల జాబితా ఎయిర్ఫ్లో మోడల్లలో హార్డ్వైర్డ్గా ఉంది మరియు సోర్స్ కోడ్లలోకి ప్రవేశించకుండా విస్తరించబడదు (అకస్మాత్తుగా నేను ఏదైనా గూగుల్ చేయకపోతే, దయచేసి నన్ను సరిదిద్దండి), కానీ ఏదీ మనల్ని క్రెడిట్లను పొందకుండా ఆపదు పేరు.
మీరు ఒకే పేరుతో అనేక కనెక్షన్లను కూడా చేయవచ్చు: ఈ సందర్భంలో, పద్ధతి BaseHook.get_connection(), ఇది పేరు ద్వారా మాకు కనెక్షన్లను పొందుతుంది, ఇస్తుంది యాదృచ్ఛికంగా అనేక నేమ్సేక్ల నుండి (రౌండ్ రాబిన్ను తయారు చేయడం మరింత తార్కికంగా ఉంటుంది, కానీ దానిని ఎయిర్ఫ్లో డెవలపర్ల మనస్సాక్షిపై వదిలివేద్దాం).
వేరియబుల్స్ మరియు కనెక్షన్లు ఖచ్చితంగా అద్భుతమైన సాధనాలు, కానీ బ్యాలెన్స్ను కోల్పోకుండా ఉండటం ముఖ్యం: మీ ప్రవాహాల యొక్క ఏ భాగాలను మీరు కోడ్లోనే నిల్వ చేస్తారు మరియు నిల్వ కోసం మీరు ఎయిర్ఫ్లోకి ఏ భాగాలను ఇస్తారు. ఒక వైపు, UI ద్వారా విలువను త్వరగా మార్చడం సౌకర్యంగా ఉంటుంది, ఉదాహరణకు, మెయిలింగ్ బాక్స్. మరోవైపు, ఇది ఇప్పటికీ మౌస్ క్లిక్కి తిరిగి వస్తుంది, దీని నుండి మేము (నేను) వదిలించుకోవాలనుకుంటున్నాము.
కనెక్షన్లతో పనిచేయడం ఒక పని హుక్స్. సాధారణంగా, ఎయిర్ఫ్లో హుక్స్ అనేది థర్డ్-పార్టీ సేవలు మరియు లైబ్రరీలకు కనెక్ట్ చేయడానికి పాయింట్లు. ఉదా, JiraHook జిరాతో పరస్పర చర్య చేయడానికి మాకు క్లయింట్ను తెరుస్తుంది (మీరు టాస్క్లను ముందుకు వెనుకకు తరలించవచ్చు), మరియు దీని సహాయంతో SambaHook మీరు లోకల్ ఫైల్ని నెట్టవచ్చు smb-పాయింట్.
కస్టమ్ ఆపరేటర్ని అన్వయించడం
మరియు అది ఎలా తయారు చేయబడిందో చూడడానికి మేము దగ్గరగా వచ్చాము TelegramBotSendMessage
కోడ్ commons/operators.py అసలు ఆపరేటర్తో:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
ఇక్కడ, ఎయిర్ఫ్లో అన్నిటిలాగే, ప్రతిదీ చాలా సులభం:
నుండి వారసత్వంగా వచ్చింది BaseOperator, ఇది చాలా కొన్ని గాలి ప్రవాహ-నిర్దిష్ట విషయాలను అమలు చేస్తుంది (మీ విశ్రాంతిని చూడండి)
ప్రకటించిన క్షేత్రాలు template_fields, దీనిలో జింజా ప్రాసెస్ చేయడానికి మాక్రోల కోసం చూస్తుంది.
కోసం సరైన వాదనలు ఏర్పాటు చేశారు __init__(), అవసరమైన చోట డిఫాల్ట్లను సెట్ చేయండి.
మేము పూర్వీకుల ప్రారంభోత్సవం గురించి కూడా మరచిపోలేదు.
సంబంధిత హుక్ తెరిచింది TelegramBotHookదాని నుండి క్లయింట్ వస్తువును పొందింది.
భర్తీ చేయబడిన (పునర్నిర్వచించబడిన) పద్ధతి BaseOperator.execute(), ఆపరేటర్ను ప్రారంభించే సమయం వచ్చినప్పుడు ఏ ఎయిర్ఫో ట్విచ్ అవుతుంది - అందులో మేము లాగిన్ చేయడం మర్చిపోకుండా ప్రధాన చర్యను అమలు చేస్తాము. (మేము లాగ్ ఇన్, మార్గం ద్వారా, కుడి ఇన్ stdout и stderr - వాయుప్రవాహం ప్రతిదీ అడ్డగిస్తుంది, అందంగా చుట్టి, అవసరమైన చోట కుళ్ళిపోతుంది.)
మన దగ్గర ఏమి ఉందో చూద్దాం commons/hooks.py. ఫైల్ యొక్క మొదటి భాగం, హుక్తోనే:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
ఇక్కడ ఏమి వివరించాలో కూడా నాకు తెలియదు, నేను ముఖ్యమైన అంశాలను గమనిస్తాను:
మేము వారసత్వంగా పొందుతాము, వాదనల గురించి ఆలోచించండి - చాలా సందర్భాలలో ఇది ఒకటిగా ఉంటుంది: conn_id;
ప్రామాణిక పద్ధతులను భర్తీ చేయడం: నేను నన్ను పరిమితం చేసుకున్నాను get_conn(), దీనిలో నేను కనెక్షన్ పారామితులను పేరు ద్వారా పొందుతాను మరియు విభాగాన్ని పొందుతాను extra (ఇది JSON ఫీల్డ్), దీనిలో నేను (నా స్వంత సూచనల ప్రకారం!) టెలిగ్రామ్ బాట్ టోకెన్ను ఉంచాను: {"bot_token": "YOuRAwEsomeBOtToKen"}.
నేను మా ఉదాహరణను సృష్టిస్తాను TelegramBot, దీనికి నిర్దిష్ట టోకెన్ ఇవ్వడం.
అంతే. మీరు ఉపయోగించి హుక్ నుండి క్లయింట్ని పొందవచ్చు TelegramBotHook().clent లేదా TelegramBotHook().get_conn().
మరియు ఫైల్ యొక్క రెండవ భాగం, దీనిలో నేను టెలిగ్రామ్ REST API కోసం మైక్రోవ్రాపర్ను తయారు చేస్తాను, దానిని లాగకుండా ఉండేందుకు python-telegram-bot ఒక పద్ధతి కోసం sendMessage.
అన్నింటినీ జోడించడం సరైన మార్గం: TelegramBotSendMessage, TelegramBotHook, TelegramBot - ప్లగ్ఇన్లో, పబ్లిక్ రిపోజిటరీలో ఉంచండి మరియు దానిని ఓపెన్ సోర్స్కు ఇవ్వండి.
మేము ఇవన్నీ అధ్యయనం చేస్తున్నప్పుడు, మా నివేదిక నవీకరణలు విజయవంతంగా విఫలమయ్యాయి మరియు ఛానెల్లో నాకు ఎర్రర్ సందేశాన్ని పంపాయి. అది తప్పు కాదా అని నేను తనిఖీ చేస్తాను...
మా కుక్కలో ఏదో విరిగింది! మనం ఊహించినది అది కాదా? సరిగ్గా!
మీరు పోయబోతున్నారా?
నేను ఏదో కోల్పోయినట్లు మీకు అనిపిస్తుందా? అతను SQL సర్వర్ నుండి వెర్టికాకు డేటాను బదిలీ చేస్తానని వాగ్దానం చేసినట్లు తెలుస్తోంది, ఆపై అతను దానిని తీసుకొని టాపిక్ ఆఫ్ స్కౌండ్రల్!
ఈ దారుణం ఉద్దేశపూర్వకంగా జరిగింది, నేను మీ కోసం కొన్ని పదజాలాన్ని అర్థంచేసుకోవలసి వచ్చింది. ఇప్పుడు మీరు మరింత ముందుకు వెళ్ళవచ్చు.
మా ప్రణాళిక ఇది:
డాగ్ చేయండి
టాస్క్లను రూపొందించండి
అంతా ఎంత అందంగా ఉందో చూడండి
పూరించడానికి సెషన్ నంబర్లను కేటాయించండి
SQL సర్వర్ నుండి డేటాను పొందండి
డేటాను వెర్టికాలో ఉంచండి
గణాంకాలు సేకరించండి
కాబట్టి, ఇవన్నీ అప్ మరియు రన్నింగ్ కోసం, నేను మా దానికి ఒక చిన్న అదనంగా చేసాను docker-compose.yml:
హోస్ట్గా వెర్టికా dwh అత్యంత డిఫాల్ట్ సెట్టింగ్లతో,
SQL సర్వర్ యొక్క మూడు ఉదాహరణలు,
మేము తరువాతి డేటాబేస్లను కొంత డేటాతో నింపుతాము (ఎట్టి పరిస్థితుల్లోనూ పరిశీలించవద్దు mssql_init.py!)
మేము చివరిసారి కంటే కొంచెం సంక్లిష్టమైన ఆదేశం సహాయంతో అన్ని మంచిని ప్రారంభించాము:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
మా మిరాకిల్ రాండమైజర్ ఏమి ఉత్పత్తి చేసిందో, మీరు అంశాన్ని ఉపయోగించవచ్చు Data Profiling/Ad Hoc Query:
ప్రధాన విషయం ఏమిటంటే దానిని విశ్లేషకులకు చూపించకూడదు
విశదీకరించండి ETL సెషన్లు నేను చేయను, అక్కడ ప్రతిదీ చిన్నవిషయం: మేము ఒక ఆధారాన్ని తయారు చేస్తాము, దానిలో ఒక సంకేతం ఉంది, మేము ప్రతిదీ సందర్భ నిర్వాహికితో చుట్టాము మరియు ఇప్పుడు మేము ఇలా చేస్తాము:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
సమయం వచ్చింది మా డేటాను సేకరించండి మా ఒకటిన్నర వందల పట్టికల నుండి. చాలా అనుకవగల పంక్తుల సహాయంతో దీన్ని చేద్దాం:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
ఒక హుక్ సహాయంతో మేము ఎయిర్ఫ్లో నుండి పొందుతాము pymssql- కనెక్ట్ చేయండి
అభ్యర్థనలో తేదీ రూపంలో పరిమితిని ప్రత్యామ్నాయం చేద్దాం - ఇది టెంప్లేట్ ఇంజిన్ ద్వారా ఫంక్షన్లోకి విసిరివేయబడుతుంది.
మా అభ్యర్థనను అందిస్తోంది pandasఎవరు మాకు పొందుతారు DataFrame - ఇది భవిష్యత్తులో మాకు ఉపయోగకరంగా ఉంటుంది.
నేను ప్రత్యామ్నాయాన్ని ఉపయోగిస్తున్నాను {dt} అభ్యర్థన పరామితికి బదులుగా %s నేను చెడ్డ పినోచియో కాబట్టి కాదు, కానీ ఎందుకంటే pandas నిర్వహించలేరు pymssql మరియు చివరిది జారిపోతుంది params: Listఅతను నిజంగా కోరుకుంటున్నప్పటికీ tuple.
డెవలపర్ అని కూడా గమనించండి pymssql అతనికి ఇకపై మద్దతు ఇవ్వకూడదని నిర్ణయించుకుంది మరియు ఇది బయటకు వెళ్లడానికి సమయం pyodbc.
ఎయిర్ఫ్లో మా ఫంక్షన్ల ఆర్గ్యుమెంట్లను దేనితో నింపిందో చూద్దాం:
డేటా లేకపోతే, కొనసాగించడంలో అర్థం లేదు. కానీ ఫిల్లింగ్ విజయవంతంగా పరిగణించడం కూడా వింతగా ఉంది. అయితే ఇది పొరపాటు కాదు. అ-ఆహ్, ఏమి చేయాలి?! మరియు ఇక్కడ ఏమి ఉంది:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException లోపాలు లేవని ఎయిర్ఫ్లో చెబుతుంది, కానీ మేము పనిని దాటవేస్తాము. ఇంటర్ఫేస్లో ఆకుపచ్చ లేదా ఎరుపు చతురస్రం ఉండదు, కానీ పింక్.
మా వరద సెషన్ ID (ఇది భిన్నంగా ఉంటుంది ప్రతి పని కోసం),
మూలం మరియు ఆర్డర్ ID నుండి హాష్ - తద్వారా తుది డేటాబేస్లో (ప్రతిదీ ఒకే టేబుల్లో పోస్తారు) మనకు ప్రత్యేకమైన ఆర్డర్ ID ఉంటుంది.
చివరి దశ మిగిలి ఉంది: వెర్టికాలో ప్రతిదీ పోయాలి. మరియు, విచిత్రమేమిటంటే, దీన్ని చేయడానికి అత్యంత అద్భుతమైన మరియు సమర్థవంతమైన మార్గాలలో ఒకటి CSV ద్వారా!
విక్రయంలో, మేము టార్గెట్ ప్లేట్ను మాన్యువల్గా సృష్టిస్తాము. ఇక్కడ నేను ఒక చిన్న యంత్రాన్ని అనుమతించాను:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
నేను వాడుతున్నాను VerticaOperator() నేను డేటాబేస్ స్కీమా మరియు టేబుల్ని క్రియేట్ చేస్తాను (అవి ఇప్పటికే ఉనికిలో లేకుంటే). ప్రధాన విషయం ఏమిటంటే డిపెండెన్సీలను సరిగ్గా అమర్చడం:
- బాగా, - చిన్న మౌస్, - అది కాదు, ఇప్పుడు
అడవిలో నేనే అత్యంత భయంకరమైన జంతువు అని మీరు నమ్ముతున్నారా?
జూలియా డోనాల్డ్సన్, ది గ్రుఫలో
నా సహోద్యోగులకు మరియు నాకు పోటీ ఉంటే: ఎవరు త్వరగా మొదటి నుండి ETL ప్రక్రియను సృష్టిస్తారు మరియు ప్రారంభిస్తారు: వారు వారి SSIS మరియు మౌస్తో మరియు నేను ఎయిర్ఫ్లోతో ... ఆపై మేము నిర్వహణ సౌలభ్యాన్ని కూడా పోల్చి చూస్తాము ... వావ్, నేను వారిని అన్ని రంగాలలో ఓడిస్తానని మీరు అంగీకరిస్తారని నేను భావిస్తున్నాను!
కొంచెం గంభీరంగా ఉంటే, అపాచీ ఎయిర్ఫ్లో - ప్రోగ్రామ్ కోడ్ రూపంలో ప్రక్రియలను వివరించడం ద్వారా - నా పని నేను చేసాను మరింత మరింత సౌకర్యవంతమైన మరియు ఆనందించే.
దాని అపరిమిత విస్తరణ, ప్లగ్-ఇన్ల పరంగా మరియు స్కేలబిలిటీకి ముందస్తుగా, దాదాపు ఏ ప్రాంతంలోనైనా వాయు ప్రవాహాన్ని ఉపయోగించుకునే అవకాశాన్ని మీకు అందిస్తుంది: రాకెట్లను ప్రయోగించడంలో కూడా డేటాను సేకరించడం, సిద్ధం చేయడం మరియు ప్రాసెస్ చేయడం వంటి పూర్తి చక్రంలో కూడా (మార్స్, ఆఫ్ కోర్సు).
పార్ట్ ఫైనల్, సూచన మరియు సమాచారం
మేము మీ కోసం సేకరించిన రేక్
start_date. అవును, ఇది ఇప్పటికే స్థానిక జ్ఞాపకం. డౌగ్ యొక్క ప్రధాన వాదన ద్వారా start_date అన్ని పాస్. క్లుప్తంగా, మీరు పేర్కొన్నట్లయితే start_date ప్రస్తుత తేదీ, మరియు schedule_interval - ఒక రోజు, అప్పుడు DAG రేపు ముందుగా ప్రారంభమవుతుంది.
start_date = datetime(2020, 7, 7, 0, 1, 2)
మరియు మరిన్ని సమస్యలు లేవు.
దానితో అనుబంధించబడిన మరొక రన్టైమ్ లోపం ఉంది: Task is missing the start_date parameter, ఇది చాలా తరచుగా మీరు డాగ్ ఆపరేటర్కు బంధించడం మర్చిపోయినట్లు సూచిస్తుంది.
అన్నీ ఒకే యంత్రంపై. అవును, మరియు బేస్లు (ఎయిర్ఫ్లో మరియు మా పూత), మరియు వెబ్ సర్వర్ మరియు షెడ్యూలర్ మరియు కార్మికులు. మరియు అది కూడా పనిచేసింది. కానీ కాలక్రమేణా, సేవల కోసం టాస్క్ల సంఖ్య పెరిగింది మరియు PostgreSQL 20 ఎంఎస్లకు బదులుగా 5 సెకన్లలో ఇండెక్స్కు ప్రతిస్పందించడం ప్రారంభించినప్పుడు, మేము దానిని తీసుకొని తీసుకెళ్లాము.
లోకల్ ఎగ్జిక్యూటర్. అవును, మేము ఇంకా దానిపై కూర్చున్నాము మరియు మేము ఇప్పటికే అగాధం అంచుకు వచ్చాము. LocalExecutor మాకు ఇప్పటివరకు సరిపోతుంది, కానీ ఇప్పుడు కనీసం ఒక కార్మికునితో విస్తరించడానికి సమయం ఆసన్నమైంది మరియు CeleryExecutorకి వెళ్లడానికి మేము చాలా కష్టపడాల్సి ఉంటుంది. మరియు మీరు దానితో ఒక మెషీన్లో పని చేయగలరనే వాస్తవాన్ని దృష్టిలో ఉంచుకుని, సర్వర్లో కూడా సెలెరీని ఉపయోగించకుండా ఏదీ మిమ్మల్ని ఆపదు, ఇది “వాస్తవానికి, నిజాయితీగా ఉత్పత్తికి వెళ్లదు!”
ఉపయోగం లేనిది అంతర్నిర్మిత సాధనాలు:
కనెక్షన్లు సేవా ఆధారాలను నిల్వ చేయడానికి,
SLA మిస్లు సమయానికి పని చేయని పనులకు ప్రతిస్పందించడానికి,
xcom మెటాడేటా మార్పిడి కోసం (నేను చెప్పాను మెటాడేటా!) డాగ్ టాస్క్ల మధ్య.
మెయిల్ దుర్వినియోగం. సరే, నేను ఏమి చెప్పగలను? పడిపోయిన పనుల యొక్క అన్ని పునరావృతాల కోసం హెచ్చరికలు సెటప్ చేయబడ్డాయి. ఇప్పుడు నా పని Gmailలో ఎయిర్ఫ్లో నుండి 90k ఇమెయిల్లు ఉన్నాయి మరియు వెబ్ మెయిల్ మూతి ఒకేసారి 100 కంటే ఎక్కువ తీయడానికి మరియు తొలగించడానికి నిరాకరిస్తుంది.
మన చేతులతో కాకుండా మన తలలతో మరింత ఎక్కువగా పనిచేయడానికి, ఎయిర్ఫ్లో మా కోసం దీన్ని సిద్ధం చేసింది:
REST API - అతను ఇప్పటికీ ప్రయోగాత్మక స్థితిని కలిగి ఉన్నాడు, ఇది అతన్ని పని చేయకుండా నిరోధించదు. దానితో, మీరు డాగ్లు మరియు టాస్క్ల గురించి సమాచారాన్ని మాత్రమే పొందలేరు, కానీ డాగ్ను ఆపండి/ప్రారంభించండి, DAG రన్ లేదా పూల్ను సృష్టించండి.
CLI - WebUI ద్వారా ఉపయోగించడానికి అసౌకర్యంగా ఉండటమే కాకుండా సాధారణంగా లేని అనేక సాధనాలు కమాండ్ లైన్ ద్వారా అందుబాటులో ఉన్నాయి. ఉదాహరణకి:
backfill పని సందర్భాలను పునఃప్రారంభించడం అవసరం.
ఉదాహరణకు, విశ్లేషకులు వచ్చి ఇలా అన్నారు: “మరియు మీరు, కామ్రేడ్, జనవరి 1 నుండి 13 వరకు డేటాలో అర్ధంలేనిది! దాన్ని పరిష్కరించండి, పరిష్కరించండి, సరిదిద్దండి, సరిదిద్దండి!" మరియు మీరు అలాంటి హాబ్:
run, ఇది ఒక ఉదాహరణ పనిని అమలు చేయడానికి మరియు అన్ని డిపెండెన్సీలపై కూడా స్కోర్ చేయడానికి మిమ్మల్ని అనుమతిస్తుంది. అదనంగా, మీరు దీన్ని అమలు చేయవచ్చు LocalExecutor, మీకు సెలెరీ క్లస్టర్ ఉన్నప్పటికీ.
దాదాపు అదే పని చేస్తుంది test, బేస్లలో కూడా ఏమీ వ్రాయదు.
connections షెల్ నుండి కనెక్షన్ల యొక్క భారీ సృష్టిని అనుమతిస్తుంది.
పైథాన్ API - ఇంటరాక్ట్ చేయడానికి చాలా హార్డ్కోర్ మార్గం, ఇది ప్లగిన్ల కోసం ఉద్దేశించబడింది మరియు చిన్న చేతులతో దానిలో గుమిగూడదు. అయితే మమ్మల్ని వెళ్లకుండా ఎవరు ఆపాలి /home/airflow/dags, పరుగు ipython మరియు గందరగోళాన్ని ప్రారంభించాలా? మీరు, ఉదాహరణకు, కింది కోడ్తో అన్ని కనెక్షన్లను ఎగుమతి చేయవచ్చు:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
ఎయిర్ఫ్లో మెటాడేబేస్కు కనెక్ట్ చేస్తోంది. నేను దీనికి వ్రాయమని సిఫారసు చేయను, కానీ వివిధ నిర్దిష్ట కొలమానాల కోసం విధి స్థితిని పొందడం అనేది ఏదైనా APIలను ఉపయోగించడం కంటే చాలా వేగంగా మరియు సులభంగా ఉంటుంది.
మన పనులన్నీ బలహీనమైనవి కావు, కానీ అవి కొన్నిసార్లు పడిపోవచ్చు మరియు ఇది సాధారణం. కానీ కొన్ని అడ్డంకులు ఇప్పటికే అనుమానాస్పదంగా ఉన్నాయి మరియు తనిఖీ చేయడం అవసరం.
SQL జాగ్రత్త!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
సూచనలు
మరియు వాస్తవానికి, Google జారీ చేసిన మొదటి పది లింక్లు నా బుక్మార్క్ల నుండి ఎయిర్ఫ్లో ఫోల్డర్లోని కంటెంట్లు.
ది జెన్ ఆఫ్ పైథాన్ మరియు అపాచీ ఎయిర్ఫ్లో - ఇంప్లిసిట్ DAG ఫార్వార్డింగ్, ఫంక్షన్లలో సందర్భం విసరడం, మళ్లీ డిపెండెన్సీల గురించి మరియు టాస్క్ లాంచ్లను దాటవేయడం గురించి.