Airflow est instrumentum commode et celeriter explicandi et conservandi processus processuum batch

Airflow est instrumentum commode et celeriter explicandi et conservandi processus processuum batch

Salve, Habr! In hoc articulo loqui de uno magno instrumento ad massam processus notificationis evolvendam, exempli gratia, in infrastructura corporis DWH vel DataLake tui. Loquemur de Apache Airflow (inferius ad Airflow). Inique attentionem in Habre privatur, et in summa parte tibi persuadere conabitur saltem Airflow valet inspicere cum processus schedulae eligens pro processibus tuis ETL/ELT.

Antea seriem articulorum in argumento DWH scripsi cum apud Tinkoff Bank laboravi. Nunc pars turmae Mail.Ru Group factus sum et suggestum facio pro analysi analysi in area ludum. Profecto, ut nuntii et solutiones iucundae apparent, mea turma et hic de nostro suggestu pro notitia analyticorum colloquebor.

Incipit prologus

Incipiamus igitur. Quid est Airflow? Haec bibliotheca est (or bibliothecarum) ad explicandum, consilium et monitor processuum laboris. Pelagus notam airflow: codice Python (develop) processuum describere adhibetur. Hoc multum commoda habet ad consilium et progressionem ordinandam: in essentia, tuum (exempli gratia) ETL consilium Python tantum est propositum, et quod vis ordinare potes, ratione habita speciei infrastructure, quantitatis et quadrigis. aliis requisitis. Instrumentaliter omnia simplicia sunt. Pro exemplo PyCharm + Git. Mirum et commodum!

Nunc videamus res principales Airflow. Intelligendo essentiam et propositum suum processum architecturae optime instituere potes. Forsitan principale ens est Graph directa acyclica (inferius DAG appellata).

DAG

DAC significantia quaedam est coniunctio operum tuorum, quae ordine stricte definito iuxta certas schedulas complere vis. Airflow praebet telam opportunam ad operandum cum DAGs et aliis entibus:

Airflow est instrumentum commode et celeriter explicandi et conservandi processus processuum batch

DAG videre posset sic:

Airflow est instrumentum commode et celeriter explicandi et conservandi processus processuum batch

Elit, cum DAG cogitans, partem operariorum ponit in quo opera intra DAG aedificabuntur. Hic ad aliam rem magni momenti accedimus: Operator Airflow.

operators

Auctor est ens secundum quod officium instantiarum creatae sunt, quae describit id quod eventurum est in executione operis instantiae. Airflow solvo ex GitHub iam copia operariorum promptu. Exempla:

  • BashOperator - operator ad exsequendam bash mandatum.
  • PythonOperator - operator pro Pythone codicem vocans.
  • EmailOperator - operator ad electronicam mittendam.
  • HTTPOperator - operator operandi cum http petitionibus.
  • SqlOperator - operator ad faciendum SQL codicem.
  • Sensor est operator ad eventum exspectandum (adventus temporis requisiti, aspectus fasciculi requisiti, linea in datorum, responsio ex API, etc., etc.).

Sunt specialiores operatores: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Operarios etiam excolere potes secundum proprias notas suas et his in tuo consilio utere. Exempli gratia, operantem MongoDBToHiveViaHdfsTransfer creavimus ad documenta educenda ex MongoDB ad Hive, et plures operatores ad operandum. clickhouse: CHLoadFromHiveOperator and CHTableLoaderOperator. Essentialiter, cum primum consilium saepe codicem in constitutionibus fundamentalibus aedificatum est, cogitare potes de ea aedificatione in novam constitutionem. Haec ulteriorem progressionem simpliciorem reddet, et bibliothecam operariorum in incepto dilatabis.

Deinde omnia haec instantiarum officiorum exsecutioni mandanda sunt, et nunc de schedula loquemur.

Scheduler

Airflow est negotium scheduler aedificatur apium. Apium Pythonis bibliotheca est quae te plus asynchronum instituere sinit et officiorum executionem distribuit. In parte airflow, omnia opera in piscinas dividuntur. Stagna manually creata sunt. De more eorum propositum est inposuit operandi fonte circumscribere vel pensare opera intra DWH. Lacus per interfacem telam tractari potest:

Airflow est instrumentum commode et celeriter explicandi et conservandi processus processuum batch

Quisque lacus terminum habet in numero foraminum. DAC cum creando, stagnum datur;

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Piscina in gradu DAG definita opprimi potest in gradu molis.
Separatum processum, Scheduler, est responsabilis ad omnia opera in Airflow schedulanda. Profecto Scheduler agit de omnibus mechanicis ad executionem operum ordinandorum. Negotium percurrit plures gradus antequam exsecutioni mandaretur;

  1. Priores functiones in DAG peractae sunt, nova constare potest.
  2. Queue secundum prioritatem operum (prioritates etiam regeri possunt), et si libera socors in piscina est, negotium in operando sumi potest.
  3. Si est operarius gratuitus apium, negotium ei missum est; opus, quod in problemate programmatum es inchoat, uno vel alio operante utens.

Simplex satis.

Scheduler currit statuto omnium DAGs et omnium operum intra DAGs.

Nam Scheduler ut cum DAG operando committitur, DAG cedulam statuit:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Est parata copia facta presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Etiam expressionibus cronicis uti potes:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Executio Date

Ad intellegendum quomodo operatur Airflow, interest scire quid Date exsecutionem pro DAG. In Airflow, DAG executionem Date dimensionem habet, i.e., secundum schedulam laboris DAG, instantiae negotium creatae pro singulis Date Executionis. Et pro singulis Date exsecutioni, functiones resecuri possunt, vel, exempli gratia, DAG simul in pluribus Dates exsecutioni mandare potest. Hoc manifeste hic ostenditur.

Airflow est instrumentum commode et celeriter explicandi et conservandi processus processuum batch

Infeliciter (vel fortasse feliciter: ex situ pendet), si exsecutio operis in DAG corrigitur, tunc exsecutio in praecedente Date Executionis ratione servatis servandis procedet. Hoc bonum est si in praeteritis periodis novis algorithmus notitias recalculare debes, sed malum est quia reproducibilitas effectus amittitur (nempe nemo tibi molestum est reddere debitam versionem fontis codicem ex Git et calculare quid uno tempore opus est, quo opus est).

Generans tasks

Exsecutio DAG in Pythone signum est, itaque commodissimum est modum ad redigendum quantitatem codicis operando, exempli gratia, cum fontibus scatebris. Dicamus te tres MySQL shards habere fontem, debes in unumquemque ascendere et aliquas notitias colligere. Praeterea, independenter et parallele. Python in DAG codicem hoc spectare posset:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG hoc spectat:

Airflow est instrumentum commode et celeriter explicandi et conservandi processus processuum batch

Hoc in casu, testa pervalida addere vel removere potes simpliciter componendo occasus et adaequationem DAG. amoena!

Poteris etiam multipliciori codice generationis uti, exempli gratia, cum fontibus operari in forma database vel tabulae structuram describere, algorithmus ad operandum cum mensa, et, ratione habita infrastructure DWH notae, processum generant. ad onerandas N tabulas in tua reposita. Vel, exempli gratia, operando cum API qui operando parametrum in forma elenchi non sustinens, potes N operas generare in DAG ex hoc indice, parallelismum precum in API ad piscinam coarctare, et radere. necessarias notitias ex API. Flexibile!

repositio

Airflow habet suum repositorium, datorum (potest esse MySQL vel Postgres, habemus Postgres), qui refert status officiorum, DAGs, nexus uncinis, variabilium globalum etc. repositorium in Airflow est valde simplex (circa 20 tabulas) et opportunum si vis aliquem tuorum processuum super eam aedificare. Memini 100500 tabulas in Reposito Informatica, quae iam diu ante intellegendum esse quaerendum quam aedificandum putavi.

Cras

Reposito simplicitate donata, negotium vigilantiae processum tibi opportunum aedificare potes. Nota nota apud Zeppelin, ubi statum operum spectamus;

Airflow est instrumentum commode et celeriter explicandi et conservandi processus processuum batch

Posset etiam hic esse interventus ipsius Airfluvii interreti:

Airflow est instrumentum commode et celeriter explicandi et conservandi processus processuum batch

Codex Airflow est fons apertus, ergo nos ad Telegram admonendo addidimus. Singulae cursus instantiae alicuius operis, si error occurrit, globus spams in Telegram, ubi tota evolutionis et subsidii turma consistit.

Promptum responsum accipimus per Telegram (si opus est), et per Zeppelin altiorem picturam officiorum in Airflow recipimus.

in summa

Fluxus praesertim aperta fons est, et miracula ab eo exspectare non debes. Solutio in opera et labore aedificandi praeparandi. Finis est rem deducere, mihi crede, pretium est. Celeritas progressionis, flexibilitas, facilitas novis processibus augendi - placet. Utique, multum operam dare debes ad ordinationem rei, stabilitatem ipsum Airflow: miracula non fiunt.

Nunc habemus Airflow opus cotidie circa 6,5 milia opera. Longe aliter se habent; Munus onerationis notitiarum in principalibus DWH sunt ex multis diversis et valde certis fontibus, sunt opera thesaurorum computandi intra principale DWH, sunt munera notitiae in ieiunium DWH editae, sunt multa, multa opera - et Airflow. ruminat omnes de die in diem. Loquens in numeris, hoc est 2,3 milia ELT officia variae complexionis intra DWH (Hadoop), proxime. 2,5 centum databases fontes, hoc est turma e 4 ETL developersquae dividuntur in ETL processus in DWH et ELT notitia processus intus DWH et sane magis unum adminqui infrastructurae servitii agit.

Consilia pro futuro

Numerus processuum inevitabiliter crescit, et maximum faciemus secundum infrastructuram Airflow infrastructuram scandentem. Volumus botrum airflow aedificare, par crurum operantibus Apium collocare, et caput duplicare cum processibus schedulingis et repositorium facere.

epilogus,

Quod quidem non omnia quae de Airflow dicere vellem, sed praecipua capita exaggerare conabar. Appetitus cum comedendo venit, experire et tibi placet :)

Source: www.habr.com

Add a comment