Pròiseas ETL airson dàta fhaighinn bho phost-d ann an Apache Airflow

Pròiseas ETL airson dàta fhaighinn bho phost-d ann an Apache Airflow

Ge bith dè an ìre de theicneòlas a bhios a’ leasachadh, tha sreath de dhòighean-obrach seann-fhasanta an-còmhnaidh air cùl leasachadh. Dh’ fhaodadh seo a bhith mar thoradh air eadar-ghluasad rèidh, factaran daonna, feumalachdan teicneòlais, no rudeigin eile. Ann an raon giollachd dàta, is e stòran dàta an fheadhainn as nochdte sa phàirt seo. Ge bith dè an ìre gu bheil sinn a ’bruadar mu bhith a’ faighinn cuidhteas seo, ach gu ruige seo tha pàirt den dàta air a chuir ann an teachdairean sa bhad agus puist-d, gun luaidh air cruthan nas sine. Tha mi a 'toirt cuireadh dhut aon de na roghainnean airson Apache Airflow a thoirt às a chèile, a' sealltainn mar as urrainn dhut dàta a thoirt bho phuist-d.

ro-eachdraidheil

Tha tòrr dàta fhathast air a ghluasad tro phost-d, bho chonaltradh eadar-phearsanta gu ìrean eadar-obrachaidh eadar companaidhean. Tha e math ma tha e comasach eadar-aghaidh a sgrìobhadh airson dàta fhaighinn no daoine a chuir san oifis a chuireas a-steach am fiosrachadh seo gu stòran nas goireasaiche, ach gu tric is dòcha nach bi seo comasach. B’ e an obair shònraichte a bha romham a bhith a’ ceangal an t-siostam CRM cliùiteach ris an taigh-bathair dàta, agus an uairsin ris an t-siostam OLAP. Tha e mar sin a thachair gu h-eachdraidheil gun robh cleachdadh an t-siostam seo goireasach don chompanaidh againn ann an raon sònraichte de ghnìomhachas. Mar sin, bha a h-uile duine dha-rìribh ag iarraidh a bhith comasach air obrachadh le dàta bhon t-siostam treas-phàrtaidh seo cuideachd. An toiseach, gu dearbh, chaidh sgrùdadh a dhèanamh air comasachd dàta fhaighinn bho API fosgailte. Gu mì-fhortanach, cha robh an API a’ còmhdach a bhith a’ faighinn a h-uile dàta riatanach, agus, gu sìmplidh, bha e cam ann an iomadh dòigh, agus cha robh taic theicnigeach ag iarraidh no cha b’ urrainn dhaibh coinneachadh letheach slighe gus comas-gnìomh nas coileanta a thoirt seachad. Ach thug an siostam seo cothrom bho àm gu àm an dàta a bha a dhìth fhaighinn tron ​​​​phost ann an cruth ceangal airson an tasglann a luchdachadh sìos.

Bu chòir a thoirt fa-near nach b’ e seo an aon chùis anns an robh an gnìomhachas airson dàta a chruinneachadh bho phuist-d no teachdairean sa bhad. Ach, anns a 'chùis seo, cha b' urrainn dhuinn buaidh a thoirt air companaidh treas-phàrtaidh a bheir seachad pàirt den dàta a-mhàin san dòigh seo.

sruth-adhair apache

Gus pròiseasan ETL a thogail, mar as trice bidh sinn a’ cleachdadh Apache Airflow. Gus am faigh leughadair nach eil eòlach air an teicneòlas seo tuigse nas fheàrr air mar a tha e a’ coimhead sa cho-theacsa agus san fharsaingeachd, bheir mi cunntas air fear tòiseachaidh no dhà.

Tha Apache Airflow na àrd-ùrlar an-asgaidh a thathas a’ cleachdadh gus pròiseasan ETL (Extract-Transform-Loading) a thogail, a chuir an gnìomh agus a sgrùdadh ann am Python. Is e am prìomh bhun-bheachd ann an Airflow graf acyclic stiùirichte, far a bheil vertices a’ ghraf nam pròiseasan sònraichte, agus is e oirean a’ ghraf an sruth smachd no fiosrachaidh. Faodaidh pròiseas dìreach gnìomh Python sam bith a ghairm, no faodaidh loidsig nas iom-fhillte a bhith aige bho bhith a’ gairm grunn ghnìomhan ann an co-theacsa clas. Airson na h-obraichean as trice, tha mòran leasachaidhean deiseil ann mu thràth a dh'fhaodar a chleachdadh mar phròiseasan. Am measg nan leasachaidhean sin tha:

  • gnìomhaichean - airson dàta a ghluasad bho aon àite gu àite eile, mar eisimpleir, bho chlàr stòr-dàta gu taigh-bathair dàta;
  • mothachaidhean - airson a bhith a 'feitheamh airson tachartas sònraichte agus a' stiùireadh an t-sruth smachd gu na h-earrainnean às dèidh sin den ghraf;
  • dubhan - airson obraichean aig ìre nas ìsle, mar eisimpleir, gus dàta fhaighinn bho chlàr stòr-dàta (air a chleachdadh ann an aithrisean);
  • agus mar sin air adhart.

Bhiodh e neo-iomchaidh cunntas mionaideach a thoirt air Apache Airflow san artaigil seo. Faodar ro-ràdh goirid fhaicinn an seo no an seo.

Hook airson dàta fhaighinn

An toiseach, gus an duilgheadas fhuasgladh, feumaidh sinn dubhan a sgrìobhadh leis am b’ urrainn dhuinn:

  • ceangail ri post-d
  • lorg an litir cheart
  • dàta fhaighinn bhon litir.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Is e seo an loidsig: bidh sinn a’ ceangal, lorg an litir mu dheireadh as buntainniche, ma tha feadhainn eile ann, bidh sinn gan leigeil seachad. Tha an gnìomh seo air a chleachdadh, oir tha a h-uile dàta bho fheadhainn nas tràithe ann an litrichean nas fhaide air adhart. Mura h-eil seo fìor, faodaidh tu sreath de na litrichean uile a thilleadh, no a 'chiad fhear a phròiseasadh, agus an còrr air an ath bhealaich. San fharsaingeachd, tha a h-uile dad, mar a tha e an-còmhnaidh, an urra ris an obair.

Cuiridh sinn dà ghnìomh taiceil ris an dubhan: airson faidhle a luchdachadh sìos agus airson faidhle a luchdachadh sìos a’ cleachdadh ceangal bho phost-d. Air an t-slighe, faodaidh iad a bhith air an gluasad chun a 'ghnìomhaiche, tha e an crochadh air cho tric' sa bhith a 'cleachdadh an gnìomh seo. Bidh dè eile a chuireas ris an dubhan, a-rithist, an urra ris a ’ghnìomh: ma gheibhear faidhlichean sa bhad san litir, faodaidh tu ceanglachan a luchdachadh sìos don litir, ma gheibhear an dàta san litir, feumaidh tu an litir a pharsadh, etc. Anns a 'chùis agam, tha an litir a' tighinn le aon cheangal ris an tasglann, a dh'fheumas mi a chuir ann an àite sònraichte agus tòiseachadh air a 'phròiseas giollachd eile.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Tha an còd sìmplidh, agus mar sin cha mhòr gu bheil feum air tuilleadh mìneachaidh. Innsidh mi dhut mun loidhne draoidheil imap_conn_id. Bidh Apache Airflow a’ stòradh paramadairean ceangail (logadh a-steach, facal-faire, seòladh, agus paramadairean eile) a gheibhear thuige le aithnichear sreang. Gu fradharcach, tha riaghladh ceangail a ’coimhead mar seo

Pròiseas ETL airson dàta fhaighinn bho phost-d ann an Apache Airflow

Sensor feitheamh airson dàta

Leis gu bheil fios againn mu thràth mar a cheanglas agus a gheibh sinn dàta bhon phost, is urrainn dhuinn a-nis sensor a sgrìobhadh gus feitheamh riutha. Anns a ’chùis agam, cha do dh’ obraich e gus gnìomhaiche a sgrìobhadh sa bhad a làimhsicheas an dàta, ma tha gin ann, leis gu bheil pròiseasan eile ag obair stèidhichte air an dàta a gheibhear bhon phost, a ’toirt a-steach an fheadhainn a bheir dàta co-cheangailte bho stòran eile (API, fòn. , meatrach lìn, msaa) msaa). Bheir mi eisimpleir dhut. Tha cleachdaiche ùr air nochdadh san t-siostam CRM, agus chan eil fios againn fhathast mun UUID aige. An uairsin, nuair a dh'fheuchas sinn ri dàta fhaighinn bho fhòn SIP, gheibh sinn fiosan ceangailte ris an UUID aige, ach cha bhith e comasach dhuinn an sàbhaladh agus an cleachdadh gu ceart. Ann an leithid de chùisean, tha e cudromach cuimhneachadh air eisimeileachd an dàta, gu sònraichte ma tha iad bho dhiofar thùsan. Tha iad sin, gu dearbh, nan ceumannan gu leòr gus ionracas dàta a ghleidheadh, ach ann an cuid de chùisean tha iad riatanach. Tha, agus tha e mì-reusanta cuideachd a bhith a’ gabhail fois ann an goireasan.

Mar sin, cuiridh an sensor againn vertices às deidh sin den ghraf air bhog ma tha fiosrachadh ùr anns a’ phost, agus comharraichidh e cuideachd gu bheil am fiosrachadh a bh ’ann roimhe neo-iomchaidh.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Bidh sinn a’ faighinn agus a’ cleachdadh dàta

Gus dàta fhaighinn agus a phròiseasadh, faodaidh tu gnìomhaiche air leth a sgrìobhadh, faodaidh tu feadhainn deiseil a chleachdadh. Gu ruige seo tha an loidsig beag - gus dàta a thoirt bhon litir, mar eisimpleir, tha mi a 'moladh an PythonOperator àbhaisteach

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Air an t-slighe, ma tha am post corporra agad cuideachd air mail.ru, cha bhith e comasach dhut litrichean a lorg le cuspair, neach-cuiridh, msaa. Air ais ann an 2016, gheall iad a thoirt a-steach, ach a rèir coltais dh’ atharraich iad an inntinn. Dh ’fhuasgail mi an duilgheadas seo le bhith a’ cruthachadh pasgan air leth airson na litrichean riatanach agus a ’stèidheachadh sìoltachan airson na litrichean riatanach ann an eadar-aghaidh lìn a’ phuist. Mar sin, is e dìreach na litrichean agus na cumhaichean riatanach airson an sgrùdaidh, na mo chùis, dìreach (UNSEEN) a dhol a-steach don phasgan seo.

A 'toirt geàrr-chunntas, tha an t-sreath a leanas againn: bidh sinn a' sgrùdadh a bheil litrichean ùra ann a tha a 'coinneachadh ris na cumhaichean, ma tha, an uairsin bidh sinn a' luchdachadh sìos an tasglann a 'cleachdadh a' cheangal bhon litir mu dheireadh.
Fo na dotagan mu dheireadh, tha e air fhàgail a-mach gun tèid an tasglann seo a dhì-phapadh, thèid an dàta bhon tasglann a ghlanadh agus a phròiseasadh, agus mar thoradh air an sin, thèid an rud gu lèir nas fhaide gu loidhne-phìoban pròiseas ETL, ach tha seo mar-thà nas fhaide na farsaingeachd an artaigil. Ma thionndaidh e a-mach inntinneach agus feumail, leanaidh mi gu toilichte a’ toirt cunntas air fuasglaidhean ETL agus na pàirtean aca airson Apache Airflow.

Source: www.habr.com

Cuir beachd ann