Hei, olen Dmitry Logvinenko - Vezet-yritysryhmän Analytics-osaston tietoinsinööri.
Kerron sinulle upeasta työkalusta ETL-prosessien kehittämiseen - Apache Airflow. Mutta Airflow on niin monipuolinen ja monipuolinen, että sitä kannattaa tarkastella lähemmin, vaikka et olisikaan mukana tietovirroissa, mutta joutuisit ajoittain käynnistämään prosesseja ja seuraamaan niiden toteutumista.
Ja kyllä, en vain kerro, vaan myös näytän: ohjelmassa on paljon koodia, kuvakaappauksia ja suosituksia.
Mitä yleensä näet, kun googletat sanaa Airflow / Wikimedia Commons
- vain parempi, ja se tehtiin täysin eri tarkoituksiin, nimittäin (kuten ennen kattia on kirjoitettu):
tehtävien suorittaminen ja valvonta rajoittamattomalla määrällä koneita (niin monta selleriä/kubernettiä ja omatunto sallii)
dynaamisen työnkulun luomisen kanssa erittäin helposti kirjoitettavasta ja ymmärrettävästä Python-koodista
ja kyky yhdistää tietokannat ja API:t toisiinsa käyttämällä sekä valmiita komponentteja että kotitekoisia laajennuksia (mikä on erittäin yksinkertaista).
Käytämme Apache Airflowta seuraavasti:
keräämme tietoja eri lähteistä (monet SQL Server- ja PostgreSQL-instanssit, erilaiset API:t sovellusmittareineen, jopa 1C) DWH:ssa ja ODS:ssä (meillä on Vertica ja Clickhouse).
kuinka edistynyt cron, joka käynnistää ODS:n tietojen konsolidointiprosessit ja valvoo myös niiden ylläpitoa.
Viime aikoihin asti tarpeemme kattoi yksi pieni palvelin, jossa oli 32 ydintä ja 50 Gt RAM-muistia. Airflow:ssa tämä toimii:
lisää 200 päivää (itse asiassa työnkulkuja, joihin täyttimme tehtäviä),
jokaisessa keskimäärin 70 tehtävää,
tämä hyvyys alkaa (myös keskimäärin) kerran tunnissa.
Ja kuinka laajennimme, kirjoitan alla, mutta nyt määritellään se yber-ongelma, jonka ratkaisemme:
Alkuperäisiä SQL-palvelimia on kolme, joissa kussakin on 50 tietokantaa - yhden projektin esiintymiä, vastaavasti, niillä on sama rakenne (melkein kaikkialla, mua-ha-ha), mikä tarkoittaa, että jokaisessa on Tilaukset-taulukko (onneksi taulukko, jossa se on nimi voidaan työntää mihin tahansa liiketoimintaan). Otamme tiedot lisäämällä palvelukenttiä (lähdepalvelin, lähdetietokanta, ETL-tehtävätunnus) ja heitämme ne naiivisti vaikkapa Verticaan.
Mennään!
Pääosa, käytännöllinen (ja hieman teoreettinen)
Miksi me (ja sinä)
Kun puut olivat isoja ja minä olin yksinkertainen SQL-schik yhdessä venäläisessä vähittäiskaupassa, huijasimme ETL-prosesseja eli tietovirtoja käyttämällä kahta käytettävissämme olevaa työkalua:
Informatica Power Center - erittäin levittävä järjestelmä, erittäin tuottava, omalla laitteistollaan, omalla versioillaan. Käytin Jumala varjelkoon 1% sen kyvyistä. Miksi? Ensinnäkin tämä käyttöliittymä, jostain 380-luvulta, painoi meitä henkisesti. Toiseksi tämä konsti on suunniteltu erittäin hienoihin prosesseihin, raivokkaaseen komponenttien uudelleenkäyttöön ja muihin erittäin tärkeisiin yritystemppuihin. Siitä tosiasiasta, että se maksaa, kuten Airbus AXNUMX:n siipi / vuosi, emme sano mitään.
Varo, kuvakaappaus voi vahingoittaa hieman alle 30-vuotiaita
SQL Server Integration Server - Käytimme tätä toveria projektin sisäisissä virroissamme. No itse asiassa: käytämme jo SQL Serveriä, ja olisi jotenkin kohtuutonta olla käyttämättä sen ETL-työkaluja. Kaikki siinä on hyvää: sekä käyttöliittymä on kaunis, että edistymisraportit... Mutta emme siksi rakasta ohjelmistotuotteita, oi, ei tätä varten. Versio se dtsx (joka on XML, jossa solmut sekoitetaan tallennuksen yhteydessä) voimme, mutta mitä järkeä on? Mitä jos tekisit tehtäväpaketin, joka vetää satoja taulukoita palvelimelta toiselle? Kyllä, mikä sata, etusormesi putoaa kahdestakymmenestä palasta hiiren painiketta napsauttamalla. Mutta hän näyttää ehdottomasti muodikkaammalta:
Etsimme varmasti ulospääsyä. Tapaus jopa melkein tuli itse kirjoitettuun SSIS-pakettigeneraattoriin ...
…ja sitten sain uuden työpaikan. Ja Apache Airflow ohitti minut siinä.
Kun sain selville, että ETL-prosessikuvaukset ovat yksinkertaista Python-koodia, en vain tanssinut ilosta. Näin tietovirrat versioitiin ja diffattiin, ja yksirakenneisten taulukoiden kaatamisesta sadoista tietokannoista yhteen kohteeseen tuli Python-koodia puolessatoista tai kahdessa 13” näytössä.
Klusterin kokoaminen
Älkäämme järjestäkö täysin päiväkotia, äläkä puhu täällä täysin itsestään selvistä asioista, kuten Airflown asentamisesta, valitsemastasi tietokannasta, Sellerin ja muista telakoissa kuvatuista tapauksista.
Jotta voimme aloittaa kokeilut välittömästi, luonnostelin docker-compose.yml jossa:
Nostetaan oikeasti Ilmavirta: Aikataulu, Web-palvelin. Flower tulee myös pyörimään siellä valvomaan Selleritehtäviä (koska se on jo työnnetty apache/airflow:1.10.10-python3.7, mutta emme välitä)
PostgreSQL, johon Airflow kirjoittaa palvelutietonsa (ajoitustiedot, suoritustilastot jne.), ja Selleri merkitsee suoritetut tehtävät;
Redis, joka toimii sellerin tehtävävälittäjänä;
Selleri työntekijä, joka osallistuu suoraan tehtävien suorittamiseen.
Kansioon ./dags lisäämme tiedostomme dags-kuvauksen kanssa. Ne poimitaan lennossa, joten koko pinoa ei tarvitse jongleerata jokaisen aivastauksen jälkeen.
Joissain paikoissa esimerkkien koodia ei näytetä kokonaan (jotta teksti ei sotkeutuisi), mutta jossain sitä muutetaan prosessin aikana. Täydelliset toimivat koodiesimerkit löytyvät arkistosta https://github.com/dm-logv/airflow-tutorial.
Sävellyksen kokoonpanossa luotin pitkälti tunnettuun kuvaan puckel/doccker-ilmavirta - muista tarkistaa se. Ehkä et tarvitse mitään muuta elämääsi.
Kaikki ilmavirran asetukset ovat käytettävissä paitsi kautta airflow.cfg, mutta myös ympäristömuuttujien kautta (kiitos kehittäjille), joita käytin ilkeästi hyväkseni.
Se ei tietenkään ole tuotantovalmis: en tietoisesti laittanut sykettä konteille, en vaivautunut turvallisuuteen. Mutta tein kokeilijoillemme sopivan minimin.
Ota huomioon, että:
dag-kansion on oltava sekä ajoittajan että työntekijöiden käytettävissä.
Sama koskee kaikkia kolmannen osapuolen kirjastoja - ne kaikki on asennettava koneille, joissa on ajastin ja työntekijät.
No nyt on yksinkertaista:
$ docker-compose up --scale worker=3
Kun kaikki on noussut, voit katsoa verkkokäyttöliittymiä:
Jos et ymmärtänyt mitään kaikista näistä "päivistä", tässä on lyhyt sanakirja:
Scheduler - Airflown tärkein setä, joka hallitsee, että robotit työskentelevät kovasti, ei ihminen: tarkkailee aikataulua, päivittää päivämääriä, käynnistää tehtäviä.
Yleensä vanhemmissa versioissa hänellä oli ongelmia muistin kanssa (ei, ei muistinmenetystä, vaan vuotoja) ja vanha parametri jäi jopa asetuksiin run_duration - sen uudelleenkäynnistysväli. Mutta nyt kaikki on hyvin.
PÄIVÄ (alias "dag") - "suunnattu asyklinen graafi", mutta tällainen määritelmä kertoo harvoille ihmisille, mutta itse asiassa se on säilö tehtäville, jotka ovat vuorovaikutuksessa toistensa kanssa (katso alla) tai analoginen paketille SSIS:ssä ja Workflow in Informaticassa .
Dagien lisäksi voi vielä olla alidageja, mutta emme todennäköisesti pääse niihin.
DAG Juoksu - alustettu dag, jolle on määritetty oma execution_date. Saman dag:n Dagranit voivat toimia rinnakkain (jos olet tehnyt tehtäväsi idempotenteiksi, tietysti).
operaattori ovat koodinpätkiä, jotka vastaavat tietyn toiminnon suorittamisesta. Operaattoreita on kolmenlaisia:
toimintakuten meidän suosikkimme PythonOperator, joka voi suorittaa minkä tahansa (kelvollisen) Python-koodin;
siirtää, jotka kuljettavat tietoja paikasta toiseen, esim. MsSqlToHiveTransfer;
anturi toisaalta sen avulla voit reagoida tai hidastaa dag:n jatkoa, kunnes tapahtuma tapahtuu. HttpSensor voi vetää määritetyn päätepisteen, ja kun haluttu vastaus odottaa, aloita siirto GoogleCloudStorageToS3Operator. Utelias mieli kysyy: "miksi? Loppujen lopuksi voit tehdä toistoja suoraan operaattorissa!” Ja sitten, jotta tehtävien joukko ei tukkeutuisi keskeytetyillä operaattoreilla. Anturi käynnistyy, tarkistaa ja sammuu ennen seuraavaa yritystä.
Tehtävä - Ilmoitetut operaattorit, tyypistä riippumatta ja jotka on liitetty dag:iin, ylennetään tehtävän arvoon.
tehtävän esimerkki - kun yleissuunnittelija päätti, että on aika lähettää tehtävät taisteluun esiintyjätyöntekijöiden päälle (oikein paikan päällä, jos käytämme LocalExecutor tai etäsolmuun, jos kyseessä on CeleryExecutor), se määrittää niille kontekstin (eli joukon muuttujia - suoritusparametreja), laajentaa komento- tai kyselymalleja ja yhdistää ne.
Luomme tehtäviä
Ensin hahmotellaan dougimme yleinen kaavio, ja sitten sukeltamme yksityiskohtiin yhä enemmän, koska käytämme joitain ei-triviaaleja ratkaisuja.
Joten yksinkertaisimmassa muodossaan tällainen dag näyttää tältä:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Ymmärretään:
Ensin tuomme tarvittavat libsit ja jotain muuta;
sql_server_ds - Onko List[namedtuple[str, str]] Airflow Connectionsin yhteyksien nimet ja tietokannat, joista otamme levymme;
dag - ilmoitus päivästämme, jonka on välttämättä oltava mukana globals(), muuten Airflow ei löydä sitä. Dougin on myös sanottava:
mikä hänen nimensä on orders - tämä nimi tulee näkyviin verkkokäyttöliittymään,
että hän työskentelee keskiyöstä XNUMX. heinäkuuta,
ja sen pitäisi käydä noin 6 tunnin välein (koville miehille täällä sen sijaan timedelta() hyväksyttäväksi cron-linja 0 0 0/6 ? * * *, vähemmän cool - ilmaus kuin @daily);
workflow() tekee päätyön, mutta ei nyt. Toistaiseksi vain upotamme kontekstimme lokiin.
Ja nyt tehtävien luomisen yksinkertainen taika:
käymme läpi lähteemme;
alustaa PythonOperator, joka suorittaa nuken workflow(). Älä unohda määrittää tehtävälle yksilöivää (dag:n sisällä) nimeä ja sitoa itse dag. Lippu provide_context puolestaan lisää funktioon lisäargumentteja, joita keräämme huolellisesti käyttämällä **context.
Toistaiseksi siinä kaikki. Mitä saimme:
uusi päivä verkkokäyttöliittymässä,
puolitoista sataa tehtävää, jotka suoritetaan rinnakkain (jos Airflow-, Sellery-asetukset ja palvelimen kapasiteetti sen sallivat).
No, melkein sain sen.
Kuka asentaa riippuvuudet?
Yksinkertaistaakseni tätä koko asiaa, menin asiaan docker-compose.yml käsittelyä requirements.txt kaikissa solmuissa.
Nyt se on poissa:
Harmaat neliöt ovat ajastimen käsittelemiä tehtävänäytteitä.
Odotellaan vähän, työt napsautetaan työntekijöiden toimesta:
Vihreät ovat tietysti saaneet työnsä onnistuneesti päätökseen. Punaiset eivät ole kovin menestyviä.
Muuten, tuotteessamme ei ole kansiota ./dags, koneiden välillä ei ole synkronointia - kaikki päivät ovat sisällä git Gitlabissamme, ja Gitlab CI jakaa päivityksiä koneille, kun ne yhdistetään master.
Vähän Flowerista
Sillä aikaa, kun työntekijät puskevat tuttejamme, muistetaan toinen työkalu, joka voi näyttää meille jotain - Kukka.
Aivan ensimmäinen sivu, jossa on yhteenveto työntekijöiden solmuista:
Intensiivisin sivu, jossa on suoritettuja tehtäviä:
Tylsin sivu välittäjämme statuksella:
Kirkkaimmalla sivulla on tehtävän tilakaaviot ja niiden suoritusaika:
Lataamme alikuormitetut
Joten kaikki tehtävät on suoritettu, voit viedä haavoittuneet pois.
Ja haavoittuneita oli monia - syystä tai toisesta. Jos Airflowa käytetään oikein, juuri nämä neliöt osoittavat, että tiedot eivät todellakaan saapuneet perille.
Sinun on katsottava lokia ja käynnistettävä kaatuneet tehtävät uudelleen.
Napsauta mitä tahansa neliötä, näemme käytettävissämme olevat toiminnot:
Voit ottaa ja poistaa kaatuneet. Toisin sanoen unohdamme, että jokin on epäonnistunut siellä, ja sama ilmentymätehtävä menee ajoittimeen.
On selvää, että tämän tekeminen hiirellä kaikilla punaisilla neliöillä ei ole kovin inhimillistä - tätä emme odota Airflowlta. Luonnollisesti meillä on joukkotuhoaseita: Browse/Task Instances
Valitaan kaikki kerralla ja nollataan, napsauta oikeaa kohdetta:
Siivouksen jälkeen taksimme näyttävät tältä (ne odottavat jo aikatauluttajaa aikatauluttamaan ne):
Liitännät, koukut ja muut muuttujat
On aika katsoa seuraavaa DAG:ta, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Ovatko kaikki koskaan päivittäneet raporttia? Tämä on taas hän: siellä on luettelo lähteistä, joista tiedot saa; siellä on luettelo, mihin sijoittaa; älä unohda puhua, kun kaikki tapahtui tai meni rikki (no, tämä ei koske meitä, ei).
Käydään tiedosto läpi uudelleen ja katsotaan uusia epäselviä juttuja:
from commons.operators import TelegramBotSendMessage - mikään ei estä meitä tekemästä omia operaattoreita, joita hyödynsimme tekemällä pienen kääreen viestien lähettämiseen Unblockedille. (Puhumme lisää tästä operaattorista alla);
default_args={} - dag voi jakaa samat argumentit kaikille operaattoreilleen;
to='{{ var.value.all_the_kings_men }}' - kenttä to meillä ei ole kovakoodattua, vaan dynaamisesti luotua käyttämällä Jinjaa ja muuttujaa sähköpostilistalla, jonka laitoin huolellisesti Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — käyttäjän käynnistämisen ehto. Meidän tapauksessamme kirje lentää pomoille vain, jos kaikki riippuvuudet ovat selvinneet onnistuneesti;
tg_bot_conn_id='tg_main' - argumentit conn_id hyväksyä luomamme yhteystunnukset Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - Telegramin viestit lentävät pois vain, jos tehtäviä on kaatunut;
task_concurrency=1 - kielletään yhden tehtävän useiden tehtäväinstanssien samanaikainen käynnistäminen. Muuten saamme useiden samanaikaisen käynnistyksen VerticaOperator (katso yhtä pöytää);
report_update >> [email, tg] - kaikki VerticaOperator lähentyvät kirjeitä ja viestejä, kuten tämä:
Mutta koska ilmoittajaoperaattoreilla on erilaiset käynnistysehdot, vain yksi toimii. Puunäkymässä kaikki näyttää hieman vähemmän visuaalliselta:
Sanon muutaman sanan aiheesta makroja ja heidän ystävänsä - muuttujia.
Makrot ovat Jinja-paikkamerkkejä, jotka voivat korvata erilaisia hyödyllisiä tietoja operaattoriargumenteiksi. Esimerkiksi näin:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} laajenee kontekstimuuttujan sisältöön execution_date muodossa YYYY-MM-DD: 2020-07-14. Parasta on, että kontekstimuuttujat naulataan tiettyyn tehtäväesiintymään (neliöön puunäkymässä), ja kun käynnistetään uudelleen, paikkamerkit laajenevat samoihin arvoihin.
Määritettyjä arvoja voi tarkastella kunkin tehtävän ilmentymän Rendered-painikkeella. Kirjeen lähettäminen tapahtuu seuraavasti:
Ja niin tehtävässä viestin lähettämisessä:
Täydellinen luettelo uusimman saatavilla olevan version sisäänrakennetuista makroista on saatavilla täältä: makroviittaus
Lisäksi pluginien avulla voimme ilmoittaa omat makromme, mutta se on toinen tarina.
Ennalta määritettyjen asioiden lisäksi voimme korvata muuttujien arvot (käytin tätä jo yllä olevassa koodissa). Luodaan sisään Admin/Variables pari asiaa:
käytä vain polkua haluttuun avaimeen: {{ var.json.bot_config.bot.token }}.
Sanon kirjaimellisesti yhden sanan ja näytän yhden kuvakaappauksen aiheesta yhteys. Kaikki on alkeellista täällä: sivulla Admin/Connections luomme yhteyden, lisäämme sisäänkirjautumistunnuksemme / salasanamme ja tarkemmat parametrit sinne. Kuten tämä:
Salasanat voidaan salata (oletusarvoa perusteellisemmin) tai voit jättää yhteystyypin pois (kuten tein tg_main) - Tosiasia on, että tyyppiluettelo on langallinen Airflow-malleissa, eikä sitä voi laajentaa ilman lähdekoodeja (jos yhtäkkiä en googlettanut jotain, korjaa minua), mutta mikään ei estä meitä saamasta krediittejä. nimi.
Voit myös muodostaa useita yhteyksiä samalla nimellä: tässä tapauksessa menetelmä BaseHook.get_connection(), joka saa meille yhteyksiä nimellä, antaa satunnainen useilta kaimailta (olisi loogisempaa tehdä Round Robin, mutta jätetään se Airflow-kehittäjien omantunnon varaan).
Muuttujat ja yhteydet ovat varmasti hienoja työkaluja, mutta on tärkeää, että et menetä tasapainoa: mitkä osat virtauksistasi tallennat itse koodiin ja mitkä osat annat Airflowlle tallennettavaksi. Toisaalta voi olla kätevää muuttaa nopeasti arvoa, esimerkiksi postilaatikkoa, käyttöliittymän kautta. Toisaalta tämä on silti paluu hiiren napsautukseen, josta halusimme (minä) päästä eroon.
Yhteyksien parissa työskenteleminen on yksi tehtävistä koukut. Yleisesti ottaen Airflow-koukut ovat pisteitä, joilla se yhdistetään kolmannen osapuolen palveluihin ja kirjastoihin. Esim, JiraHook avaa asiakkaan meille vuorovaikutukseen Jiran kanssa (voit siirtää tehtäviä edestakaisin) ja SambaHook voit työntää paikallisen tiedoston smb-kohta.
Muokatun operaattorin jäsentäminen
Ja pääsimme katsomaan kuinka se on tehty TelegramBotSendMessage
Koodi commons/operators.py varsinaisen operaattorin kanssa:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Täällä, kuten kaikessa muussakin Airflowssa, kaikki on hyvin yksinkertaista:
Peritty BaseOperator, joka toteuttaa aika monta Airflow-kohtaista asiaa (katso vapaa-aikaasi)
Ilmoitetut kentät template_fields, jossa Jinja etsii makroja käsiteltäväksi.
Järjestettiin oikeat argumentit puolesta __init__(), aseta oletusasetukset tarvittaessa.
Emme myöskään unohtaneet esi-isän alustusta.
Avasi vastaavan koukun TelegramBotHooksaanut siitä asiakasobjektin.
Ohitettu (uudelleenmääritetty) menetelmä BaseOperator.execute(), joka Airfow nykii, kun on aika käynnistää operaattori - siinä toteutamme päätoimenpiteen, unohtaen kirjautua sisään. (Kirjaudumme muuten heti sisään stdout и stderr - Ilmavirta sieppaa kaiken, kääri sen kauniisti, hajottaa tarvittaessa.)
Katsotaan mitä meillä on commons/hooks.py. Tiedoston ensimmäinen osa koukulla:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
En edes tiedä mitä tässä selittäisin, huomautan vain tärkeät seikat:
Perimme, ajattelemme argumentteja - useimmissa tapauksissa se on yksi: conn_id;
Vakiomenetelmien ohittaminen: rajoitin itseäni get_conn(), jossa saan yhteysparametrit nimen mukaan ja vain osion extra (tämä on JSON-kenttä), johon laitoin (omien ohjeideni mukaan!) Telegram-bottitunnuksen: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Luon esimerkin meidän TelegramBot, antamalla sille tietyn tunnuksen.
Siinä kaikki. Voit saada asiakkaan koukusta käyttämällä TelegramBotHook().clent tai TelegramBotHook().get_conn().
Ja tiedoston toinen osa, jossa teen mikrokääreen Telegram REST API:lle, jotta en vedä samaa python-telegram-bot yhdelle menetelmälle sendMessage.
Oikea tapa on laskea kaikki yhteen: TelegramBotSendMessage, TelegramBotHook, TelegramBot - laita laajennuksessa julkiseen arkistoon ja anna se avoimelle lähteelle.
Tutkiessamme tätä kaikkea raporttipäivityksemme onnistuivat epäonnistumaan ja lähettämään minulle kanavassa virheilmoituksen. Menen katsomaan onko vika...
Jotain meni rikki koirassamme! Eikö se ole sitä mitä odotimme? Tarkalleen!
Aiotko kaataa?
Tuntuuko sinusta, että missasin jotain? Näyttää siltä, että hän lupasi siirtää tietoja SQL Serveristä Verticaan, ja sitten hän otti sen ja siirtyi aiheesta, roisto!
Tämä julmuus oli tahallista, minun piti vain tulkita sinulle terminologiaa. Nyt voit mennä pidemmälle.
Suunnitelmamme oli tämä:
Do dag
Luo tehtäviä
Katso kuinka kaunista kaikki on
Määritä istuntonumerot täyttöihin
Hae tietoja SQL Serveristä
Laita tiedot Verticaan
Kerää tilastoja
Joten, jotta tämä kaikki saadaan toimimaan, tein pienen lisäyksen meidän docker-compose.yml:
täytämme jälkimmäisen tietokannat joillakin tiedoilla (älä missään tapauksessa tutki mssql_init.py!)
Käynnistämme kaiken hyvän hieman monimutkaisemman komennon avulla kuin viime kerralla:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Voit käyttää tuotetta mitä ihmesatunnaistajamme loi Data Profiling/Ad Hoc Query:
Tärkeintä ei ole näyttää sitä analyytikoille
tarkentaa ETL-istunnot En tee, siellä kaikki on triviaalia: teemme pohjan, siinä on kyltti, käärimme kaiken kontekstinhallinnan avulla ja nyt teemme näin:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Aika on koittanut keräämme tietojamme puolentoistasadasta pöydästämme. Tehdään tämä erittäin vaatimattomien rivien avulla:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Koukun avulla saamme Airflowsta pymssql-kytkeä
Korvataan pyyntöön päivämäärän muodossa oleva rajoitus - mallimoottori heittää sen funktioon.
Ruokitaan pyyntöämme pandaskuka saa meidät DataFrame - siitä on meille hyötyä tulevaisuudessa.
Käytän substituutiota {dt} pyyntöparametrin sijaan %s ei siksi, että olisin paha Pinocchio, vaan siksi pandas ei voi käsitellä pymssql ja luistaa viimeisen params: Listvaikka hän todella haluaa tuple.
Huomaa myös, että kehittäjä pymssql päätti olla tukematta häntä enää, ja on aika muuttaa pois pyodbc.
Katsotaanpa, millä Airflow täytti funktioidemme argumentit:
Jos tietoja ei ole, ei ole mitään järkeä jatkaa. Mutta on myös outoa pitää täyttöä onnistuneena. Mutta tämä ei ole virhe. A-ah-ah, mitä tehdä?! Ja tässä mitä:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException kertoo Airflowlle, että virheitä ei ole, mutta ohitamme tehtävän. Käyttöliittymässä ei ole vihreää tai punaista neliötä, vaan vaaleanpunaista.
Tulvaistuntomme tunnus (se on erilainen jokaiseen tehtävään),
Hajautus lähteestä ja tilaustunnus - jotta lopullisessa tietokannassa (jossa kaikki kaadetaan yhteen taulukkoon) meillä on yksilöllinen tilaustunnus.
Jäljelle jää toiseksi viimeinen vaihe: kaada kaikki Verticaan. Ja kummallista kyllä, yksi upeimmista ja tehokkaimmista tavoista tehdä tämä on CSV:n kautta!
- No, - sanoi pieni hiiri, - eikös nyt
Oletko varma, että olen metsän kauhein eläin?
Julia Donaldson, The Gruffalo
Luulen, että jos kollegoillani ja minulla olisi kilpailu: kuka nopeasti luo ja käynnistää ETL-prosessin tyhjästä: he SSIS:illään ja hiirellä ja minä Airflowlla... Ja sitten vertaisimme myös huollon helppoutta... Vau, uskon, että olet samaa mieltä siitä, että voitan heidät kaikilla rintamilla!
Jos vähän vakavammin, niin Apache Airflow - kuvailemalla prosesseja ohjelmakoodin muodossa - teki työni lisää mukavampaa ja nautinnollisempaa.
Sen rajaton laajennettavuus sekä laajennuksina että skaalautuvuuden suhteen antaa sinulle mahdollisuuden käyttää Airflow'ta lähes kaikilla alueilla: jopa koko tiedonkeruun, valmistelun ja käsittelyn aikana, jopa raketteja laukaiseessa (Marsiin, kurssi).
Osa loppu, viite ja tiedot
Keräsimme sinulle
start_date. Kyllä, tämä on jo paikallinen meemi. Via Dougin tärkein argumentti start_date kaikki ohi. Lyhyesti, jos määrität start_date nykyinen päivämäärä ja schedule_interval - Jonain päivänä DAG alkaa huomenna ei aikaisemmin.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Eikä enää ongelmia.
Siihen liittyy toinen ajonaikainen virhe: Task is missing the start_date parameter, mikä useimmiten osoittaa, että olet unohtanut sitoutua dag-operaattoriin.
Kaikki yhdellä koneella. Kyllä, ja pohjat (itse Airflow ja pinnoitteemme), ja web-palvelin, ja ajastin ja työntekijät. Ja se jopa toimi. Mutta ajan myötä palveluiden tehtävien määrä kasvoi, ja kun PostgreSQL alkoi vastata indeksiin 20 sekunnissa 5 ms:n sijaan, otimme sen ja kantoimme sen pois.
LocalExecutor. Kyllä, istumme edelleen sen päällä ja olemme jo tulleet kuilun reunalle. LocalExecutor on riittänyt meille toistaiseksi, mutta nyt on aika laajentua ainakin yhdellä työntekijällä ja meidän on tehtävä lujasti töitä siirtyäksemme CeleryExecutoriin. Ja kun otetaan huomioon, että voit työskennellä sen kanssa yhdellä koneella, mikään ei estä sinua käyttämästä Selleryä edes palvelimella, joka "rehellisesti sanottuna ei tietenkään koskaan mene tuotantoon!"
Ei käytössä sisäänrakennetut työkalut:
Liitännät tallentaa huoltotiedot,
SLA Miss vastata tehtäviin, jotka eivät sujuneet ajoissa,
xcom metatietojen vaihtoa varten (sanoin metadata!) dag-tehtävien välillä.
Postin väärinkäyttö. No, mitä voin sanoa? Hälytykset asetettiin kaikille kaatuneiden tehtävien toistuville. Nyt työssäni Gmailissa on yli 90 100 sähköpostia Airflowsta, ja verkkosähköpostin kuono kieltäytyy poimimasta ja poistamasta yli XNUMX:ta kerrallaan.
Jotta voisimme työskennellä entistä enemmän päällämme eikä käsillämme, Airflow on valmistanut meille seuraavan:
REST API - hänellä on edelleen kokeellisen asema, mikä ei estä häntä työskentelemästä. Sen avulla voit paitsi saada tietoa päivämääristä ja tehtävistä, myös pysäyttää/aloittaa päiväpäivien, luoda DAG Run tai poolin.
CLI - Komentorivin kautta on saatavilla monia työkaluja, jotka eivät ole vain hankalia käyttää WebUI:n kautta, mutta ne ovat yleensä poissa. Esimerkiksi:
backfill tarvitaan tehtäväinstanssien käynnistämiseen uudelleen.
Esimerkiksi analyytikot tulivat ja sanoivat: "Ja sinulla, toveri, on hölynpölyä tiedoissa 1. - 13. tammikuuta! Korjaa, korjaa, korjaa, korjaa!" Ja sinä olet sellainen liesi:
run, jonka avulla voit suorittaa yhden ilmentymätehtävän ja jopa tehdä pisteitä kaikista riippuvuuksista. Lisäksi voit ajaa sen kautta LocalExecutor, vaikka sinulla olisi selleriklusteri.
Tekee aika lailla samaa test, vain myös pohjassa ei kirjoita mitään.
connections mahdollistaa yhteyksien massan luomisen kuoresta.
Python-sovellusliittymä - melko kova vuorovaikutustapa, joka on tarkoitettu laajennuksille, eikä kuhise siinä pienillä käsillä. Mutta kuka estää meitä menemästä /home/airflow/dags, Suorita ipython ja alkaa sotkemaan? Voit esimerkiksi viedä kaikki yhteydet seuraavalla koodilla:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Yhdistetään Airflow-metatietokantaan. En suosittele kirjoittamista sille, mutta tehtävätilojen saaminen eri mittareille voi olla paljon nopeampaa ja helpompaa kuin minkä tahansa API:n käyttäminen.
Oletetaan, että kaikki tehtävämme eivät ole idempotentteja, mutta ne voivat joskus kaatua, ja tämä on normaalia. Mutta muutama tukos on jo epäilyttävää, ja se olisi tarpeen tarkistaa.
Varo SQL:ää!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
viittaukset
Ja tietysti, ensimmäiset kymmenen linkkiä Googlen liikkeeseenlaskusta ovat Airflow-kansion sisältö kirjanmerkeistäni.
Pythonin Zen ja Apache Airflow - implisiittinen DAG-edelleenlähetys, kontekstin heitto funktioissa, jälleen riippuvuuksista ja myös tehtävien käynnistysten ohittamisesta.