హలో, హబ్ర్! ఈ వ్యాసంలో నేను బ్యాచ్ డేటా ప్రాసెసింగ్ ప్రక్రియలను అభివృద్ధి చేయడానికి ఒక గొప్ప సాధనం గురించి మాట్లాడాలనుకుంటున్నాను, ఉదాహరణకు, కార్పొరేట్ DWH లేదా మీ డేటాలేక్ యొక్క అవస్థాపనలో. మేము అపాచీ ఎయిర్ఫ్లో (ఇకపై ఎయిర్ఫ్లోగా సూచిస్తారు) గురించి మాట్లాడుతాము. ఇది అన్యాయంగా హబ్రేపై దృష్టిని కోల్పోయింది మరియు మీ ETL/ELT ప్రాసెస్ల కోసం షెడ్యూలర్ను ఎన్నుకునేటప్పుడు కనీసం ఎయిర్ఫ్లో చూడటం విలువైనదని నేను మిమ్మల్ని ఒప్పించేందుకు ప్రయత్నిస్తాను.
గతంలో, నేను Tinkoff బ్యాంక్లో పనిచేసినప్పుడు DWH అనే అంశంపై వరుస కథనాలు రాశాను. ఇప్పుడు నేను Mail.Ru గ్రూప్ బృందంలో భాగమయ్యాను మరియు గేమింగ్ ప్రాంతంలో డేటా విశ్లేషణ కోసం ప్లాట్ఫారమ్ను అభివృద్ధి చేస్తున్నాను. వాస్తవానికి, వార్తలు మరియు ఆసక్తికరమైన పరిష్కారాలు కనిపించినప్పుడు, డేటా అనలిటిక్స్ కోసం మా ప్లాట్ఫారమ్ గురించి నేను మరియు నా బృందం ఇక్కడ మాట్లాడుతాము.
నాంది
కాబట్టి, ప్రారంభిద్దాం. గాలి ప్రవాహం అంటే ఏమిటి? ఇది లైబ్రరీ (లేదా
ఇప్పుడు ఎయిర్ఫ్లో యొక్క ప్రధాన అంశాలను చూద్దాం. వాటి సారాంశం మరియు ఉద్దేశ్యాన్ని అర్థం చేసుకోవడం ద్వారా, మీరు మీ ప్రాసెస్ ఆర్కిటెక్చర్ను ఉత్తమంగా నిర్వహించవచ్చు. బహుశా ప్రధాన అంశం డైరెక్టెడ్ ఎసిక్లిక్ గ్రాఫ్ (ఇకపై DAGగా సూచిస్తారు).
dag
DAG అనేది మీరు నిర్దిష్ట షెడ్యూల్ ప్రకారం ఖచ్చితంగా నిర్వచించబడిన క్రమంలో పూర్తి చేయాలనుకుంటున్న మీ పనుల యొక్క కొంత అర్ధవంతమైన అనుబంధం. ఎయిర్ఫ్లో DAGలు మరియు ఇతర ఎంటిటీలతో పని చేయడానికి అనుకూలమైన వెబ్ ఇంటర్ఫేస్ను అందిస్తుంది:
DAG ఇలా ఉండవచ్చు:
డెవలపర్, DAGని డిజైన్ చేస్తున్నప్పుడు, DAGలోని టాస్క్లు నిర్మించబడే ఆపరేటర్ల సమితిని నిర్దేశిస్తారు. ఇక్కడ మేము మరొక ముఖ్యమైన అంశానికి వచ్చాము: ఎయిర్ఫ్లో ఆపరేటర్.
నిర్వాహకులు
ఆపరేటర్ అనేది ఉద్యోగ సందర్భాలు సృష్టించబడిన దాని ఆధారంగా ఒక సంస్థ, ఇది ఉద్యోగ ఉదాహరణ అమలు సమయంలో ఏమి జరుగుతుందో వివరిస్తుంది.
- బాష్ ఆపరేటర్ - బాష్ ఆదేశాన్ని అమలు చేయడానికి ఆపరేటర్.
- పైథాన్ ఆపరేటర్ - పైథాన్ కోడ్కి కాల్ చేయడానికి ఆపరేటర్.
- EmailOperator — ఇమెయిల్ పంపడానికి ఆపరేటర్.
- HTTPoperator - http అభ్యర్థనలతో పని చేయడానికి ఆపరేటర్.
- SqlOperator - SQL కోడ్ని అమలు చేయడానికి ఆపరేటర్.
- సెన్సార్ అనేది ఈవెంట్ కోసం వేచి ఉండే ఆపరేటర్ (అవసరమైన సమయం రాక, అవసరమైన ఫైల్ యొక్క రూపాన్ని, డేటాబేస్లో ఒక లైన్, API నుండి ప్రతిస్పందన మొదలైనవి మొదలైనవి).
మరింత నిర్దిష్టమైన ఆపరేటర్లు ఉన్నారు: డాకర్ ఆపరేటర్, హైవ్ ఆపరేటర్, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
మీరు మీ స్వంత లక్షణాల ఆధారంగా ఆపరేటర్లను అభివృద్ధి చేయవచ్చు మరియు వాటిని మీ ప్రాజెక్ట్లో ఉపయోగించవచ్చు. ఉదాహరణకు, మేము MongoDBToHiveViaHdfsTransferని సృష్టించాము, ఇది MongoDB నుండి హైవ్కి డాక్యుమెంట్లను ఎగుమతి చేయడానికి ఆపరేటర్ని మరియు పని చేయడానికి అనేక మంది ఆపరేటర్లను సృష్టించింది.
తరువాత, ఈ పనుల యొక్క అన్ని సందర్భాలు అమలు చేయబడాలి మరియు ఇప్పుడు మేము షెడ్యూలర్ గురించి మాట్లాడుతాము.
షెడ్యూలర్
ఎయిర్ఫ్లో టాస్క్ షెడ్యూలర్ నిర్మించబడింది
ప్రతి పూల్కు స్లాట్ల సంఖ్యపై పరిమితి ఉంటుంది. DAGని సృష్టించేటప్పుడు, దానికి ఒక పూల్ ఇవ్వబడుతుంది:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
DAG స్థాయిలో నిర్వచించబడిన పూల్ టాస్క్ స్థాయిలో భర్తీ చేయబడుతుంది.
ఒక ప్రత్యేక ప్రక్రియ, షెడ్యూలర్, ఎయిర్ఫ్లో అన్ని టాస్క్లను షెడ్యూల్ చేయడానికి బాధ్యత వహిస్తుంది. వాస్తవానికి, షెడ్యూలర్ ఎగ్జిక్యూషన్ కోసం టాస్క్లను సెట్ చేసే అన్ని మెకానిక్లతో వ్యవహరిస్తాడు. అమలు చేయడానికి ముందు పని అనేక దశల గుండా వెళుతుంది:
- DAGలో మునుపటి పనులు పూర్తయ్యాయి; కొత్తది క్యూలో ఉంచబడుతుంది.
- పనుల ప్రాధాన్యతపై ఆధారపడి క్యూ క్రమబద్ధీకరించబడుతుంది (ప్రాధాన్యతలను కూడా నియంత్రించవచ్చు), మరియు పూల్లో ఉచిత స్లాట్ ఉన్నట్లయితే, పనిని అమలులోకి తీసుకోవచ్చు.
- ఒక ఉచిత కార్మికుడు సెలెరీ ఉంటే, పని దానికి పంపబడుతుంది; మీరు సమస్యలో ప్రోగ్రామ్ చేసిన పని ఒకటి లేదా మరొక ఆపరేటర్ని ఉపయోగించి ప్రారంభమవుతుంది.
తగినంత సాధారణ.
షెడ్యూలర్ అన్ని DAGల సెట్లో మరియు DAGలలోని అన్ని టాస్క్లపై నడుస్తుంది.
షెడ్యూలర్ DAGతో పని చేయడం ప్రారంభించడానికి, DAG షెడ్యూల్ని సెట్ చేయాలి:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
రెడీమేడ్ ప్రీసెట్ల సెట్ ఉంది: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
మీరు క్రాన్ వ్యక్తీకరణలను కూడా ఉపయోగించవచ్చు:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
అమలు తేదీ
ఎయిర్ఫ్లో ఎలా పనిచేస్తుందో అర్థం చేసుకోవడానికి, DAG కోసం అమలు తేదీ ఏమిటో అర్థం చేసుకోవడం ముఖ్యం. ఎయిర్ఫ్లోలో, DAGకి ఎగ్జిక్యూషన్ డేట్ డైమెన్షన్ ఉంటుంది, అనగా, DAG పని షెడ్యూల్పై ఆధారపడి, ప్రతి ఎగ్జిక్యూషన్ తేదీకి టాస్క్ ఇన్స్టాన్స్లు సృష్టించబడతాయి. మరియు ప్రతి అమలు తేదీకి, టాస్క్లు మళ్లీ అమలు చేయబడతాయి - లేదా, ఉదాహరణకు, DAG అనేక అమలు తేదీలలో ఏకకాలంలో పని చేస్తుంది. ఇది ఇక్కడ స్పష్టంగా చూపబడింది:
దురదృష్టవశాత్తూ (లేదా అదృష్టవశాత్తూ: ఇది పరిస్థితిపై ఆధారపడి ఉంటుంది), DAGలో విధిని అమలు చేయడం సరిదిద్దబడితే, మునుపటి అమలు తేదీలో అమలు సర్దుబాట్లను పరిగణనలోకి తీసుకుంటుంది. మీరు కొత్త అల్గారిథమ్ని ఉపయోగించి గత కాలాల్లోని డేటాను తిరిగి లెక్కించాల్సిన అవసరం ఉంటే ఇది మంచిది, కానీ ఫలితం యొక్క పునరుత్పత్తి సామర్థ్యం కోల్పోయినందున ఇది చెడ్డది (వాస్తవానికి, Git నుండి సోర్స్ కోడ్ యొక్క అవసరమైన సంస్కరణను తిరిగి ఇవ్వడానికి మరియు ఏమి లెక్కించాలో ఎవరూ మిమ్మల్ని ఇబ్బంది పెట్టరు. మీకు ఒక సారి కావాలి, మీకు అవసరమైన విధంగా).
టాస్క్లను రూపొందించడం
DAG యొక్క అమలు అనేది పైథాన్లో కోడ్, కాబట్టి పని చేస్తున్నప్పుడు కోడ్ మొత్తాన్ని తగ్గించడానికి మాకు చాలా అనుకూలమైన మార్గం ఉంది, ఉదాహరణకు, షార్డ్ మూలాలతో. మీకు మూలంగా మూడు MySQL షార్డ్లు ఉన్నాయని అనుకుందాం, మీరు ప్రతి దానిలోకి ఎక్కి కొంత డేటాను తీయాలి. అంతేకాక, స్వతంత్రంగా మరియు సమాంతరంగా. DAGలోని పైథాన్ కోడ్ ఇలా ఉండవచ్చు:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG ఇలా కనిపిస్తుంది:
ఈ సందర్భంలో, మీరు సెట్టింగులను సర్దుబాటు చేయడం మరియు DAGని నవీకరించడం ద్వారా ఒక షార్డ్ను జోడించవచ్చు లేదా తీసివేయవచ్చు. సౌకర్యవంతమైన!
మీరు మరింత సంక్లిష్టమైన కోడ్ ఉత్పత్తిని కూడా ఉపయోగించవచ్చు, ఉదాహరణకు, డేటాబేస్ రూపంలో మూలాధారాలతో పని చేయండి లేదా టేబుల్ నిర్మాణాన్ని వివరించండి, టేబుల్తో పని చేయడానికి అల్గోరిథం మరియు DWH ఇన్ఫ్రాస్ట్రక్చర్ యొక్క లక్షణాలను పరిగణనలోకి తీసుకుని, ప్రక్రియను రూపొందించండి. మీ నిల్వలోకి N పట్టికలను లోడ్ చేయడం కోసం. లేదా, ఉదాహరణకు, జాబితా రూపంలో పారామీటర్తో పనిచేయడానికి మద్దతు ఇవ్వని APIతో పని చేయడం, మీరు ఈ జాబితా నుండి DAGలో N టాస్క్లను రూపొందించవచ్చు, APIలోని అభ్యర్థనల సమాంతరతను పూల్కి పరిమితం చేయవచ్చు మరియు స్క్రాప్ చేయవచ్చు API నుండి అవసరమైన డేటా. అనువైన!
రిపోజిటరీ
ఎయిర్ఫ్లో దాని స్వంత బ్యాకెండ్ రిపోజిటరీ, డేటాబేస్ (MySQL లేదా పోస్ట్గ్రెస్ కావచ్చు, మనకు పోస్ట్గ్రెస్ ఉంది), ఇది టాస్క్లు, DAGలు, కనెక్షన్ సెట్టింగ్లు, గ్లోబల్ వేరియబుల్స్ మొదలైన వాటి యొక్క స్థితిని నిల్వ చేస్తుంది. ఇక్కడ నేను చెప్పాలనుకుంటున్నాను ఎయిర్ఫ్లో రిపోజిటరీ చాలా సులభం (సుమారు 20 పట్టికలు) మరియు మీరు దాని పైన మీ స్వంత ప్రాసెస్లలో దేనినైనా నిర్మించాలనుకుంటే సౌకర్యవంతంగా ఉంటుంది. ఇన్ఫర్మాటికా రిపోజిటరీలోని 100500 పట్టికలు నాకు గుర్తున్నాయి, ఇది ప్రశ్నను ఎలా నిర్మించాలో అర్థం చేసుకోవడానికి చాలా కాలం పాటు అధ్యయనం చేయాల్సి వచ్చింది.
పర్యవేక్షణ
రిపోజిటరీ యొక్క సరళత కారణంగా, మీరు మీకు అనుకూలమైన పని పర్యవేక్షణ ప్రక్రియను రూపొందించవచ్చు. మేము జెప్పెలిన్లో నోట్ప్యాడ్ని ఉపయోగిస్తాము, ఇక్కడ మేము పనుల స్థితిని పరిశీలిస్తాము:
ఇది ఎయిర్ఫ్లో యొక్క వెబ్ ఇంటర్ఫేస్ కూడా కావచ్చు:
ఎయిర్ఫ్లో కోడ్ ఓపెన్ సోర్స్, కాబట్టి మేము టెలిగ్రామ్కు హెచ్చరికను జోడించాము. టాస్క్ యొక్క ప్రతి రన్నింగ్ ఇన్స్టాన్స్, లోపం సంభవించినట్లయితే, మొత్తం డెవలప్మెంట్ మరియు సపోర్ట్ టీమ్ ఉండే టెలిగ్రామ్లో గ్రూప్ను స్పామ్ చేస్తుంది.
మేము టెలిగ్రామ్ (అవసరమైతే) ద్వారా సత్వర ప్రతిస్పందనను అందుకుంటాము మరియు జెప్పెలిన్ ద్వారా మేము ఎయిర్ఫ్లో టాస్క్ల యొక్క మొత్తం చిత్రాన్ని అందుకుంటాము.
మొత్తం
గాలి ప్రవాహం ప్రధానంగా ఓపెన్ సోర్స్, మరియు మీరు దాని నుండి అద్భుతాలను ఆశించకూడదు. పని చేసే పరిష్కారాన్ని రూపొందించడానికి సమయం మరియు కృషిని వెచ్చించడానికి సిద్ధంగా ఉండండి. లక్ష్యం సాధించదగినది, నన్ను నమ్మండి, అది విలువైనది. అభివృద్ధి వేగం, వశ్యత, కొత్త ప్రక్రియలను జోడించే సౌలభ్యం - మీరు దీన్ని ఇష్టపడతారు. వాస్తవానికి, మీరు ప్రాజెక్ట్ యొక్క సంస్థ, ఎయిర్ఫ్లో యొక్క స్థిరత్వంపై చాలా శ్రద్ధ వహించాలి: అద్భుతాలు జరగవు.
ఇప్పుడు మేము రోజువారీ గాలి ప్రవాహాన్ని కలిగి ఉన్నాము సుమారు 6,5 వేల పనులు. వారు పాత్రలో చాలా భిన్నంగా ఉంటారు. అనేక విభిన్నమైన మరియు చాలా నిర్దిష్టమైన మూలాల నుండి ప్రధాన DWHలోకి డేటాను లోడ్ చేసే పనులు ఉన్నాయి, ప్రధాన DWH లోపల స్టోర్ ఫ్రంట్లను లెక్కించే పనులు ఉన్నాయి, డేటాను వేగవంతమైన DWHలోకి ప్రచురించే పనులు ఉన్నాయి, అనేక విభిన్న పనులు ఉన్నాయి - మరియు ఎయిర్ఫ్లో వాటిని రోజు తర్వాత రోజు నమిలేస్తుంది. సంఖ్యలలో మాట్లాడుతూ, ఇది 2,3 వేలు DWH (హడూప్)లో విభిన్న సంక్లిష్టత కలిగిన ELT టాస్క్లు, సుమారు. 2,5 వందల డేటాబేస్లు మూలాలు, ఇది ఒక బృందం 4 ETL డెవలపర్లు, ఇవి DWHలో ETL డేటా ప్రాసెసింగ్ మరియు DWH లోపల ELT డేటా ప్రాసెసింగ్గా విభజించబడ్డాయి మరియు చాలా ఎక్కువ ఒక నిర్వాహకుడు, సేవ యొక్క మౌలిక సదుపాయాలతో ఎవరు వ్యవహరిస్తారు.
భవిష్యత్తు కోసం ప్రణాళికలు
ప్రక్రియల సంఖ్య అనివార్యంగా పెరుగుతోంది మరియు ఎయిర్ఫ్లో ఇన్ఫ్రాస్ట్రక్చర్ పరంగా మనం చేయబోయే ప్రధాన విషయం స్కేలింగ్. మేము ఎయిర్ఫ్లో క్లస్టర్ను నిర్మించాలనుకుంటున్నాము, సెలెరీ కార్మికుల కోసం ఒక జత కాళ్లను కేటాయించాలనుకుంటున్నాము మరియు జాబ్ షెడ్యూలింగ్ ప్రక్రియలు మరియు రిపోజిటరీతో స్వీయ-నకిలీ హెడ్ని తయారు చేయాలనుకుంటున్నాము.
ఉపసంహారం
ఇది, వాస్తవానికి, నేను ఎయిర్ఫ్లో గురించి చెప్పాలనుకుంటున్న ప్రతిదీ కాదు, కానీ నేను ప్రధాన అంశాలను హైలైట్ చేయడానికి ప్రయత్నించాను. తినడం వల్ల ఆకలి వస్తుంది, దీన్ని ప్రయత్నించండి మరియు మీకు నచ్చుతుంది :)
మూలం: www.habr.com