مان توهان کي ETL پروسيس کي ترقي ڪرڻ لاء هڪ شاندار اوزار بابت ٻڌايان ٿو - Apache Airflow. پر ايئر فلو ايترو ورڇيل ۽ گھڻائي وارو آهي ته توهان کي ان تي هڪ ويجهي نظر رکڻ گهرجي جيتوڻيڪ توهان ڊيٽا جي وهڪري ۾ شامل نه آهيو، پر توهان کي وقتي طور تي ڪنهن به عمل کي شروع ڪرڻ ۽ انهن جي عمل جي نگراني ڪرڻ جي ضرورت آهي.
۽ ها، مان نه رڳو ٻڌايان ٿو، پر ڏيکاريو پڻ: پروگرام ۾ تمام گهڻو ڪوڊ، اسڪرين شاٽ ۽ سفارشون آهن.
جيڪو توهان عام طور تي ڏسندا آهيو جڏهن توهان گوگل لفظ Airflow / Wikimedia Commons استعمال ڪندا آهيو
- صرف بهتر، ۽ اهو مڪمل طور تي مختلف مقصدن لاء ٺاهيو ويو آهي، يعني (جيئن ته کاتا کان اڳ لکيو ويو آهي):
مشينن جي لامحدود تعداد تي هلائڻ ۽ نگراني جا ڪم (جيئن ڪيترائي Celery / Kubernetes ۽ توهان جو ضمير توهان کي اجازت ڏيندو)
Python ڪوڊ لکڻ ۽ سمجھڻ ۾ تمام آسان کان متحرڪ ڪم فلو نسل سان
۽ ڪنهن به ڊيٽابيس ۽ APIs کي هڪ ٻئي سان ڳنڍڻ جي صلاحيت ٻنهي تيار ڪيل اجزاء ۽ گهر ٺاهيل پلگ ان استعمال ڪندي (جيڪو انتهائي سادو آهي).
اسان Apache Airflow هن طرح استعمال ڪندا آهيون:
اسان مختلف ذريعن کان ڊيٽا گڏ ڪريون ٿا (ڪيترن ئي SQL سرور ۽ PostgreSQL مثال، مختلف APIs سان ايپليڪيشن ميٽرڪس، حتي 1C) DWH ۽ ODS ۾ (اسان وٽ Vertica ۽ Clickhouse آهي).
ڪيترو ترقي يافته cron، جيڪو ODS تي ڊيٽا جي استحڪام جي عمل کي شروع ڪري ٿو، ۽ انهن جي سار سنڀال جي نگراني پڻ ڪري ٿو.
تازو تائين، اسان جون ضرورتون ڍڪيل هيون هڪ ننڍڙي سرور سان 32 ڪور ۽ 50 GB جي رام سان. ايئر فلو ۾، هي ڪم ڪري ٿو:
более 200 ٻج (حقيقت ۾ ڪم فلوز، جنهن ۾ اسان ڪمن کي پورو ڪيو)
هر هڪ ۾ اوسط 70 ڪم,
هي نيڪي شروع ٿئي ٿي (اوسط تي) هڪ ڪلاڪ ۾ هڪ ڀيرو.
۽ اسان ڪيئن وڌايو، مان هيٺ لکندس، پر هاڻي اچو ته وضاحت ڪريون über-مسئلا جيڪو اسان حل ڪنداسين:
هتي ٽي ماخذ SQL سرور آهن، هر هڪ ۾ 50 ڊيٽابيس - هڪ منصوبي جا مثال، ترتيب سان، انهن وٽ هڪ ئي جوڙجڪ آهي (تقريبا هر جڳهه، mua-ha-ha)، جنهن جو مطلب آهي ته هر هڪ وٽ آرڊر ٽيبل آهي (خوش قسمتي سان، هڪ ٽيبل ان سان گڏ. نالو ڪنهن به ڪاروبار ۾ پئجي سگهي ٿو). اسان ڊيٽا کي شامل ڪندي سروس فيلڊز (ذريعو سرور، ماخذ ڊيٽابيس، اي ٽي ايل ٽاسڪ ID) شامل ڪري وٺون ٿا ۽ انھن کي صاف طور تي اڇلائي، چئو، ورٽيڪا.
اچو ته اچو!
مکيه حصو، عملي (۽ ٿورڙو نظرياتي)
ڇو اسان (۽ توهان)
جڏهن وڻ وڏا هئا ۽ مان سادو هوس SQL-سکڪ هڪ روسي پرچون ۾، اسان ETL پروسيس کي اسڪيم ڪيو عرف ڊيٽا فلوز اسان وٽ موجود ٻه اوزار استعمال ڪندي:
انفارميٽيڪا پاور سينٽر - هڪ انتهائي پکڙيل سسٽم، انتهائي پيداواري، پنهنجي هارڊويئر سان، پنهنجي ورزننگ. مون خدا کي استعمال ڪيو 1٪ ان جي صلاحيتن جو. ڇو؟ خير، سڀ کان پهريان، هي انٽرفيس، 380 کان ڪٿي، ذهني طور تي اسان تي دٻاء وجهي ٿو. ٻيو، هي تڪرار انتهائي فينسي پروسيس، بيحد جزو ٻيهر استعمال ۽ ٻين تمام اهم-انٽرپرائز-چالن لاءِ ٺهيل آهي. حقيقت اها آهي ته ان جي قيمت، ايئر بيس AXNUMX / سال جي ونگ وانگر، اسان ڪجهه به نه چوندا سين.
خبردار، هڪ اسڪرين شاٽ 30 سالن کان گهٽ عمر وارن ماڻهن کي نقصان پهچائي سگھي ٿو
SQL سرور انٽيگريشن سرور - اسان هن ڪامريڊ کي اسان جي انٽرا پروجيڪٽ جي وهڪري ۾ استعمال ڪيو. خير، حقيقت ۾: اسان اڳ ۾ ئي SQL سرور استعمال ڪندا آهيون، ۽ اهو ڪنهن به طرح غير معقول هوندو ان جي اي ٽي ايل اوزار استعمال ڪرڻ نه. ان ۾ سڀ ڪجهه سٺو آهي: ٻئي انٽرفيس خوبصورت آهي، ۽ پيش رفت رپورٽون ... پر اهو ڇو نه آهي ته اسان سافٽ ويئر پروڊڪٽس سان پيار ڪندا آهيون، اوه، هن لاء نه. ان جو نسخو dtsx (جيڪو XML آهي نوڊس سان شفل ٿيل محفوظ تي) اسان ڪري سگهون ٿا، پر مقصد ڇا آهي؟ ڪيئن هڪ ٽاسڪ پيڪيج ٺاهڻ بابت جيڪو سوين ٽيبل هڪ سرور کان ٻئي ڏانهن ڇڪيندو؟ ها، ڇا هڪ سو، توهان جي آڱر جي آڱر ويهن ٽڪرن مان گر ٿي ويندي، مائوس جي بٽڻ تي ڪلڪ ڪريو. پر اهو ضرور وڌيڪ فيشن ڏسڻ ۾ اچي ٿو:
اسان يقيناً ٻاهر جا طريقا ڳوليندا هئاسين. ڪيس به تقريبن هڪ خود لکيل SSIS پيڪيج جنريٽر تي آيو ...
... ۽ پوءِ هڪ نئين نوڪري ملي. ۽ Apache Airflow مون کي ان تي ختم ڪيو.
جڏهن مون کي معلوم ٿيو ته ETL پروسيس وضاحتون سادي پٿون ڪوڊ آهن، مون صرف خوشي لاء ناچ نه ڪيو. اهڙيءَ طرح ڊيٽا اسٽريمز کي ورجن ۽ مختلف ڪيو ويو، ۽ سلين ڊيٽابيس مان هڪ واحد ڍانچي سان ٽيبل کي هڪ ٽارگيٽ ۾ وجهڻ هڪ اڌ يا ٻه 13 ”اسڪرين ۾ پٿون ڪوڊ جو معاملو بڻجي ويو.
ڪلستر گڏ ڪرڻ
اچو ته مڪمل طور تي کنڊر گارٽن جو بندوبست نه ڪريون، ۽ هتي مڪمل طور تي واضح شين جي باري ۾ نه ڳالهايون، جهڙوڪ ايئر فلو کي نصب ڪرڻ، توهان جي چونڊيل ڊيٽابيس، سيلري ۽ ڊاکن ۾ بيان ڪيل ٻيا ڪيس.
انهي ڪري ته اسان فوري طور تي تجربا شروع ڪري سگهون ٿا، مون اسڪيچ ڪيو docker-compose.yml ڪھڙي:
اچو ته اصل ۾ بلند ڪريون Airflow: شيڊيولر، ويب سرور. سيلري جي ڪمن کي مانيٽر ڪرڻ لاءِ گل به اتي گھمائي رهيا آهن (ڇاڪاڻ ته اهو اڳ ۾ ئي ڌڪيو ويو آهي. apache/airflow:1.10.10-python3.7پر اسان کي ڪو اعتراض ناهي)
PostgreSQL، جنهن ۾ ايئر فلو پنهنجي سروس جي معلومات لکندو (شيڊيولر ڊيٽا، عمل جي انگ اکر، وغيره)، ۽ سيلري مڪمل ٿيل ڪمن کي نشانو بڻائيندو؛
Redis, جيڪو Celery لاءِ ٽاسڪ بروکر طور ڪم ڪندو.
سيلري ڪم ڪندڙ، جيڪو ڪمن جي سڌي عمل ۾ مصروف هوندو.
فولڊر ڏانهن ./dags اسان پنهنجي فائلن کي ڊيگ جي وضاحت سان شامل ڪنداسين. انهن کي اڏامڻ تي کنيو ويندو، تنهنڪري هر ڇڪڻ کان پوءِ پوري اسٽيڪ کي جهلڻ جي ضرورت ناهي.
ڪجهه هنڌن تي، مثالن ۾ ڪوڊ مڪمل طور تي نه ڏيکاريو ويو آهي (جيئن متن کي بي ترتيب نه هجي)، پر ڪجهه هنڌن تي اهو عمل ۾ تبديل ڪيو ويو آهي. مڪمل ڪم ڪندڙ ڪوڊ جا مثال ڳولهي سگهجن ٿا مخزن ۾ https://github.com/dm-logv/airflow-tutorial.
ساخت جي اسيمبليء ۾، مون گهڻو ڪري معروف تصوير تي ڀروسو ڪيو puckel/docker- airflow - ان کي چيڪ ڪرڻ جي پڪ ڪريو. ٿي سگهي ٿو توهان کي پنهنجي زندگيء ۾ ڪنهن ٻئي جي ضرورت نه آهي.
سڀ ايئر فلو سيٽنگون موجود آهن نه صرف ذريعي airflow.cfg، پر پڻ ماحولياتي متغيرن جي ذريعي (ڊولپرز جي مهرباني)، جنهن جو مون بدسلوڪي سان فائدو ورتو.
قدرتي طور تي، اهو پيداوار لاء تيار ناهي: مون ڄاڻي واڻي دل جي ڌڙڪن ڪنٽينرز تي نه رکي، مون سيڪيورٽي سان پريشان نه ڪيو. پر مون اسان جي تجربيڪارن لاءِ گھٽ ۾ گھٽ مناسب ڪيو.
نوٽ ڪريو ته:
ڊيگ فولڊر لازمي طور تي شيڊولر ۽ ڪارڪنن ٻنهي تائين رسائي لائق هوندو.
ساڳيو ئي سڀني ٽئين پارٽي جي لائبريرين تي لاڳو ٿئي ٿو - انهن سڀني کي هڪ شيڊولر ۽ ڪارڪنن سان مشين تي نصب ڪيو وڃي.
خير، هاڻي اهو آسان آهي:
$ docker-compose up --scale worker=3
هر شي جي اڀرڻ کان پوء، توهان ويب انٽرنيٽ تي ڏسي سگهو ٿا:
عام طور تي، پراڻن نسخن ۾، هن کي يادگيري سان مسئلا هئا (نه، ايمنسيا نه، پر ليک) ۽ ميراثي پيٽرولر اڃا تائين ترتيبن ۾ رهي ٿو. run_duration - ان جي ٻيهر شروع ڪرڻ جو وقفو. پر هاڻي سڀ ڪجهه ٺيڪ آهي.
ڊيگ (aka "dag") - "هدايت ڪيل ايڪيڪلڪ گراف"، پر اهڙي تعريف ڪجهه ماڻهن کي ٻڌائيندو، پر حقيقت ۾ اهو هڪ ڪنٽينر آهي ڪمن لاء هڪ ٻئي سان رابطي ۾ (هيٺ ڏسو) يا SSIS ۾ پيڪيج جو هڪ اينالاگ ۽ انفارميٽيڪا ۾ ورڪ فلو .
ڊيگس کان علاوه، اڃا به ذيلي ڊيگ ٿي سگھي ٿو، پر اسان گهڻو ڪري انهن کي حاصل نه ڪنداسين.
DAG رن - شروعاتي ڊيگ، جنهن کي پنهنجو مقرر ڪيو ويو آهي execution_date. ساڳي ڊيگ جي ڊيگرن متوازي ۾ ڪم ڪري سگهن ٿيون (جيڪڏهن توهان پنهنجي ڪمن کي غير معمولي بنايو آهي، يقينا).
آپريٽر ڪوڊ جا ٽڪرا آھن جيڪي ھڪڙي مخصوص عمل کي انجام ڏيڻ لاء ذميوار آھن. ٽي قسم جا آپريٽرز آهن:
عملاسان جي پسنديده وانگر PythonOperator، جيڪو ڪنهن به (درست) پٿون ڪوڊ تي عمل ڪري سگهي ٿو؛
sensor ٻئي طرف، اهو توهان کي رد عمل ڪرڻ يا ڊيگ جي وڌيڪ عمل کي سست ڪرڻ جي اجازت ڏيندو جيستائين ڪو واقعو ٿئي. HttpSensor مخصوص آخري پوائنٽ کي ڇڪي سگھي ٿو، ۽ جڏھن گهربل جواب انتظار ڪري رھيو آھي، منتقلي شروع ڪريو GoogleCloudStorageToS3Operator. هڪ جستجو وارو ذهن پڇندو: ”ڇو؟ آخرڪار، توهان درست آپريٽر ۾ ورجائي سگهو ٿا! ۽ پوء، معطل ٿيل آپريٽرز سان ڪمن جي تلاء کي بند نه ڪرڻ لاء. سينسر شروع ٿئي ٿو، چيڪ ڪري ٿو ۽ ايندڙ ڪوشش کان اڳ مري ٿو.
ڪم - اعلان ڪيل آپريٽرز، قطع نظر قسم جي، ۽ ڊيگ سان ڳنڍيل آهن ڪم جي درجي تي ترقي يافته آهن.
ڪم جو مثال - جڏهن عام منصوبه بندي جو فيصلو ڪيو ويو ته اهو وقت آهي ڪمن کي جنگ ۾ ڪم ڪندڙ ڪارڪنن تي موڪلڻ جو (جڳه تي صحيح، جيڪڏهن اسان استعمال ڪندا آهيون LocalExecutor يا جي صورت ۾ ريموٽ نوڊ ڏانهن CeleryExecutor)، اهو انهن لاءِ هڪ حوالو تفويض ڪري ٿو (يعني متغيرن جو هڪ سيٽ - execution parameters)، وڌائي ٿو ڪمانڊ يا سوال ٽيمپليٽس، ۽ انهن کي پول ڪري ٿو.
اسان ڪم ٺاهيندا آهيون
پهرين، اچو ته اسان جي ڊوگ جي عام اسڪيم کي بيان ڪريون، ۽ پوء اسان تفصيل ۾ وڌيڪ تفصيل ڏينداسين، ڇاڪاڻ ته اسان ڪجهه غير معمولي حل لاڳو ڪندا آهيون.
تنهن ڪري، ان جي آسان ترين شڪل ۾، اهڙي ڊگ هن طرح نظر ايندي:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
اچو ته ان کي سمجهون:
پهريون، اسان درآمد ڪريون ٿا ضروري ليب ۽ ڪجھ ٻيو;
sql_server_ds - هي آهي List[namedtuple[str, str]] ايئر فلو ڪنيڪشن مان ڪنيڪشن جي نالن سان ۽ ڊيٽابيسس جن مان اسان اسان جي پليٽ کڻنداسين؛
dag - اسان جي ڊيگ جو اعلان، جيڪو لازمي طور تي هجڻ گهرجي globals()ٻي صورت ۾ ايئر فلو ان کي ڳولي نه سگهندو. Doug پڻ چوڻ جي ضرورت آهي:
workflow() مکيه ڪم ڪندو، پر هاڻي نه. في الحال، اسان صرف لاگ ۾ اسان جي حوالي سان ڊمپ ڪنداسين.
۽ هاڻي ڪم ٺاهڻ جو سادو جادو:
اسان پنهنجي ذريعن ذريعي هلون ٿا؛
شروعات ڪرڻ PythonOperator، جيڪو اسان جي ڊمي تي عمل ڪندو workflow(). ڪم جي هڪ منفرد (ڊيگ جي اندر) نالو بيان ڪرڻ نه وساريو ۽ ڊيگ پاڻ کي ڳنڍيو. جھنڊو provide_context موڙ ۾، اضافي دليلن کي فنڪشن ۾ داخل ڪندو، جنهن کي اسين احتياط سان گڏ ڪنداسين **context.
في الحال، اهو سڀ ڪجهه آهي. اسان کي ڇا مليو:
ويب انٽرفيس ۾ نئون ڊيگ،
هڪ ۽ اڌ سو ڪم جيڪي متوازي طور تي عمل ڪيا ويندا (جيڪڏهن ايئر فلو، سيلري سيٽنگون ۽ سرور جي صلاحيت ان کي اجازت ڏين).
خير، لڳ ڀڳ حاصل ڪيو.
انحصار ڪير انسٽال ڪندو؟
هن سڄي شيء کي آسان ڪرڻ لاء، مون کي خراب ڪيو docker-compose.yml پروسيسنگ requirements.txt سڀني نوڊس تي.
هاڻي اهو ٿي ويو آهي:
گرين چوڪون ڪم جا مثال آھن جيڪي شيڊولر پاران پروسيس ٿيل آھن.
سائو، يقينا، ڪاميابيء سان پنهنجو ڪم مڪمل ڪيو آهي. ڳاڙهو تمام ڪامياب نه آهن.
رستي جي ذريعي، اسان جي پيداوار تي ڪو فولڊر ناهي ./dags، مشينن جي وچ ۾ ڪوبه هم وقت سازي نه آهي - سڀ ڊيگ اندر آهن git اسان جي Gitlab تي، ۽ Gitlab CI مشينن ۾ تازه ڪاريون ورهائي ٿو جڏهن ضم ٿي وڃي master.
گلن بابت ٿورڙو
جڏهن ته مزدور اسان جي پيسيفائر کي ڌڪ هڻي رهيا آهن، اچو ته هڪ ٻيو اوزار ياد رکون جيڪو اسان کي ڪجهه ڏيکاري سگهي ٿو - فلاور.
ڪم ڪندڙ نوڊس تي خلاصو معلومات سان گڏ پهريون صفحو:
ڪمن سان گڏ سڀ کان وڌيڪ شديد صفحو جيڪي ڪم تي ويا:
اسان جي بروکر جي حيثيت سان سڀ کان وڌيڪ بورنگ صفحو:
چمڪندڙ صفحو ٽاسڪ اسٽيٽس گرافس ۽ انهن جي عمل جي وقت سان آهي:
اسان انڊر لوڊ لوڊ ڪريون ٿا
تنهن ڪري، سڀني ڪمن کي ڪم ڪيو آهي، توهان زخمي کي کڻي سگهو ٿا.
۽ اتي ڪيترائي زخمي هئا - هڪ سبب يا ٻئي لاء. ايئر فلو جي صحيح استعمال جي صورت ۾، اهي تمام چورس ظاهر ڪن ٿا ته ڊيٽا ضرور نه پهتي.
توهان کي لاگ ڏسڻ جي ضرورت آهي ۽ گر ٿيل ڪم جي مثالن کي ٻيهر شروع ڪرڻ جي ضرورت آهي.
ڪنهن به اسڪوائر تي ڪلڪ ڪندي، اسان کي اسان وٽ موجود ڪارناما ڏسندا:
توھان وٺي سگھوٿا ۽ ڪري سگھوٿا گرھ کي صاف ڪريو. اھو آھي، اسان وساريو ٿا ته ڪجھھ ناڪام ٿي چڪو آھي، ۽ ساڳئي مثال جو ڪم شيڊولر ڏانھن ويندو.
اهو واضح آهي ته اهو سڀ ڳاڙهي چوڪن سان مائوس سان ڪرڻ بلڪل انساني ناهي - اهو اهو ناهي جيڪو اسان ايئر فلو کان توقع ڪريون ٿا. قدرتي طور تي، اسان وٽ وڏي تباهي جا هٿيار آهن: Browse/Task Instances
اچو ته هڪ ڀيرو هر شي کي چونڊيو ۽ صفر تي ري سيٽ ڪريو، صحيح شيون تي ڪلڪ ڪريو:
صفائي کان پوء، اسان جون ٽيڪسيون هن طرح نظر اچن ٿيون (اهي اڳ ۾ ئي شيڊولر جي انتظار ۾ آهن انهن کي شيڊول ڪرڻ لاء):
ڪنيڪشن، ٿلهو ۽ ٻيا متغير
اهو ايندڙ ڊي اي جي کي ڏسڻ جو وقت آهي، update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
ڇا هرڪو ڪڏهن هڪ رپورٽ اپڊيٽ ڪيو آهي؟ هي ٻيهر آهي: اتي ذريعن جي هڪ فهرست آهي جتي ڊيٽا حاصل ڪرڻ لاء؛ اتي هڪ فهرست آهي جتي رکڻ لاء؛ هان ڪرڻ نه وساريو جڏهن سڀ ڪجهه ٿيو يا ڀڄي ويو (چڱو، اهو اسان جي باري ۾ ناهي، نه).
اچو ته ٻيهر فائل ذريعي وڃو ۽ نئين غير واضح شين کي ڏسو:
from commons.operators import TelegramBotSendMessage - ڪجھ به نه روڪيو اسان کي پنهنجا آپريٽرز ٺاهڻ کان، جنهن جو اسان فائدو ورتو هڪ ننڍڙو ريپر ٺاهي پيغام موڪلڻ لاءِ Unblocked تي. (اسان هيٺ هن آپريٽر بابت وڌيڪ ڳالهائينداسين)؛
to='{{ var.value.all_the_kings_men }}' - ميدان to اسان وٽ هارڊ ڪوڊ نه هوندو، پر متحرڪ طور تي جنجا استعمال ڪندي ۽ اي ميلن جي فهرست سان هڪ متغير، جنهن کي مون احتياط سان داخل ڪيو Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - آپريٽر شروع ڪرڻ لاء شرط. اسان جي صورت ۾، خط صرف مالڪن ڏانهن پرواز ڪندو جيڪڏهن سڀ انحصار ڪم ڪيو آهي ڪاميابيءَ سان;
tg_bot_conn_id='tg_main' - دليل conn_id قبول ڪنيڪشن IDs جيڪي اسان ٺاهيندا آهيون Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - ٽيليگرام ۾ پيغام صرف ان صورت ۾ ڀڄي ويندا جڏهن اتي ڪم ڪري رهيا آهن؛
task_concurrency=1 - اسان ھڪڙي ڪم جي ڪيترن ئي ٽاسڪ مثالن جي ھڪڙي وقت لانچ کي منع ڪريون ٿا. ٻي صورت ۾، اسان ڪيترن ئي جي هڪ ئي وقت لانچ حاصل ڪنداسين VerticaOperator (هڪ ٽيبل تي ڏسي رهيو آهي)؛
صرف مطلوب چاٻي جو رستو استعمال ڪريو: {{ var.json.bot_config.bot.token }}.
مان لفظي طور تي ھڪڙو لفظ چوندس ۽ ھڪڙي اسڪرين شاٽ ڏيکاريندس رابطا. هتي سڀ ڪجهه ابتدائي آهي: صفحي تي Admin/Connections اسان هڪ ڪنيڪشن ٺاهي، اسان جا لاگ ان / پاسورڊ ۽ وڌيڪ مخصوص پيٽرولر شامل ڪريو. هن وانگر:
پاسورڊ انڪريپٽ ٿي سگھي ٿو (ڊفالٽ کان وڌيڪ چڱي طرح)، يا توھان ڪنيڪشن جي قسم کي ڇڏي سگھو ٿا (جيئن مون ڪيو tg_main) - حقيقت اها آهي ته قسمن جي لسٽ ايئر فلو ماڊلز ۾ سخت آهي ۽ سورس ڪوڊز ۾ اچڻ کان سواءِ وڌائي نه ٿي سگهجي (جيڪڏهن اوچتو مون ڪجهه گوگل نه ڪيو، مهرباني ڪري مون کي درست ڪريو)، پر ڪجھ به اسان کي صرف ڪريڊٽ حاصل ڪرڻ کان روڪي نه سگهندو. نالو.
توھان پڻ ڪري سگھو ٿا ڪيترائي ڪنيڪشن ساڳئي نالي سان: ھن صورت ۾، طريقو BaseHook.get_connection()، جيڪو اسان کي نالي سان ڪنيڪشن حاصل ڪري ٿو، ڏيندو بي ترتيب ڪيترن ئي نالن مان (گول رابن ٺاهڻ وڌيڪ منطقي هوندو، پر اچو ته ان کي ايئر فلو ڊولپرز جي ضمير تي ڇڏي ڏيو).
متغير ۽ ڪنيڪشن يقيناً سٺا اوزار آهن، پر اهو ضروري آهي ته توازن نه وڃايو: توهان جي وهڪري جا ڪهڙا حصا توهان ڪوڊ ۾ محفوظ ڪندا آهيو، ۽ ڪهڙا حصا توهان اسٽوريج لاءِ ايئر فلو کي ڏيو ٿا. هڪ طرف، اهو آسان ٿي سگهي ٿو جلدي قدر کي تبديل ڪرڻ لاء، مثال طور، هڪ ميلنگ باڪس، UI ذريعي. ٻئي طرف، اهو اڃا تائين مائوس ڪلڪ ڏانهن واپسي آهي، جنهن مان اسان (مان) نڪرڻ چاهيو ٿا.
ڪنيڪشن سان ڪم ڪرڻ هڪ ڪم آهي ٿلهو. عام طور تي، ايئر فلو ٿلهو ان کي ٽئين پارٽي جي خدمتن ۽ لائبريرين سان ڳنڍڻ لاء پوائنٽون آهن. مثال، JiraHook جيرا سان رابطو ڪرڻ لاءِ اسان لاءِ هڪ گراهڪ کوليندو (توهان ڪم کي اڳتي ۽ پوئتي منتقل ڪري سگهو ٿا) ۽ مدد سان SambaHook توھان مقامي فائل کي دٻائي سگھو ٿا smb- پوائنٽ.
ڪسٽم آپريٽر کي پارس ڪرڻ
۽ اسان کي ڏسڻ جي ويجهو اچي ويو ته اهو ڪيئن ٺاهيو ويو آهي TelegramBotSendMessage
ڪوڊ commons/operators.py حقيقي آپريٽر سان:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
هتي، ايئر فلو ۾ هر شيء وانگر، هر شيء بلڪل سادو آهي:
کان ورثي ۾ مليل آهي BaseOperator، جيڪو ڪافي ڪجھ ايئر فلو-مخصوص شين کي لاڳو ڪري ٿو (توهان جي فرصت کي ڏسو)
لاء صحيح دليلن جو بندوبست ڪيو __init__()، جتي ضروري هجي ته ڊفالٽ مقرر ڪريو.
اسان ابن ڏاڏن جي شروعات جي باري ۾ نه وساريو.
لاڳاپيل ٿلهو کوليو TelegramBotHookان کان هڪ ڪلائنٽ اعتراض حاصل ڪيو.
ختم ٿيل (ٻيهر بيان ڪيل) طريقو BaseOperator.execute()، جيڪو Airfow ٽائيچ ڪندو جڏهن آپريٽر کي لانچ ڪرڻ جو وقت اچي ٿو - ان ۾ اسان لاگ ان ٿيڻ کي وساريندي مکيه عمل کي لاڳو ڪنداسين. (اسان لاگ ان، رستي ۾، صحيح اندر stdout и stderr - ايئر فلو هر شيء کي روڪيندو، ان کي خوبصورت طور تي لپي، ان کي ختم ڪري ڇڏيندو جتي ضروري هجي.)
اچو ته ڏسون ته اسان وٽ ڇا آهي commons/hooks.py. فائل جو پهريون حصو، پاڻ کي ٿلهو سان:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
مون کي اها به خبر ناهي ته هتي ڇا بيان ڪجي، مان صرف اهم نقطا نوٽ ڪندس:
اسان بعد ۾ ڊيٽابيس کي ڪجهه ڊيٽا سان ڀريندا آهيون (ڪنهن به صورت ۾ نه ڏسو mssql_init.py!)
اسان آخري وقت کان ٿورو وڌيڪ پيچيده حڪم جي مدد سان تمام سٺو شروع ڪيو:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
ڇا اسان جو معجزو randomizer پيدا ڪيو، توهان شيون استعمال ڪري سگهو ٿا Data Profiling/Ad Hoc Query:
اصلي شيء ان کي تجزيه نگارن کي ڏيکارڻ نه آهي
تفصيل سان بيان ڪرڻ اي ٽي ايل سيشن مان نه ڪندس، اتي هر شيء معمولي آهي: اسان هڪ بنياد ٺاهيندا آهيون، ان ۾ هڪ نشاني آهي، اسان هر شيء کي هڪ حوالي سان مينيجر سان لپيندا آهيون، ۽ هاڻي اسان هي ڪريون ٿا:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
وقت اچي ويو آهي اسان جي ڊيٽا گڏ ڪريو اسان جي ڏيڍ سئو ٽيبلن مان. اچو ته هن کي تمام بي مثال لائينن جي مدد سان ڪريون:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
ٿلهي جي مدد سان اسان ايئر فلو مان حاصل ڪندا آهيون pymssql- ڳنڍڻ
اچو ته ھڪڙي پابندي کي ھڪڙي تاريخ جي صورت ۾ درخواست ۾ تبديل ڪريون - ان کي ٽيمپليٽ انجڻ جي فنڪشن ۾ اڇلايو ويندو.
اسان جي درخواست کي کارائڻ pandasجيڪو اسان کي حاصل ڪندو DataFrame - اهو مستقبل ۾ اسان لاء مفيد ٿيندو.
مان متبادل استعمال ڪري رهيو آهيان {dt} درخواست جي پيٽرولر جي بدران %s ان ڪري نه ته مان بڇڙو پنوچيو آهيان، پر ان ڪري جو pandas سنڀالي نه ٿو سگهي pymssql ۽ آخري ڦٽي ٿو params: Listجيتوڻيڪ هو واقعي چاهي ٿو tuple.
اهو پڻ نوٽ ڪريو ته ڊولپر pymssql فيصلو ڪيو ته هن کي وڌيڪ سپورٽ نه ڏيو، ۽ اهو وقت نڪرڻ جو وقت آهي pyodbc.
اچو ته ڏسون ڇا ايئر فلو اسان جي ڪمن جي دليلن سان ڀريل آهي:
جيڪڏهن ڪو ڊيٽا نه آهي، پوء جاري رکڻ ۾ ڪو به نقطو ناهي. پر اهو پڻ عجيب آهي ته فلنگ کي ڪامياب سمجهيو وڃي. پر هي ڪا غلطي ناهي. آهه، ڇا ڪجي؟! ۽ هتي ڇا آهي:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException ايئر فلو کي ٻڌائي ٿو ته ڪا به غلطي ناهي، پر اسان ڪم کي ڇڏي ڏيو. انٽرفيس ۾ سائي يا ڳاڙهي چورس نه هوندي، پر گلابي.
وڪري تي، اسان ھدف پليٽ ٺاهي دستي طور تي. هتي مون پاڻ کي هڪ ننڍڙي مشين جي اجازت ڏني:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
آئون استعمال ڪريان پيو VerticaOperator() مان هڪ ڊيٽابيس اسڪيما ۽ ٽيبل ٺاهيو (جيڪڏهن اهي اڳ ۾ ئي موجود نه آهن، يقينا). بنيادي شيء صحيح طور تي انحصار کي ترتيب ڏيڻ آهي:
مان سمجهان ٿو ته جيڪڏهن منهنجي ساٿين ۽ مون ۾ مقابلو هجي ها: ڪير جلدي ٺاهي ۽ شروع ڪري اي ٽي ايل پروسيس شروع ڪندو: اهي پنهنجي SSIS ۽ هڪ مائوس سان ۽ مون سان ايئر فلو سان ... ۽ پوءِ اسان سار سنڀال جي آسانيءَ جو به مقابلو ڪنداسين ... واه، مان سمجهان ٿو ته توهان متفق آهيو ته آئون انهن کي سڀني محاذن تي شڪست ڏيندس!
جيڪڏهن ٿورو وڌيڪ سنجيدگي سان، پوء Apache Airflow - پروگرام جي ڪوڊ جي صورت ۾ عمل بيان ڪندي - منهنجو ڪم ڪيو گهڻو وڌيڪ آرامده ۽ خوشگوار.
ان جي لامحدود توسيع، ٻنهي پلگ ان جي لحاظ کان ۽ اسڪاليبلٽي جي اڳڪٿي جي لحاظ کان، توهان کي تقريبن ڪنهن به علائقي ۾ ايئر فلو استعمال ڪرڻ جو موقعو ڏئي ٿو: ڊيٽا گڏ ڪرڻ، تيار ڪرڻ ۽ پروسيسنگ جي مڪمل چڪر ۾، ايستائين جو راڪيٽ لانچ ڪرڻ ۾ (مارس تائين، ڪورس).
حصو فائنل، حوالو ۽ ڄاڻ
ريڪ اسان توهان لاء گڏ ڪيو آهي
start_date. ها، هي اڳ ۾ ئي هڪ مقامي meme آهي. Via Doug جي مکيه دليل start_date سڀ پاس. مختصر طور، جيڪڏھن توھان بيان ڪريو start_date موجوده تاريخ، ۽ schedule_interval - هڪ ڏينهن، پوءِ DAG سڀاڻي شروع ٿيندي نه اڳ.
start_date = datetime(2020, 7, 7, 0, 1, 2)
۽ وڌيڪ ڪو مسئلو ناهي.
ان سان لاڳاپيل هڪ ٻي رن ٽائيم غلطي آهي: Task is missing the start_date parameter، جيڪو گهڻو ڪري ظاهر ڪري ٿو ته توهان ڊگ آپريٽر کي پابند ڪرڻ وساري ڇڏيو.
سڀ هڪ مشين تي. ها، ۽ بيس (ايئر فلو پاڻ ۽ اسان جي ڪوٽنگ)، ۽ هڪ ويب سرور، ۽ هڪ شيڊولر، ۽ مزدور. ۽ اهو پڻ ڪم ڪيو. پر وقت گذرڻ سان گڏ، خدمتن لاءِ ڪمن جو تعداد وڌندو ويو، ۽ جڏھن PostgreSQL انڊيڪس کي 20 ms بدران 5 s ۾ جواب ڏيڻ شروع ڪيو، اسان ان کي ورتو ۽ ان کي کڻي ويا.
LocalExecutor. ها، اسان اڃا تائين ان تي ويٺا آهيون، ۽ اسان اڳ ۾ ئي اونهاري جي ڪناري تي اچي چڪا آهيون. LocalExecutor هن وقت تائين اسان لاءِ ڪافي آهي، پر هاڻي وقت اچي ويو آهي ته گهٽ ۾ گهٽ هڪ ڪم ڪندڙ کي وڌايو وڃي، ۽ اسان کي CeleryExecutor ڏانهن وڃڻ لاءِ سخت محنت ڪرڻي پوندي. ۽ انهي حقيقت کي نظر ۾ رکندي ته توهان ان سان گڏ هڪ مشين تي ڪم ڪري سگهو ٿا، ڪجھ به توهان کي سيلري استعمال ڪرڻ کان روڪي نٿو سگهي جيتوڻيڪ سرور تي، جيڪو "يقينا، ڪڏهن به پيداوار ۾ نه ويندو، ايمانداري سان!"
غير استعمال تعمير ٿيل اوزار:
رابطا خدمت جي سند کي ذخيرو ڪرڻ لاء،
SLA مس انهن ڪمن جو جواب ڏيڻ جيڪي وقت تي ڪم نه ڪيا آهن،
xcom ميٽا ڊيٽا مٽائڻ لاءِ (مون چيو ميٽاڊيٽا!) ڊيگ ڪمن جي وچ ۾.
ميل غلط استعمال. خير، مان ڇا ٿو چئي سگهان؟ گراهڪ ڪمن جي سڀني ورهاڱي لاء الرٽ قائم ڪيا ويا. ھاڻي منھنجو ڪم Gmail ۾ ايئر فلو کان 90k اي ميلون آھن، ۽ ويب ميل مزل ھڪڙي وقت ۾ 100 کان وڌيڪ کڻڻ ۽ حذف ڪرڻ کان انڪار ڪري ٿو.
اسان جي لاءِ اسان جي مٿي سان وڌيڪ ڪم ڪرڻ لاءِ ۽ نه اسان جي هٿن سان ، ايئر فلو اسان لاءِ تيار ڪيو آهي:
REST API - هن کي اڃا تائين تجرباتي حيثيت حاصل آهي، جيڪا هن کي ڪم ڪرڻ کان روڪي نه ٿي. ان سان، توهان نه صرف ڊيگ ۽ ڪمن بابت ڄاڻ حاصل ڪري سگهو ٿا، پر ڊگ کي روڪي/شروع ڪري سگهو ٿا، ڊيگ رن يا پول ٺاهي سگهو ٿا.
CLI - ڪيترائي اوزار ڪمانڊ لائن ذريعي دستياب آھن جيڪي نه رڳو WebUI ذريعي استعمال ڪرڻ ۾ تڪليف آھن، پر عام طور تي غير حاضر آھن. مثال طور:
backfill ڪم جي مثالن کي ٻيهر شروع ڪرڻ جي ضرورت آهي.
مثال طور، تجزيه نگار آيا ۽ چيائون: ”۽ ڪامريڊ، توهان 1 جنوري کان 13 جنوري تائين ڊيٽا ۾ بيوقوف آهيو! ان کي درست ڪريو، ان کي درست ڪريو، ان کي درست ڪريو، ان کي درست ڪريو!" ۽ تون اهڙو شوق آهين:
run، جيڪو توهان کي هڪ مثالي ڪم کي هلائڻ جي اجازت ڏئي ٿو، ۽ حتي سڀني انحصار تي سکور. ان کان سواء، توهان ان جي ذريعي هلائي سگهو ٿا LocalExecutor، جيتوڻيڪ توهان وٽ هڪ Celery ڪلستر آهي.
گهڻو ڪري ساڳيو ڪم ڪندو آهي test، صرف بنيادن ۾ ڪجهه به نه لکندو آهي.
connections شيل مان ڪنيڪشن جي وڏي پيماني تي اجازت ڏئي ٿي.
پٿن اي پي آئي - رابطي جو ھڪڙو سخت طريقو آھي، جيڪو پلگ ان لاء آھي، ۽ ان ۾ ننڍڙن ھٿن سان گڏ نه. پر اسان کي وڃڻ کان ڪير روڪي /home/airflow/dags، ڊوڙڻ ipython ۽ چوڌاري ڦرڻ شروع ڪيو؟ توھان ڪري سگھو ٿا، مثال طور، ھيٺ ڏنل ڪوڊ سان سڀئي ڪنيڪشن برآمد ڪريو:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
ايئر فلو ميٽا ڊيٽابيس سان ڳنڍڻ. مان ان تي لکڻ جي سفارش نه ٿو ڪريان، پر مختلف مخصوص ميٽرڪس لاءِ ٽاسڪ اسٽيٽس حاصل ڪرڻ ڪنهن به APIs استعمال ڪرڻ کان وڌيڪ تيز ۽ آسان ٿي سگهي ٿو.
اچو ته اهو چئون ته اسان جا سڀئي ڪم ڪمزور نه آهن، پر اهي ڪڏهن ڪڏهن گر ٿي سگهن ٿا، ۽ اهو عام آهي. پر ڪجھه رڪاوٽون اڳ ۾ ئي مشڪوڪ آھن، ۽ ان جي چڪاس ڪرڻ ضروري آھي.
خبردار SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
حوالن
۽ يقينا، گوگل جي جاري ٿيڻ کان پهريان ڏهه لنڪس منهنجي بک مارڪ مان ايئر فلو فولڊر جو مواد آهن.
Apache Airflow دستاويزن - يقينا، اسان کي آفيس سان شروع ڪرڻ گهرجي. دستاويز، پر هدايتون ڪير پڙهي؟