Próiseas ETL chun sonraí a fháil ó ríomhphost in Apache Airflow

Próiseas ETL chun sonraí a fháil ó ríomhphost in Apache Airflow

Is cuma cé mhéad a fhorbraíonn an teicneolaíocht, tá sraith de chur chuige atá as dáta i gcónaí taobh thiar den fhorbairt. D'fhéadfadh sé seo a bheith mar gheall ar aistriú rianúil, fachtóirí daonna, riachtanais teicneolaíochta, nó rud éigin eile. I réimse na próiseála sonraí, is iad foinsí sonraí na cinn is nochtadh sa chuid seo. Is cuma cé mhéad aisling againn fáil réidh leis seo, ach go dtí seo seoltar cuid de na sonraí i dteachtairí meandracha agus ríomhphoist, gan trácht ar fhormáidí níos ársa. Tugaim cuireadh duit ceann de na roghanna le haghaidh Apache Airflow a dhíchóimeáil, ag léiriú conas is féidir leat sonraí a ghlacadh ó ríomhphoist.

réamhstair

Aistrítear go leor sonraí fós trí ríomhphost, ó chumarsáid idirphearsanta go caighdeáin idirghníomhaíochta idir cuideachtaí. Is maith an rud é más féidir comhéadan a scríobh chun sonraí a fháil nó daoine a chur san oifig a chuirfidh an fhaisnéis seo isteach i bhfoinsí níos áisiúla, ach is minic nach féidir é seo a dhéanamh. Ba é an tasc sonrach a bhí romham ná an córas CRM iomráiteach a nascadh leis an stóras sonraí, agus ansin leis an gcóras OLAP. Tharla sé sin go stairiúil go raibh úsáid an chórais seo áisiúil dár gcuideachta i réimse áirithe gnó. Dá bhrí sin, ba mhian le gach duine a bheith in ann oibriú le sonraí ón gcóras tríú páirtí seo freisin. Ar an gcéad dul síos, ar ndóigh, rinneadh staidéar ar an bhféidearthacht sonraí a fháil ó API oscailte. Ar an drochuair, níor chlúdaigh an API na sonraí riachtanacha go léir a fháil, agus, i dtéarmaí simplí, bhí sé i go leor bealaí cam, agus ní raibh tacaíocht theicniúil ag iarraidh nó nach bhféadfadh sé freastal ar leathbhealach chun feidhmiúlacht níos cuimsithí a sholáthar. Ach thug an córas seo deis na sonraí a bhí in easnamh a fháil go tréimhsiúil tríd an bpost i bhfoirm naisc chun an chartlann a dhíluchtú.

Ba chóir a thabhairt faoi deara nárbh é seo an t-aon chás ina raibh an gnó ag iarraidh sonraí a bhailiú ó ríomhphoist nó ó theachtairí meandracha. Mar sin féin, sa chás seo, níorbh fhéidir linn tionchar a imirt ar chuideachta tríú páirtí a sholáthraíonn cuid de na sonraí ar an mbealach seo amháin.

Aershreabhadh Apache

Chun próisis ETL a thógáil, is minic a úsáidimid Apache Airflow. Ionas go mbeidh tuiscint níos fearr ag léitheoir nach bhfuil cur amach aige ar an teicneolaíocht seo ar an gcuma atá uirthi sa chomhthéacs agus go ginearálta, déanfaidh mé cur síos ar chúpla réamhrá.

Is ardán saor in aisce é Apache Airflow a úsáidtear chun próisis ETL (Extract-Transform-Loading) a thógáil, a fhorghníomhú agus a mhonatóiriú i Python. Is é an príomh-choincheap i Airflow ná graf aicyclic faoi stiúir, áit a bhfuil rinn an ghraif próisis shonracha, agus is é imill an ghraif an sreabhadh rialaithe nó faisnéise. Is féidir le próiseas go simplí glaoch ar aon fheidhm Python, nó is féidir loighic níos casta a bheith aige ó ghlaoch go seicheamhach ar roinnt feidhmeanna i gcomhthéacs aicme. I gcás na n-oibríochtaí is minice, tá go leor forbairtí réamhdhéanta ann cheana ar féidir iad a úsáid mar phróisis. I measc na bhforbairtí sin tá:

  • oibreoirí - chun sonraí a aistriú ó áit amháin go háit eile, mar shampla, ó thábla bunachar sonraí go stóras sonraí;
  • braiteoirí - chun fanacht le teagmhas áirithe a tharla agus an sreabhadh rialaithe a threorú chuig na rinn ina dhiaidh sin den ghraf;
  • crúcaí - le haghaidh oibríochtaí ar leibhéal níos ísle, mar shampla, chun sonraí a fháil ó tábla bunachar sonraí (a úsáidtear i ráitis);
  • etc

Bheadh ​​​​sé míchuí cur síos a dhéanamh go mion ar Apache Airflow san Airteagal seo. Is féidir réamhrá gairid a fheiceáil anseoanseo.

Hook chun sonraí a fháil

Ar an gcéad dul síos, chun an fhadhb a réiteach, caithfimid duán a scríobh lena bhféadfaimis:

  • ceangal le ríomhphost
  • teacht ar an litir ceart
  • sonraí a fháil ón litir.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Is é seo an loighic: déanaimid ceangal, faighimid an litir dheireanach is ábhartha, má tá cinn eile ann, déanaimid neamhaird orthu. Úsáidtear an fheidhm seo, toisc go bhfuil na sonraí go léir de na cinn níos luaithe i litreacha níos déanaí. Mura bhfuil sé seo amhlaidh, ansin is féidir leat sraith de gach litir a sheoladh ar ais, nó an chéad cheann a phróiseáil, agus an chuid eile ar an gcéad pas eile. Go ginearálta, braitheann gach rud, mar i gcónaí, ar an tasc.

Cuirimid dhá fheidhm chúnta leis an hook: chun comhad a íoslódáil agus chun comhad a íoslódáil ag baint úsáide as nasc ó ríomhphost. Dála an scéil, is féidir iad a aistriú chuig an oibreoir, braitheann sé ar mhinicíocht úsáid a bhaint as an bhfeidhmiúlacht seo. Braitheann an méid eile atá le cur leis an hook, arís, ar an tasc: má fhaightear comhaid láithreach sa litir, ansin is féidir leat ceangaltáin a íoslódáil leis an litir, má fhaightear na sonraí sa litir, ansin is gá duit an litir a pharsáil, srl. I mo chás, tagann an litir le nasc amháin chuig an gcartlann, a chaithfidh mé a chur in áit áirithe agus tús a chur leis an bpróiseas próiseála breise.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Tá an cód simplí, mar sin is ar éigean go dteastaíonn tuilleadh mínithe air. Inseoidh mé duit faoin líne draíochta imap_conn_id. Stórálann Apache Airflow paraiméadair nasc (logáil isteach, pasfhocal, seoladh, agus paraiméadair eile) ar féidir rochtain a fháil orthu le haitheantóir teaghrán. Ó thaobh amhairc de, tá cuma mar seo ar bhainistíocht nasc

Próiseas ETL chun sonraí a fháil ó ríomhphost in Apache Airflow

Braiteoir chun fanacht ar shonraí

Ós rud é go bhfuil a fhios againn cheana féin conas sonraí a nascadh agus a fháil ón bpost, is féidir linn braiteoir a scríobh anois chun fanacht leo. I mo chás, níor oibrigh sé chun oibreoir a scríobh láithreach a phróiseálfaidh na sonraí, más ann dóibh, toisc go n-oibríonn próisis eile bunaithe ar na sonraí a fuarthas ón ríomhphost, lena n-áirítear iad siúd a thógann sonraí gaolmhara ó fhoinsí eile (API, teileafónaíocht , méadracht gréasáin, etc.) etc.). Tabharfaidh mé sampla duit. Tá úsáideoir nua le feiceáil sa chóras CRM, agus níl a fhios againn fós faoina UUID. Ansin, agus muid ag iarraidh sonraí a fháil ó theileafónaíocht SIP, gheobhaidh muid glaonna ceangailte lena UUID, ach ní bheimid in ann iad a shábháil agus a úsáid i gceart. I gcúrsaí den sórt sin, tá sé tábhachtach spleáchas na sonraí a choinneáil i gcuimhne, go háirithe má thagann siad ó fhoinsí éagsúla. Ní leor bearta iad seo, ar ndóigh, chun sláine sonraí a chaomhnú, ach i gcásanna áirithe tá siad riachtanach. Sea, agus tá sé neamhréasúnach freisin acmhainní a áitiú.

Mar sin, seolfaidh ár braiteoir rinn ina dhiaidh sin den ghraf má tá faisnéis úr sa phost, agus marcálfaidh sé freisin nach mbaineann an fhaisnéis roimhe seo le hábhar.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Faighimid agus úsáidimid sonraí

Chun sonraí a fháil agus a phróiseáil, is féidir leat oibreoir ar leith a scríobh, is féidir leat cinn réidh a úsáid. Ós rud é go dtí seo tá an loighic fánach - sonraí a thógáil ón litir, mar shampla, molaim an PythonOperator caighdeánach

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Dála an scéil, má tá do phost corparáideach freisin ar mail.ru, ansin ní bheidh tú in ann cuardach a dhéanamh ar litreacha de réir ábhair, seoltóir, etc. Ar ais i 2016, gheall siad é a thabhairt isteach, ach d'athraigh siad a n-intinn de réir dealraimh. Réitigh mé an fhadhb seo trí fhillteán ar leith a chruthú do na litreacha riachtanacha agus scagaire a bhunú do na litreacha riachtanacha sa chomhéadan gréasáin ríomhphoist. Mar sin, ní théann ach na litreacha agus na coinníollacha riachtanacha don chuardach, i mo chás, go simplí (UNSEEN) isteach san fhillteán seo.

Go hachomair, tá an t-ord seo a leanas againn: déanaimid seiceáil an bhfuil litreacha nua ann a chomhlíonann na coinníollacha, má tá, ansin déanaimid an chartlann a íoslódáil ag baint úsáide as an nasc ón litir dheireanach.
Faoi na poncanna deireanacha, fágtar ar lár go ndéanfar an chartlann seo a dhíphacáil, go ndéanfar na sonraí ón gcartlann a ghlanadh agus a phróiseáil, agus mar thoradh air sin, rachaidh an rud ar fad níos faide chuig píblíne an phróisis ETL, ach tá sé seo níos faide ná sin cheana féin. scóip an ailt. Má d'éirigh sé amach suimiúil agus úsáideach, ansin beidh mé sásta leanúint ar aghaidh ag cur síos ar réitigh ETL agus a gcuid páirteanna do Apache Airflow.

Foinse: will.com

Add a comment