Hei, olen Dmitry Logvinenko - Vezet-yritysryhmän Analytics-osaston tietoinsinööri.
Kerron sinulle upeasta työkalusta ETL-prosessien kehittämiseen - Apache Airflow. Mutta Airflow on niin monipuolinen ja monipuolinen, että sitä kannattaa tarkastella lähemmin, vaikka et olisikaan mukana tietovirroissa, mutta joutuisit ajoittain käynnistämään prosesseja ja seuraamaan niiden toteutumista.
Ja kyllä, en vain kerro, vaan myös näytän: ohjelmassa on paljon koodia, kuvakaappauksia ja suosituksia.

Mitä yleensä näet, kun googletat sanaa Airflow / Wikimedia Commons
sisällysluettelo
Esittely
Apache Airflow on aivan kuin Django:
- kirjoitettu pythonilla
- siellä on loistava hallintapaneeli,
- laajennettavissa toistaiseksi
- vain parempi, ja se tehtiin täysin eri tarkoituksiin, nimittäin (kuten ennen kattia on kirjoitettu):
- tehtävien suorittaminen ja valvonta rajoittamattomalla määrällä koneita (niin monta selleriä/kubernettiä ja omatunto sallii)
- dynaamisen työnkulun luomisen kanssa erittäin helposti kirjoitettavasta ja ymmärrettävästä Python-koodista
- ja kyky yhdistää tietokannat ja API:t toisiinsa käyttämällä sekä valmiita komponentteja että kotitekoisia laajennuksia (mikä on erittäin yksinkertaista).
Käytämme Apache Airflowta seuraavasti:
- keräämme tietoja eri lähteistä (monet SQL Server- ja PostgreSQL-instanssit, erilaiset API:t sovellusmittareineen, jopa 1C) DWH:ssa ja ODS:ssä (meillä on Vertica ja Clickhouse).
- kuinka edistynyt
cron, joka käynnistää ODS:n tietojen konsolidointiprosessit ja valvoo myös niiden ylläpitoa.
Viime aikoihin asti tarpeemme kattoi yksi pieni palvelin, jossa oli 32 ydintä ja 50 Gt RAM-muistia. Airflow:ssa tämä toimii:
- lisää 200 päivää (itse asiassa työnkulkuja, joihin täyttimme tehtäviä),
- jokaisessa keskimäärin 70 tehtävää,
- tämä hyvyys alkaa (myös keskimäärin) kerran tunnissa.
Ja kuinka laajennimme, kirjoitan alla, mutta nyt määritellään se yber-ongelma, jonka ratkaisemme:
Alkuperäisiä SQL-palvelimia on kolme, joissa kussakin on 50 tietokantaa - yhden projektin esiintymiä, vastaavasti, niillä on sama rakenne (melkein kaikkialla, mua-ha-ha), mikä tarkoittaa, että jokaisessa on Tilaukset-taulukko (onneksi taulukko, jossa se on nimi voidaan työntää mihin tahansa liiketoimintaan). Otamme tiedot lisäämällä palvelukenttiä (lähdepalvelin, lähdetietokanta, ETL-tehtävätunnus) ja heitämme ne naiivisti vaikkapa Verticaan.
Mennään!
Pääosa, käytännöllinen (ja hieman teoreettinen)
Miksi me (ja sinä)
Kun puut olivat isoja ja minä olin yksinkertainen SQL-schik yhdessä venäläisessä vähittäiskaupassa, huijasimme ETL-prosesseja eli tietovirtoja käyttämällä kahta käytettävissämme olevaa työkalua:
- Informatica Power Center - erittäin levittävä järjestelmä, erittäin tuottava, omalla laitteistollaan, omalla versioillaan. Käytin Jumala varjelkoon 1% sen kyvyistä. Miksi? Ensinnäkin tämä käyttöliittymä, jostain 380-luvulta, painoi meitä henkisesti. Toiseksi tämä konsti on suunniteltu erittäin hienoihin prosesseihin, raivokkaaseen komponenttien uudelleenkäyttöön ja muihin erittäin tärkeisiin yritystemppuihin. Siitä tosiasiasta, että se maksaa, kuten Airbus AXNUMX:n siipi / vuosi, emme sano mitään.
Varo, kuvakaappaus voi vahingoittaa hieman alle 30-vuotiaita

- SQL Server Integration Server - Käytimme tätä toveria projektin sisäisissä virroissamme. No itse asiassa: käytämme jo SQL Serveriä, ja olisi jotenkin kohtuutonta olla käyttämättä sen ETL-työkaluja. Kaikki siinä on hyvää: sekä käyttöliittymä on kaunis, että edistymisraportit... Mutta emme siksi rakasta ohjelmistotuotteita, oi, ei tätä varten. Versio se
dtsx(joka on XML, jossa solmut sekoitetaan tallennuksen yhteydessä) voimme, mutta mitä järkeä on? Mitä jos tekisit tehtäväpaketin, joka vetää satoja taulukoita palvelimelta toiselle? Kyllä, mikä sata, etusormesi putoaa kahdestakymmenestä palasta hiiren painiketta napsauttamalla. Mutta hän näyttää ehdottomasti muodikkaammalta:
Etsimme varmasti ulospääsyä. Tapaus jopa melkein tuli itse kirjoitettuun SSIS-pakettigeneraattoriin ...
…ja sitten sain uuden työpaikan. Ja Apache Airflow ohitti minut siinä.
Kun sain selville, että ETL-prosessikuvaukset ovat yksinkertaista Python-koodia, en vain tanssinut ilosta. Näin tietovirrat versioitiin ja diffattiin, ja yksirakenneisten taulukoiden kaatamisesta sadoista tietokannoista yhteen kohteeseen tuli Python-koodia puolessatoista tai kahdessa 13” näytössä.
Klusterin kokoaminen
Älkäämme järjestäkö täysin päiväkotia, äläkä puhu täällä täysin itsestään selvistä asioista, kuten Airflown asentamisesta, valitsemastasi tietokannasta, Sellerin ja muista telakoissa kuvatuista tapauksista.
Jotta voimme aloittaa kokeilut välittömästi, luonnostelin docker-compose.yml jossa:
- Nostetaan oikeasti Ilmavirta: Aikataulu, Web-palvelin. Flower tulee myös pyörimään siellä valvomaan Selleritehtäviä (koska se on jo työnnetty
apache/airflow:1.10.10-python3.7, mutta emme välitä) - PostgreSQL, johon Airflow kirjoittaa palvelutietonsa (ajoitustiedot, suoritustilastot jne.), ja Selleri merkitsee suoritetut tehtävät;
- Redis, joka toimii sellerin tehtävävälittäjänä;
- Selleri työntekijä, joka osallistuu suoraan tehtävien suorittamiseen.
- Kansioon
./dagslisäämme tiedostomme dags-kuvauksen kanssa. Ne poimitaan lennossa, joten koko pinoa ei tarvitse jongleerata jokaisen aivastauksen jälkeen.
Joissain paikoissa esimerkkien koodia ei näytetä kokonaan (jotta teksti ei sotkeutuisi), mutta jossain sitä muutetaan prosessin aikana. Täydelliset toimivat koodiesimerkit löytyvät arkistosta .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerHuomautuksia:
- Sävellyksen kokoonpanossa luotin pitkälti tunnettuun kuvaan - muista tarkistaa se. Ehkä et tarvitse mitään muuta elämääsi.
- Kaikki ilmavirran asetukset ovat käytettävissä paitsi kautta
airflow.cfg, mutta myös ympäristömuuttujien kautta (kiitos kehittäjille), joita käytin ilkeästi hyväkseni. - Se ei tietenkään ole tuotantovalmis: en tietoisesti laittanut sykettä konteille, en vaivautunut turvallisuuteen. Mutta tein kokeilijoillemme sopivan minimin.
- Ota huomioon, että:
- dag-kansion on oltava sekä ajoittajan että työntekijöiden käytettävissä.
- Sama koskee kaikkia kolmannen osapuolen kirjastoja - ne kaikki on asennettava koneille, joissa on ajastin ja työntekijät.
No nyt on yksinkertaista:
$ docker-compose up --scale worker=3Kun kaikki on noussut, voit katsoa verkkokäyttöliittymiä:
- Ilmavirta:
- Kukka:
Peruskäsitteet
Jos et ymmärtänyt mitään kaikista näistä "päivistä", tässä on lyhyt sanakirja:
- Scheduler - Airflown tärkein setä, joka hallitsee, että robotit työskentelevät kovasti, ei ihminen: tarkkailee aikataulua, päivittää päivämääriä, käynnistää tehtäviä.
Yleensä vanhemmissa versioissa hänellä oli ongelmia muistin kanssa (ei, ei muistinmenetystä, vaan vuotoja) ja vanha parametri jäi jopa asetuksiin
run_duration- sen uudelleenkäynnistysväli. Mutta nyt kaikki on hyvin. - PÄIVÄ (alias "dag") - "suunnattu asyklinen graafi", mutta tällainen määritelmä kertoo harvoille ihmisille, mutta itse asiassa se on säilö tehtäville, jotka ovat vuorovaikutuksessa toistensa kanssa (katso alla) tai analoginen paketille SSIS:ssä ja Workflow in Informaticassa .
Dagien lisäksi voi vielä olla alidageja, mutta emme todennäköisesti pääse niihin.
- DAG Juoksu - alustettu dag, jolle on määritetty oma
execution_date. Saman dag:n Dagranit voivat toimia rinnakkain (jos olet tehnyt tehtäväsi idempotenteiksi, tietysti). - operaattori ovat koodinpätkiä, jotka vastaavat tietyn toiminnon suorittamisesta. Operaattoreita on kolmenlaisia:
- toimintakuten meidän suosikkimme
PythonOperator, joka voi suorittaa minkä tahansa (kelvollisen) Python-koodin; - siirtää, jotka kuljettavat tietoja paikasta toiseen, esim.
MsSqlToHiveTransfer; - anturi toisaalta sen avulla voit reagoida tai hidastaa dag:n jatkoa, kunnes tapahtuma tapahtuu.
HttpSensorvoi vetää määritetyn päätepisteen, ja kun haluttu vastaus odottaa, aloita siirtoGoogleCloudStorageToS3Operator. Utelias mieli kysyy: "miksi? Loppujen lopuksi voit tehdä toistoja suoraan operaattorissa!” Ja sitten, jotta tehtävien joukko ei tukkeutuisi keskeytetyillä operaattoreilla. Anturi käynnistyy, tarkistaa ja sammuu ennen seuraavaa yritystä.
- toimintakuten meidän suosikkimme
- Tehtävä - Ilmoitetut operaattorit, tyypistä riippumatta ja jotka on liitetty dag:iin, ylennetään tehtävän arvoon.
- tehtävän esimerkki - kun yleissuunnittelija päätti, että on aika lähettää tehtävät taisteluun esiintyjätyöntekijöiden päälle (oikein paikan päällä, jos käytämme
LocalExecutortai etäsolmuun, jos kyseessä onCeleryExecutor), se määrittää niille kontekstin (eli joukon muuttujia - suoritusparametreja), laajentaa komento- tai kyselymalleja ja yhdistää ne.
Luomme tehtäviä
Ensin hahmotellaan dougimme yleinen kaavio, ja sitten sukeltamme yksityiskohtiin yhä enemmän, koska käytämme joitain ei-triviaaleja ratkaisuja.
Joten yksinkertaisimmassa muodossaan tällainen dag näyttää tältä:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Ymmärretään:
- Ensin tuomme tarvittavat libsit ja jotain muuta;
sql_server_ds- OnkoList[namedtuple[str, str]]Airflow Connectionsin yhteyksien nimet ja tietokannat, joista otamme levymme;dag- ilmoitus päivästämme, jonka on välttämättä oltava mukanaglobals(), muuten Airflow ei löydä sitä. Dougin on myös sanottava:- mikä hänen nimensä on
orders- tämä nimi tulee näkyviin verkkokäyttöliittymään, - että hän työskentelee keskiyöstä XNUMX. heinäkuuta,
- ja sen pitäisi käydä noin 6 tunnin välein (koville miehille täällä sen sijaan
timedelta()hyväksyttäväksicron-linja0 0 0/6 ? * * *, vähemmän cool - ilmaus kuin@daily);
- mikä hänen nimensä on
workflow()tekee päätyön, mutta ei nyt. Toistaiseksi vain upotamme kontekstimme lokiin.- Ja nyt tehtävien luomisen yksinkertainen taika:
- käymme läpi lähteemme;
- alustaa
PythonOperator, joka suorittaa nukenworkflow(). Älä unohda määrittää tehtävälle yksilöivää (dag:n sisällä) nimeä ja sitoa itse dag. Lippuprovide_contextpuolestaan lisää funktioon lisäargumentteja, joita keräämme huolellisesti käyttämällä**context.
Toistaiseksi siinä kaikki. Mitä saimme:
- uusi päivä verkkokäyttöliittymässä,
- puolitoista sataa tehtävää, jotka suoritetaan rinnakkain (jos Airflow-, Sellery-asetukset ja palvelimen kapasiteetti sen sallivat).
No, melkein sain sen.

Kuka asentaa riippuvuudet?
Yksinkertaistaakseni tätä koko asiaa, menin asiaan docker-compose.yml käsittelyä requirements.txt kaikissa solmuissa.
Nyt se on poissa:

Harmaat neliöt ovat ajastimen käsittelemiä tehtävänäytteitä.
Odotellaan vähän, työt napsautetaan työntekijöiden toimesta:

Vihreät ovat tietysti saaneet työnsä onnistuneesti päätökseen. Punaiset eivät ole kovin menestyviä.
Muuten, tuotteessamme ei ole kansiota
./dags, koneiden välillä ei ole synkronointia - kaikki päivät ovat sisällägitGitlabissamme, ja Gitlab CI jakaa päivityksiä koneille, kun ne yhdistetäänmaster.
Vähän Flowerista
Sillä aikaa, kun työntekijät puskevat tuttejamme, muistetaan toinen työkalu, joka voi näyttää meille jotain - Kukka.
Aivan ensimmäinen sivu, jossa on yhteenveto työntekijöiden solmuista:

Intensiivisin sivu, jossa on suoritettuja tehtäviä:

Tylsin sivu välittäjämme statuksella:

Kirkkaimmalla sivulla on tehtävän tilakaaviot ja niiden suoritusaika:

Lataamme alikuormitetut
Joten kaikki tehtävät on suoritettu, voit viedä haavoittuneet pois.

Ja haavoittuneita oli monia - syystä tai toisesta. Jos Airflowa käytetään oikein, juuri nämä neliöt osoittavat, että tiedot eivät todellakaan saapuneet perille.
Sinun on katsottava lokia ja käynnistettävä kaatuneet tehtävät uudelleen.
Napsauta mitä tahansa neliötä, näemme käytettävissämme olevat toiminnot:

Voit ottaa ja poistaa kaatuneet. Toisin sanoen unohdamme, että jokin on epäonnistunut siellä, ja sama ilmentymätehtävä menee ajoittimeen.

On selvää, että tämän tekeminen hiirellä kaikilla punaisilla neliöillä ei ole kovin inhimillistä - tätä emme odota Airflowlta. Luonnollisesti meillä on joukkotuhoaseita: Browse/Task Instances

Valitaan kaikki kerralla ja nollataan, napsauta oikeaa kohdetta:

Siivouksen jälkeen taksimme näyttävät tältä (ne odottavat jo aikatauluttajaa aikatauluttamaan ne):

Liitännät, koukut ja muut muuttujat
On aika katsoa seuraavaa DAG:ta, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Ovatko kaikki koskaan päivittäneet raporttia? Tämä on taas hän: siellä on luettelo lähteistä, joista tiedot saa; siellä on luettelo, mihin sijoittaa; älä unohda puhua, kun kaikki tapahtui tai meni rikki (no, tämä ei koske meitä, ei).
Käydään tiedosto läpi uudelleen ja katsotaan uusia epäselviä juttuja:
from commons.operators import TelegramBotSendMessage- mikään ei estä meitä tekemästä omia operaattoreita, joita hyödynsimme tekemällä pienen kääreen viestien lähettämiseen Unblockedille. (Puhumme lisää tästä operaattorista alla);default_args={}- dag voi jakaa samat argumentit kaikille operaattoreilleen;to='{{ var.value.all_the_kings_men }}'- kenttätomeillä ei ole kovakoodattua, vaan dynaamisesti luotua käyttämällä Jinjaa ja muuttujaa sähköpostilistalla, jonka laitoin huolellisestiAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— käyttäjän käynnistämisen ehto. Meidän tapauksessamme kirje lentää pomoille vain, jos kaikki riippuvuudet ovat selvinneet onnistuneesti;tg_bot_conn_id='tg_main'- argumentitconn_idhyväksyä luomamme yhteystunnuksetAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- Telegramin viestit lentävät pois vain, jos tehtäviä on kaatunut;task_concurrency=1- kielletään yhden tehtävän useiden tehtäväinstanssien samanaikainen käynnistäminen. Muuten saamme useiden samanaikaisen käynnistyksenVerticaOperator(katso yhtä pöytää);report_update >> [email, tg]- kaikkiVerticaOperatorlähentyvät kirjeitä ja viestejä, kuten tämä:

Mutta koska ilmoittajaoperaattoreilla on erilaiset käynnistysehdot, vain yksi toimii. Puunäkymässä kaikki näyttää hieman vähemmän visuaalliselta:

Sanon muutaman sanan aiheesta makroja ja heidän ystävänsä - muuttujia.
Makrot ovat Jinja-paikkamerkkejä, jotka voivat korvata erilaisia hyödyllisiä tietoja operaattoriargumenteiksi. Esimerkiksi näin:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} laajenee kontekstimuuttujan sisältöön execution_date muodossa YYYY-MM-DD: 2020-07-14. Parasta on, että kontekstimuuttujat naulataan tiettyyn tehtäväesiintymään (neliöön puunäkymässä), ja kun käynnistetään uudelleen, paikkamerkit laajenevat samoihin arvoihin.
Määritettyjä arvoja voi tarkastella kunkin tehtävän ilmentymän Rendered-painikkeella. Kirjeen lähettäminen tapahtuu seuraavasti:

Ja niin tehtävässä viestin lähettämisessä:

Täydellinen luettelo uusimman saatavilla olevan version sisäänrakennetuista makroista on saatavilla täältä:
Lisäksi pluginien avulla voimme ilmoittaa omat makromme, mutta se on toinen tarina.
Ennalta määritettyjen asioiden lisäksi voimme korvata muuttujien arvot (käytin tätä jo yllä olevassa koodissa). Luodaan sisään Admin/Variables pari asiaa:

Kaikki mitä voit käyttää:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Arvo voi olla skalaari tai se voi olla myös JSON. JSON:n tapauksessa:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}käytä vain polkua haluttuun avaimeen: {{ var.json.bot_config.bot.token }}.
Sanon kirjaimellisesti yhden sanan ja näytän yhden kuvakaappauksen aiheesta yhteys. Kaikki on alkeellista täällä: sivulla Admin/Connections luomme yhteyden, lisäämme sisäänkirjautumistunnuksemme / salasanamme ja tarkemmat parametrit sinne. Kuten tämä:

Salasanat voidaan salata (oletusarvoa perusteellisemmin) tai voit jättää yhteystyypin pois (kuten tein tg_main) - Tosiasia on, että tyyppiluettelo on langallinen Airflow-malleissa, eikä sitä voi laajentaa ilman lähdekoodeja (jos yhtäkkiä en googlettanut jotain, korjaa minua), mutta mikään ei estä meitä saamasta krediittejä. nimi.
Voit myös muodostaa useita yhteyksiä samalla nimellä: tässä tapauksessa menetelmä BaseHook.get_connection(), joka saa meille yhteyksiä nimellä, antaa satunnainen useilta kaimailta (olisi loogisempaa tehdä Round Robin, mutta jätetään se Airflow-kehittäjien omantunnon varaan).
Muuttujat ja yhteydet ovat varmasti hienoja työkaluja, mutta on tärkeää, että et menetä tasapainoa: mitkä osat virtauksistasi tallennat itse koodiin ja mitkä osat annat Airflowlle tallennettavaksi. Toisaalta voi olla kätevää muuttaa nopeasti arvoa, esimerkiksi postilaatikkoa, käyttöliittymän kautta. Toisaalta tämä on silti paluu hiiren napsautukseen, josta halusimme (minä) päästä eroon.
Yhteyksien parissa työskenteleminen on yksi tehtävistä koukut. Yleisesti ottaen Airflow-koukut ovat pisteitä, joilla se yhdistetään kolmannen osapuolen palveluihin ja kirjastoihin. Esim, JiraHook avaa asiakkaan meille vuorovaikutukseen Jiran kanssa (voit siirtää tehtäviä edestakaisin) ja SambaHook voit työntää paikallisen tiedoston smb-kohta.
Muokatun operaattorin jäsentäminen
Ja pääsimme katsomaan kuinka se on tehty TelegramBotSendMessage
Koodi commons/operators.py varsinaisen operaattorin kanssa:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Täällä, kuten kaikessa muussakin Airflowssa, kaikki on hyvin yksinkertaista:
- Peritty
BaseOperator, joka toteuttaa aika monta Airflow-kohtaista asiaa (katso vapaa-aikaasi) - Ilmoitetut kentät
template_fields, jossa Jinja etsii makroja käsiteltäväksi. - Järjestettiin oikeat argumentit puolesta
__init__(), aseta oletusasetukset tarvittaessa. - Emme myöskään unohtaneet esi-isän alustusta.
- Avasi vastaavan koukun
TelegramBotHooksaanut siitä asiakasobjektin. - Ohitettu (uudelleenmääritetty) menetelmä
BaseOperator.execute(), joka Airfow nykii, kun on aika käynnistää operaattori - siinä toteutamme päätoimenpiteen, unohtaen kirjautua sisään. (Kirjaudumme muuten heti sisäänstdoutиstderr- Ilmavirta sieppaa kaiken, kääri sen kauniisti, hajottaa tarvittaessa.)
Katsotaan mitä meillä on commons/hooks.py. Tiedoston ensimmäinen osa koukulla:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientEn edes tiedä mitä tässä selittäisin, huomautan vain tärkeät seikat:
- Perimme, ajattelemme argumentteja - useimmissa tapauksissa se on yksi:
conn_id; - Vakiomenetelmien ohittaminen: rajoitin itseäni
get_conn(), jossa saan yhteysparametrit nimen mukaan ja vain osionextra(tämä on JSON-kenttä), johon laitoin (omien ohjeideni mukaan!) Telegram-bottitunnuksen:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Luon esimerkin meidän
TelegramBot, antamalla sille tietyn tunnuksen.
Siinä kaikki. Voit saada asiakkaan koukusta käyttämällä TelegramBotHook().clent tai TelegramBotHook().get_conn().
Ja tiedoston toinen osa, jossa teen mikrokääreen Telegram REST API:lle, jotta en vedä samaa yhdelle menetelmälle sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Oikea tapa on laskea kaikki yhteen:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- laita laajennuksessa julkiseen arkistoon ja anna se avoimelle lähteelle.
Tutkiessamme tätä kaikkea raporttipäivityksemme onnistuivat epäonnistumaan ja lähettämään minulle kanavassa virheilmoituksen. Menen katsomaan onko vika...

Jotain meni rikki koirassamme! Eikö se ole sitä mitä odotimme? Tarkalleen!
Aiotko kaataa?
Tuntuuko sinusta, että missasin jotain? Näyttää siltä, että hän lupasi siirtää tietoja SQL Serveristä Verticaan, ja sitten hän otti sen ja siirtyi aiheesta, roisto!
Tämä julmuus oli tahallista, minun piti vain tulkita sinulle terminologiaa. Nyt voit mennä pidemmälle.
Suunnitelmamme oli tämä:
- Do dag
- Luo tehtäviä
- Katso kuinka kaunista kaikki on
- Määritä istuntonumerot täyttöihin
- Hae tietoja SQL Serveristä
- Laita tiedot Verticaan
- Kerää tilastoja
Joten, jotta tämä kaikki saadaan toimimaan, tein pienen lisäyksen meidän docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pySiellä nostetaan:
- Vertica isäntänä
dwhkaikkein oletusasetuksilla, - kolme SQL Serverin esiintymää,
- täytämme jälkimmäisen tietokannat joillakin tiedoilla (älä missään tapauksessa tutki
mssql_init.py!)
Käynnistämme kaiken hyvän hieman monimutkaisemman komennon avulla kuin viime kerralla:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Voit käyttää tuotetta mitä ihmesatunnaistajamme loi Data Profiling/Ad Hoc Query:

Tärkeintä ei ole näyttää sitä analyytikoille
tarkentaa ETL-istunnot En tee, siellä kaikki on triviaalia: teemme pohjan, siinä on kyltti, käärimme kaiken kontekstinhallinnan avulla ja nyt teemme näin:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passAika on koittanut keräämme tietojamme puolentoistasadasta pöydästämme. Tehdään tämä erittäin vaatimattomien rivien avulla:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Koukun avulla saamme Airflowsta
pymssql-kytkeä - Korvataan pyyntöön päivämäärän muodossa oleva rajoitus - mallimoottori heittää sen funktioon.
- Ruokitaan pyyntöämme
pandaskuka saa meidätDataFrame- siitä on meille hyötyä tulevaisuudessa.
Käytän substituutiota
{dt}pyyntöparametrin sijaan%sei siksi, että olisin paha Pinocchio, vaan siksipandasei voi käsitelläpymssqlja luistaa viimeisenparams: Listvaikka hän todella haluaatuple.
Huomaa myös, että kehittäjäpymssqlpäätti olla tukematta häntä enää, ja on aika muuttaa poispyodbc.
Katsotaanpa, millä Airflow täytti funktioidemme argumentit:

Jos tietoja ei ole, ei ole mitään järkeä jatkaa. Mutta on myös outoa pitää täyttöä onnistuneena. Mutta tämä ei ole virhe. A-ah-ah, mitä tehdä?! Ja tässä mitä:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException kertoo Airflowlle, että virheitä ei ole, mutta ohitamme tehtävän. Käyttöliittymässä ei ole vihreää tai punaista neliötä, vaan vaaleanpunaista.
Heitetään tietomme useita sarakkeita:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Nimittäin
- Tietokanta, josta saimme tilaukset,
- Tulvaistuntomme tunnus (se on erilainen jokaiseen tehtävään),
- Hajautus lähteestä ja tilaustunnus - jotta lopullisessa tietokannassa (jossa kaikki kaadetaan yhteen taulukkoon) meillä on yksilöllinen tilaustunnus.
Jäljelle jää toiseksi viimeinen vaihe: kaada kaikki Verticaan. Ja kummallista kyllä, yksi upeimmista ja tehokkaimmista tavoista tehdä tämä on CSV:n kautta!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Valmistamme erikoisvastaanottimen
StringIO. pandasystävällisesti laittaa meidänDataFramemuodossaCSV-linjat.- Avataan koukulla yhteys suosikki Verticaan.
- Ja nyt avulla
copy()lähetä tietomme suoraan Vertikaan!
Otamme kuljettajalta, kuinka monta riviä oli täytetty, ja kerromme istunnonjohtajalle, että kaikki on kunnossa:
session.loaded_rows = cursor.rowcount
session.successful = TrueSiinä kaikki.
Myynnissä luomme kohdelevyn manuaalisesti. Tässä sallin itselleni pienen koneen:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)käytän
VerticaOperator()Luon tietokantaskeeman ja taulukon (jos niitä ei vielä ole, tietysti). Tärkeintä on järjestää riippuvuudet oikein:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadYhteenvetona
- No, - sanoi pieni hiiri, - eikös nyt
Oletko varma, että olen metsän kauhein eläin?
Julia Donaldson, The Gruffalo
Luulen, että jos kollegoillani ja minulla olisi kilpailu: kuka nopeasti luo ja käynnistää ETL-prosessin tyhjästä: he SSIS:illään ja hiirellä ja minä Airflowlla... Ja sitten vertaisimme myös huollon helppoutta... Vau, uskon, että olet samaa mieltä siitä, että voitan heidät kaikilla rintamilla!
Jos vähän vakavammin, niin Apache Airflow - kuvailemalla prosesseja ohjelmakoodin muodossa - teki työni lisää mukavampaa ja nautinnollisempaa.
Sen rajaton laajennettavuus sekä laajennuksina että skaalautuvuuden suhteen antaa sinulle mahdollisuuden käyttää Airflow'ta lähes kaikilla alueilla: jopa koko tiedonkeruun, valmistelun ja käsittelyn aikana, jopa raketteja laukaiseessa (Marsiin, kurssi).
Osa loppu, viite ja tiedot
Keräsimme sinulle
start_date. Kyllä, tämä on jo paikallinen meemi. Via Dougin tärkein argumenttistart_datekaikki ohi. Lyhyesti, jos määritätstart_datenykyinen päivämäärä jaschedule_interval- Jonain päivänä DAG alkaa huomenna ei aikaisemmin.start_date = datetime(2020, 7, 7, 0, 1, 2)Eikä enää ongelmia.
Siihen liittyy toinen ajonaikainen virhe:
Task is missing the start_date parameter, mikä useimmiten osoittaa, että olet unohtanut sitoutua dag-operaattoriin.- Kaikki yhdellä koneella. Kyllä, ja pohjat (itse Airflow ja pinnoitteemme), ja web-palvelin, ja ajastin ja työntekijät. Ja se jopa toimi. Mutta ajan myötä palveluiden tehtävien määrä kasvoi, ja kun PostgreSQL alkoi vastata indeksiin 20 sekunnissa 5 ms:n sijaan, otimme sen ja kantoimme sen pois.
- LocalExecutor. Kyllä, istumme edelleen sen päällä ja olemme jo tulleet kuilun reunalle. LocalExecutor on riittänyt meille toistaiseksi, mutta nyt on aika laajentua ainakin yhdellä työntekijällä ja meidän on tehtävä lujasti töitä siirtyäksemme CeleryExecutoriin. Ja kun otetaan huomioon, että voit työskennellä sen kanssa yhdellä koneella, mikään ei estä sinua käyttämästä Selleryä edes palvelimella, joka "rehellisesti sanottuna ei tietenkään koskaan mene tuotantoon!"
- Ei käytössä sisäänrakennetut työkalut:
- Liitännät tallentaa huoltotiedot,
- SLA Miss vastata tehtäviin, jotka eivät sujuneet ajoissa,
- xcom metatietojen vaihtoa varten (sanoin metadata!) dag-tehtävien välillä.
- Postin väärinkäyttö. No, mitä voin sanoa? Hälytykset asetettiin kaikille kaatuneiden tehtävien toistuville. Nyt työssäni Gmailissa on yli 90 100 sähköpostia Airflowsta, ja verkkosähköpostin kuono kieltäytyy poimimasta ja poistamasta yli XNUMX:ta kerrallaan.
Lisää sudenkuoppia:
Lisää automaatiotyökaluja
Jotta voisimme työskennellä entistä enemmän päällämme eikä käsillämme, Airflow on valmistanut meille seuraavan:
- - hänellä on edelleen kokeellisen asema, mikä ei estä häntä työskentelemästä. Sen avulla voit paitsi saada tietoa päivämääristä ja tehtävistä, myös pysäyttää/aloittaa päiväpäivien, luoda DAG Run tai poolin.
- - Komentorivin kautta on saatavilla monia työkaluja, jotka eivät ole vain hankalia käyttää WebUI:n kautta, mutta ne ovat yleensä poissa. Esimerkiksi:
backfilltarvitaan tehtäväinstanssien käynnistämiseen uudelleen.
Esimerkiksi analyytikot tulivat ja sanoivat: "Ja sinulla, toveri, on hölynpölyä tiedoissa 1. - 13. tammikuuta! Korjaa, korjaa, korjaa, korjaa!" Ja sinä olet sellainen liesi:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Peruspalvelu:
initdb,resetdb,upgradedb,checkdb. run, jonka avulla voit suorittaa yhden ilmentymätehtävän ja jopa tehdä pisteitä kaikista riippuvuuksista. Lisäksi voit ajaa sen kauttaLocalExecutor, vaikka sinulla olisi selleriklusteri.- Tekee aika lailla samaa
test, vain myös pohjassa ei kirjoita mitään. connectionsmahdollistaa yhteyksien massan luomisen kuoresta.
- - melko kova vuorovaikutustapa, joka on tarkoitettu laajennuksille, eikä kuhise siinä pienillä käsillä. Mutta kuka estää meitä menemästä
/home/airflow/dags, Suoritaipythonja alkaa sotkemaan? Voit esimerkiksi viedä kaikki yhteydet seuraavalla koodilla:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Yhdistetään Airflow-metatietokantaan. En suosittele kirjoittamista sille, mutta tehtävätilojen saaminen eri mittareille voi olla paljon nopeampaa ja helpompaa kuin minkä tahansa API:n käyttäminen.
Oletetaan, että kaikki tehtävämme eivät ole idempotentteja, mutta ne voivat joskus kaatua, ja tämä on normaalia. Mutta muutama tukos on jo epäilyttävää, ja se olisi tarpeen tarkistaa.
Varo SQL:ää!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
viittaukset
Ja tietysti, ensimmäiset kymmenen linkkiä Googlen liikkeeseenlaskusta ovat Airflow-kansion sisältö kirjanmerkeistäni.
- - tietysti meidän on aloitettava toimistosta. asiakirjoja, mutta kuka lukee ohjeet?
- - No, ainakin lue tekijöiden suositukset.
- - aivan alku: käyttöliittymä kuvissa
- - peruskäsitteet on kuvattu hyvin, jos (yhtäkkiä!) et ymmärtänyt minusta jotain.
- - lyhyt opas Airflow-klusterin perustamiseen.
- - melkein sama mielenkiintoinen artikkeli, paitsi ehkä enemmän formalismia ja vähemmän esimerkkejä.
- - yhteistyöstä Sellerin kanssa.
- - tehtävien idempotenssista, tunnuksella lataamisesta päivämäärän sijaan, muunnoksista, tiedostorakenteesta ja muista mielenkiintoisista asioista.
- - tehtävien riippuvuudet ja Trigger Rule, jotka mainitsin vain ohimennen.
- - kuinka voittaa joitain "toimia suunnitellusti" ajastimessa, ladata kadonneita tietoja ja priorisoida tehtäviä.
- — hyödyllisiä SQL-kyselyitä Airflow-metatietoihin.
- - Siellä on hyödyllinen osio mukautetun anturin luomisesta.
- - mielenkiintoinen lyhyt huomautus infrastruktuurin rakentamisesta AWS for Data Science -palveluun.
- - yleisiä virheitä (kun joku ei vieläkään lue ohjeita).
- - hymyile kuinka ihmiset kätkevät salasanojen tallentamista, vaikka voit käyttää vain Yhteyksiä.
- - implisiittinen DAG-edelleenlähetys, kontekstin heitto funktioissa, jälleen riippuvuuksista ja myös tehtävien käynnistysten ohittamisesta.
- - käytöstä
default argumentsиparamsmalleissa sekä muuttujissa ja yhteyksissä. - - tarina siitä, kuinka suunnittelija valmistautuu Airflow 2.0:aan.
- - hieman vanhentunut artikkeli klusterin käyttöönotosta
docker-compose. - - dynaamisia tehtäviä käyttämällä malleja ja kontekstin edelleenlähetystä.
- — vakio- ja mukautetut ilmoitukset postitse ja Slackin kautta.
- - Haaroitustehtävät, makrot ja XCom.
Ja artikkelissa käytetyt linkit:
- - paikkamerkit käytettävissä malleissa.
- - Yleisiä virheitä luotaessa dageja.
- -
docker-composekokeilua, virheenkorjausta ja muuta varten. - — Python-kääre Telegram REST API:lle.
Lähde: will.com




