Te tukanga ETL mo te tiki raraunga mai i te imeera i Apache Airflow

Te tukanga ETL mo te tiki raraunga mai i te imeera i Apache Airflow

Ahakoa te nui o te hangarau e whanake ana, ka whai tonu te whanaketanga e te aho o nga huarahi tawhito. Ko tenei pea na te ngawari o te whakawhiti, nga ahuatanga tangata, nga hiahia hangarau, tetahi atu mea ranei. I roto i te waahi o te tukatuka raraunga, ko nga mea tino kitea i tenei waahanga ko nga puna raraunga. Ahakoa te nui o te moemoea mo te whakakore i tenei, i tenei wa ka tukuna etahi o nga raraunga i roto i nga karere tere me nga imeera, kaua e whakahua i nga whakatakotoranga tawhito. Ka tono ahau ki a koe ki te titiro ki tetahi o nga whiringa mo Apache Airflow, e whakaatu ana me pehea e taea ai e koe te kohi raraunga mai i nga imeera.

prehistory

He maha nga raraunga ka tukuna tonu ma te imeera, mai i nga whakawhitiwhiti korero ki nga paerewa o te taunekeneke i waenga i nga kamupene. He pai mena ka taea e koe te tuhi i tetahi atanga ki te whiwhi raraunga, ki te tuu ranei i nga tangata ki te tari ka uru atu enei korero ki nga puna watea ake, engari ko te nuinga o nga wa kare pea i reira. Ko te mahi motuhake i pa ki a au ko te hono i tetahi punaha CRM rongonui ki tetahi whare putunga raraunga, katahi ki te punaha OLAP. I nga wa o mua, mo ta maatau kamupene, he pai te whakamahi i tenei punaha ki tetahi waahanga pakihi. Na reira, i tino hiahia nga tangata katoa kia kaha ki te whakahaere me nga raraunga mai i tenei punaha tuatoru ano hoki. Tuatahi, ko te tikanga, i tirotirohia te tupono ki te tiki raraunga mai i te API tuwhera. Ko te mea pouri, karekau te API i kapi i te whiwhi i nga raraunga e tika ana, a, i roto i nga kupu ngawari, he kopikopiko te nuinga, karekau te tautoko hangarau i hiahia, kaore ranei i taea ki te tutuki i te haurua ki te whakarato i nga mahi matawhānui. Engari i whakawhiwhia e tenei punaha te whai waahi ki te tango i nga raraunga ngaro ma te imeera i te ahua o te hono ki te tango i te puranga.

Me tohu ehara i te mea ko tenei anake te keehi i hiahia ai tetahi pakihi ki te kohi raraunga mai i nga imeera me nga karere tere. Heoi, i tenei keehi, kaore e taea e matou te awe i tetahi kamupene tuatoru e whakarato ana i tetahi waahanga o nga raraunga i tenei huarahi anake.

Apache Rererangi

Hei hanga i nga tukanga ETL, he maha nga wa ka whakamahia e matou te Apache Airflow. Kia pai ake ai te mohio o te kaipanui ki tenei hangarau me pehea te ahua o te horopaki me te whanui, ka whakaahuahia e au etahi korero whakataki e rua.

Ko te Apache Airflow he papaahi kore utu e whakamahia ana ki te hanga, ki te whakahaere, ki te aro turuki i nga tukanga ETL (Tango-Whakawhiti-Uta) i roto i te Python. Ko te ariā matua i roto i te Airflow he kauwhata acyclic tika, ko nga pou o te kauwhata he tukanga motuhake, ko nga tapa o te kauwhata ko te rere o te mana whakahaere, o nga korero ranei. Ka taea e te tukanga te karanga noa i tetahi mahi Python, ka nui ake ranei te arorau uaua o te karanga raupapa i nga mahi maha i roto i te horopaki o te akomanga. Mo nga mahi tino noa, he maha nga whanaketanga kua rite ka taea te whakamahi hei tukanga. Ko enei whanaketanga ko:

  • kaiwhakahaere - mo te nuku raraunga mai i tetahi waahi ki tetahi atu, hei tauira, mai i te ripanga raraunga ki te whare putunga raraunga;
  • pūoko - ki te tatari mo te puta mai o tetahi huihuinga me te whakatika i te rere mana ki nga pou o te kauwhata o muri mai;
  • matau - mo nga mahi taumata-iti, hei tauira, mo te tiki raraunga mai i te ripanga putunga raraunga (i whakamahia i roto i nga korero);
  • me te pera.

Kare e tika te whakamaarama mo te Apache Airflow i roto i tenei tuhinga. Ka taea te kite i nga kupu whakataki poto konei ranei konei.

Matau mo te tiki raraunga

Tuatahi, hei whakaoti rapanga me tuhi matau e taea ai e tatou:

  • hono ki te īmēra;
  • kimihia te reta e hiahia ana koe;
  • whiwhi raraunga mai i te reta.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Ko te arorau tenei: ka hono tatou, ka kimihia te reta whakamutunga e tino whai kiko ana, mena kei etahi atu, ka warewarehia e tatou. Ka whakamahia tenei mahi na te mea kei roto i nga karere o muri mai nga raraunga katoa mai i nga korero o mua. Ki te kore e penei, ka taea e koe te whakahoki i te huinga o nga reta katoa, ki te tukatuka ranei i te tuatahi me te toenga i runga i te paanui e whai ake nei. I te nuinga o nga wa, ka whakawhirinaki nga mea katoa ki te mahi.

Ka taapirihia e matou nga mahi awhina e rua ki te matau: mo te tango i tetahi konae me te tango i tetahi konae ma te whakamahi hononga mai i te reta. Ma te ara, ka taea te whakauru ki roto i te kaiwhakahaere, ka whakawhirinaki ki te maha o te whakamahinga o tenei mahi. He aha atu te taapiri ki te matau, ano, ka whakawhirinaki ki te mahi: mena ka tae tonu mai nga konae ki roto i te reta, katahi ka taea e koe te tango i nga taapiri ki te reta, mena ka tae mai nga raraunga ki te reta, katahi ka hiahia koe ki te tarai i te reta, aha atu. I roto i taku keehi, ka tae mai te reta me te hono kotahi ki te puranga, me tuu e au ki tetahi waahi ka timata ano te mahi.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

He maamaa te waehere, no reira kare e hiahiatia he whakamaarama taapiri. Ka korero noa ahau ki a koe mo te raina makutu imap_conn_id. Kei te rongoa a Apache Airflow i nga tawhā hononga (takiuru, kupuhipa, wāhitau me etahi atu tawhā), ka taea te uru atu ma te tautohu aho. Ki te titiro, he penei te ahua o te whakahaere hononga

Te tukanga ETL mo te tiki raraunga mai i te imeera i Apache Airflow

Pūoko mo te tatari mo nga raraunga

I te mea kua mohio tatou ki te hono me te whiwhi raraunga mai i te mēra, ka taea e tatou te tuhi i tetahi pukoro hei tatari mo tera. I taku keehi, ko te tuhi tonu i tetahi kaitoha ka whakahaere i nga raraunga, mena he, kaore i mahi, na te mea ko etahi atu tukanga e mahi ana i runga i nga raraunga i riro mai i te mēra, tae atu ki nga mea e tango ana i nga raraunga e pa ana mai i etahi atu puna (API, waea, inenga paetukutuku, etc.) etc.). Ka hoatu e ahau he tauira. Kua puta mai he kaiwhakamahi hou ki te punaha CRM, a kaore ano matou i te mohio mo tana UUID. Na, ka ngana ana matou ki te whiwhi raraunga mai i te waea SIP, ka whiwhi waea e herea ana ki tana UUID, engari kaore e taea e matou te tiaki tika me te whakamahi. I roto i enei patai, he mea nui kia maumahara ki te whakawhirinakitanga o nga raraunga, ina koa no nga puna rereke. Ko te tikanga, he iti rawa enei tikanga hei pupuri i te pono o nga raraunga, engari i etahi wa ka tika. A ko te nama taonga i te wa e mangere ana he poauau hoki.

No reira, ka whakarewahia e ta maatau pukoro nga pou o muri o te kauwhata mena he korero hou kei roto i te mēra, me te tohu hoki i nga korero o mua he koretake.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Te whiwhi me te whakamahi raraunga

Hei whiwhi me te tukatuka raraunga, ka taea e koe te tuhi i tetahi kaiwhakahaere motuhake, ka taea ranei e koe te whakamahi i nga mea kua rite. I te mea he iti tonu te arorau - ki te tango raraunga mai i tetahi reta, na hei tauira ka kii ahau i te PythonOperator paerewa

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Ma te ara, mena kei runga ano to imeera umanga i te mail.ru, kare e taea e koe te rapu reta ma te kaupapa, kaituku, aha atu. I oati ratou ki te whakauru mai ano i te tau 2016, engari kua huri ke o raatau whakaaro. I whakaotihia e ahau tenei raruraru ma te hanga i tetahi kōpaki motuhake mo nga reta e tika ana me te whakarite i tetahi tātari i roto i te atanga tukutuku mēra mo nga reta e tika ana. No reira, ko nga reta e tika ana me nga tikanga rapu, i taku keehi, ka uru noa (UNSEEN) ki tenei kōpaki.

Hei whakarapopototanga, kei a maatau te raupapa e whai ake nei: ka tirohia mena he reta hou e tutuki ana ki nga tikanga; mena kei reira, katahi ka tango i te puranga ma te whakamahi i te hono mai i te reta whakamutunga.
I raro i nga ellipses whakamutunga, kaore e whakakorehia ka wetewetehia tenei puranga, ka horoia nga raraunga mai i te puranga ka tukatukahia, a, ko te mutunga, ka haere atu tenei mea katoa ki te pipeline tukanga ETL, engari kei tua atu i te waahanga. o te tuhinga. Mena he mea whakamere me te whai hua, ka koa ahau ki te whakamaarama tonu i nga otinga ETL me o raatau waahanga mo Apache Airflow.

Source: will.com

Tāpiri i te kōrero