Salve, Habr! In hoc articulo loqui de uno magno instrumento ad massam processus notificationis evolvendam, exempli gratia, in infrastructura corporis DWH vel DataLake tui. Loquemur de Apache Airflow (inferius ad Airflow). Inique attentionem in Habre privatur, et in summa parte tibi persuadere conabitur saltem Airflow valet inspicere cum processus schedulae eligens pro processibus tuis ETL/ELT.
Antea seriem articulorum in argumento DWH scripsi cum apud Tinkoff Bank laboravi. Nunc pars turmae Mail.Ru Group factus sum et suggestum facio pro analysi analysi in area ludum. Profecto, ut nuntii et solutiones iucundae apparent, mea turma et hic de nostro suggestu pro notitia analyticorum colloquebor.
Incipit prologus
Incipiamus igitur. Quid est Airflow? Haec bibliotheca est (or
Nunc videamus res principales Airflow. Intelligendo essentiam et propositum suum processum architecturae optime instituere potes. Forsitan principale ens est Graph directa acyclica (inferius DAG appellata).
DAG
DAC significantia quaedam est coniunctio operum tuorum, quae ordine stricte definito iuxta certas schedulas complere vis. Airflow praebet telam opportunam ad operandum cum DAGs et aliis entibus:
DAG videre posset sic:
Elit, cum DAG cogitans, partem operariorum ponit in quo opera intra DAG aedificabuntur. Hic ad aliam rem magni momenti accedimus: Operator Airflow.
operators
Auctor est ens secundum quod officium instantiarum creatae sunt, quae describit id quod eventurum est in executione operis instantiae.
- BashOperator - operator ad exsequendam bash mandatum.
- PythonOperator - operator pro Pythone codicem vocans.
- EmailOperator - operator ad electronicam mittendam.
- HTTPOperator - operator operandi cum http petitionibus.
- SqlOperator - operator ad faciendum SQL codicem.
- Sensor est operator ad eventum exspectandum (adventus temporis requisiti, aspectus fasciculi requisiti, linea in datorum, responsio ex API, etc., etc.).
Sunt specialiores operatores: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Operarios etiam excolere potes secundum proprias notas suas et his in tuo consilio utere. Exempli gratia, operantem MongoDBToHiveViaHdfsTransfer creavimus ad documenta educenda ex MongoDB ad Hive, et plures operatores ad operandum.
Deinde omnia haec instantiarum officiorum exsecutioni mandanda sunt, et nunc de schedula loquemur.
Scheduler
Airflow est negotium scheduler aedificatur
Quisque lacus terminum habet in numero foraminum. DAC cum creando, stagnum datur;
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Piscina in gradu DAG definita opprimi potest in gradu molis.
Separatum processum, Scheduler, est responsabilis ad omnia opera in Airflow schedulanda. Profecto Scheduler agit de omnibus mechanicis ad executionem operum ordinandorum. Negotium percurrit plures gradus antequam exsecutioni mandaretur;
- Priores functiones in DAG peractae sunt, nova constare potest.
- Queue secundum prioritatem operum (prioritates etiam regeri possunt), et si libera socors in piscina est, negotium in operando sumi potest.
- Si est operarius gratuitus apium, negotium ei missum est; opus, quod in problemate programmatum es inchoat, uno vel alio operante utens.
Simplex satis.
Scheduler currit statuto omnium DAGs et omnium operum intra DAGs.
Nam Scheduler ut cum DAG operando committitur, DAG cedulam statuit:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Est parata copia facta presets: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Etiam expressionibus cronicis uti potes:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Executio Date
Ad intellegendum quomodo operatur Airflow, interest scire quid Date exsecutionem pro DAG. In Airflow, DAG executionem Date dimensionem habet, i.e., secundum schedulam laboris DAG, instantiae negotium creatae pro singulis Date Executionis. Et pro singulis Date exsecutioni, functiones resecuri possunt, vel, exempli gratia, DAG simul in pluribus Dates exsecutioni mandare potest. Hoc manifeste hic ostenditur.
Infeliciter (vel fortasse feliciter: ex situ pendet), si exsecutio operis in DAG corrigitur, tunc exsecutio in praecedente Date Executionis ratione servatis servandis procedet. Hoc bonum est si in praeteritis periodis novis algorithmus notitias recalculare debes, sed malum est quia reproducibilitas effectus amittitur (nempe nemo tibi molestum est reddere debitam versionem fontis codicem ex Git et calculare quid uno tempore opus est, quo opus est).
Generans tasks
Exsecutio DAG in Pythone signum est, itaque commodissimum est modum ad redigendum quantitatem codicis operando, exempli gratia, cum fontibus scatebris. Dicamus te tres MySQL shards habere fontem, debes in unumquemque ascendere et aliquas notitias colligere. Praeterea, independenter et parallele. Python in DAG codicem hoc spectare posset:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG hoc spectat:
Hoc in casu, testa pervalida addere vel removere potes simpliciter componendo occasus et adaequationem DAG. amoena!
Poteris etiam multipliciori codice generationis uti, exempli gratia, cum fontibus operari in forma database vel tabulae structuram describere, algorithmus ad operandum cum mensa, et, ratione habita infrastructure DWH notae, processum generant. ad onerandas N tabulas in tua reposita. Vel, exempli gratia, operando cum API qui operando parametrum in forma elenchi non sustinens, potes N operas generare in DAG ex hoc indice, parallelismum precum in API ad piscinam coarctare, et radere. necessarias notitias ex API. Flexibile!
repositio
Airflow habet suum repositorium, datorum (potest esse MySQL vel Postgres, habemus Postgres), qui refert status officiorum, DAGs, nexus uncinis, variabilium globalum etc. repositorium in Airflow est valde simplex (circa 20 tabulas) et opportunum si vis aliquem tuorum processuum super eam aedificare. Memini 100500 tabulas in Reposito Informatica, quae iam diu ante intellegendum esse quaerendum quam aedificandum putavi.
Cras
Reposito simplicitate donata, negotium vigilantiae processum tibi opportunum aedificare potes. Nota nota apud Zeppelin, ubi statum operum spectamus;
Posset etiam hic esse interventus ipsius Airfluvii interreti:
Codex Airflow est fons apertus, ergo nos ad Telegram admonendo addidimus. Singulae cursus instantiae alicuius operis, si error occurrit, globus spams in Telegram, ubi tota evolutionis et subsidii turma consistit.
Promptum responsum accipimus per Telegram (si opus est), et per Zeppelin altiorem picturam officiorum in Airflow recipimus.
in summa
Fluxus praesertim aperta fons est, et miracula ab eo exspectare non debes. Solutio in opera et labore aedificandi praeparandi. Finis est rem deducere, mihi crede, pretium est. Celeritas progressionis, flexibilitas, facilitas novis processibus augendi - placet. Utique, multum operam dare debes ad ordinationem rei, stabilitatem ipsum Airflow: miracula non fiunt.
Nunc habemus Airflow opus cotidie circa 6,5 milia opera. Longe aliter se habent; Munus onerationis notitiarum in principalibus DWH sunt ex multis diversis et valde certis fontibus, sunt opera thesaurorum computandi intra principale DWH, sunt munera notitiae in ieiunium DWH editae, sunt multa, multa opera - et Airflow. ruminat omnes de die in diem. Loquens in numeris, hoc est 2,3 milia ELT officia variae complexionis intra DWH (Hadoop), proxime. 2,5 centum databases fontes, hoc est turma e 4 ETL developersquae dividuntur in ETL processus in DWH et ELT notitia processus intus DWH et sane magis unum adminqui infrastructurae servitii agit.
Consilia pro futuro
Numerus processuum inevitabiliter crescit, et maximum faciemus secundum infrastructuram Airflow infrastructuram scandentem. Volumus botrum airflow aedificare, par crurum operantibus Apium collocare, et caput duplicare cum processibus schedulingis et repositorium facere.
epilogus,
Quod quidem non omnia quae de Airflow dicere vellem, sed praecipua capita exaggerare conabar. Appetitus cum comedendo venit, experire et tibi placet :)
Source: www.habr.com