ETL process for retrieving data from email in Apache Airflow

No matter how far technology advances, a trail of outdated approaches always follows behind it. This may be due to a smooth transition, human factors, technological needs, or something else. In data processing, data sources are the most telling in this respect. However much we dream of getting rid of them, part of the data is still sent through instant messengers and email, not to mention more archaic formats. I invite you to take apart one of the options for Apache Airflow that shows how to fetch data from email.

Background

A great deal of data is still transferred by email, from personal communication to the standards of interaction between companies. It is good if you can write an interface to obtain the data, or seat people in the office who will enter this information into more convenient sources, but often that is simply not possible. The specific task I faced was connecting a well-known CRM system to the data warehouse and then to the OLAP system. It so happened historically that for our company this system was convenient in one particular area of the business. As a result, everyone wanted to be able to work with the data from this third-party system as well. First of all, of course, we studied the possibility of obtaining the data through the open API. Unfortunately, the API did not cover all the data we needed and, to put it simply, was crooked in many ways, and technical support would not or could not meet us halfway and provide more complete functionality. The system did, however, offer a way to receive the missing data periodically by mail, in the form of a link for downloading an archive.

It should be noted that this was not the only case in which the business wanted to collect data from emails or instant messengers. In this case, however, we could not influence a third-party company that provides part of the data only in this way.

Apache Airflow

To build ETL processes we most often use Apache Airflow. So that a reader unfamiliar with this technology can better understand how it looks in context and in general, I will describe a couple of introductory points.

Apache Airflow is a free platform used to build, execute, and monitor ETL (Extract, Transform, Load) processes in Python. The central concept in Airflow is the directed acyclic graph (DAG), in which the vertices are specific processes and the edges are the flow of control or information. A process can simply call any Python function, or it can carry more complex logic built from sequential calls to several functions in the context of a class. For the most frequent operations there are already many ready-made components that can be used as processes. These include:

  • operators - for transferring data from one place to another, for example, from a database table to a data warehouse;
  • sensors - for waiting for a certain event to occur and directing the flow of control to the subsequent vertices of the graph;
  • hooks - for lower-level operations, for example, getting data from a database table (they are used inside operators);
  • and so on.
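
To make the graph idea concrete without an Airflow installation, here is a minimal sketch built on the standard-library graphlib module (Python 3.9+). It is not Airflow code and only models the ordering of tasks. The first two task names match the task ids used in the DAG later in this article; load_to_warehouse is a hypothetical downstream step added for illustration.

```python
from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks it depends on.
# "load_to_warehouse" is a hypothetical step, not part of the article's DAG.
dependencies = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "load_to_warehouse": {"prepare_mail_data"},
}


def execution_order(deps):
    """Return one valid execution order; raises CycleError if the graph has a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Because the graph is acyclic, there is always at least one order in which every task runs after the tasks it depends on. A scheduler such as Airflow relies on the same property.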

It would be out of place to describe Apache Airflow in detail in this article. Brief introductions are readily available elsewhere.

Hook for retrieving data

First of all, to solve the problem we need to write a hook with which we can:

  • connect to the mailbox
  • find the right message
  • retrieve the data from the message.

import imaplib
import logging

import requests
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook for retrieving data from email

           :param imap_conn_id:       Identifier of the mail connection
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
            Connect to the mail server
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Get the identifier of the latest message
            that satisfies the search condition

            :param check_seen:      Whether to mark the latest message as read
            :type check_seen:       bool
            :param box:             Mailbox name
            :type box:              string
            :param condition:       Message search condition
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Messages found in the mailbox: " + str(mail_ids))

        if not mail_ids:
            logging.info("No new messages found")
            return None

        mail_id = mail_ids[0]

        # if there are several such messages
        if len(mail_ids) > 1:
            # mark all of them as read (the latest one is unmarked below
            # when check_seen is False)
            for seen_id in mail_ids:
                self.mail.store(seen_id, "+FLAGS", "\\Seen")

            # return the latest one
            mail_id = mail_ids[-1]

        # keep the latest message unread if it must not be marked as read
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id

The logic is this: we connect and find the most recent matching message; if there are others, we ignore them. This approach works because later messages contain all the data of the earlier ones. If that is not the case, you can return an array of all messages, or process the first one and leave the rest for the next pass. In general, as always, everything depends on the task.
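
The selection step can be separated from the IMAP connection so that it is easy to test and to adapt to the alternatives mentioned above. The sketch below is an illustration only: split_search_result is a hypothetical helper, not part of the hook, and it assumes, as the hook does, that the last id in the search response belongs to the most recent message.

```python
def split_search_result(data):
    """Split an IMAP SEARCH response into (latest_id, older_ids).

    `data` has the shape returned by imaplib's search(): a list whose
    first element is a space-separated byte string of message numbers.
    """
    mail_ids = data[0].split() if data and data[0] else []
    if not mail_ids:
        return None, []
    return mail_ids[-1], mail_ids[:-1]


latest, older = split_search_result([b"3 7 12"])
print(latest, older)
```

If every message has to be processed rather than only the latest, the caller can iterate over older plus latest instead of discarding the older ids.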

We add two helper functions to the hook: one for downloading a file and one for downloading a file using a link from an email. By the way, they could be moved into an operator; it depends on how often this functionality is used. What else to add to the hook again depends on the task: if files arrive directly in the message, you can download the attachments; if the data arrives in the message body, you need to parse the message; and so on. In my case, the message comes with a single link to an archive, which I need to put in a certain place and hand over to further processing.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Download a file

            :param url:              Download address
            :type url:               string
            :param path:             Where to put the file
            :type path:              string
            :param chunk_size:       How many bytes to write at a time
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        # fail early instead of saving an error page as the file
        r.raise_for_status()
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Download a file using a link from the message

            :param mail_id:         Message identifier
            :type mail_id:          string
            :param path:            Where to put the file
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # strip line breaks so that a wrapped link is parsed as one piece
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        # the message contains a single link; if there are several, the last one wins
        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)
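
The method above decodes the whole raw message and feeds it to the HTML parser, which is enough for a simple message. As an alternative sketch, assuming the link sits in an HTML body part, the standard-library email package can decode the MIME structure first. extract_links and LinkCollector are hypothetical names introduced here, not part of the hook, and the sample message and its URL are made up.

```python
from email import message_from_bytes, policy
from email.message import EmailMessage
from html.parser import HTMLParser


class LinkCollector(HTMLParser):
    """Collects the href of every <a> tag in an HTML document."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)


def extract_links(raw_email):
    """Return all link targets found in the HTML body of a raw RFC 822 message."""
    message = message_from_bytes(raw_email, policy=policy.default)
    body = message.get_body(preferencelist=("html",))
    if body is None:
        return []
    collector = LinkCollector()
    # get_content() undoes the transfer encoding (base64, quoted-printable)
    collector.feed(body.get_content())
    return collector.links


# A made-up message standing in for what mail.fetch(mail_id, "(RFC822)") returns.
sample = EmailMessage()
sample["Subject"] = "Export"
sample.set_content("Plain-text fallback")
sample.add_alternative(
    '<html><body><a href="https://example.com/export.zip">Download</a></body></html>',
    subtype="html",
)
links = extract_links(sample.as_bytes())
print(links)
```

Decoding through the email package avoids stripping line breaks by hand and copes with bodies sent as base64 or quoted-printable.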

The code is simple, so it hardly needs further explanation. I will only comment on the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, address, and other parameters) that can be accessed by a string identifier. Connections are managed in the Airflow web interface.
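
Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection id, with the value given as a URI. The sketch below only builds such a URI. The helper build_conn_uri, the host, the login, and the password are all made up, and the "imap" connection type is an assumption, since the hook reads nothing but the host, login, and password.

```python
import os
from urllib.parse import quote


def build_conn_uri(conn_type, host, login, password, port=None):
    """Build a connection URI, percent-encoding the login and password."""
    netloc = f"{quote(login, safe='')}:{quote(password, safe='')}@{host}"
    if port is not None:
        netloc += f":{port}"
    return f"{conn_type}://{netloc}"


# All values below are placeholders for illustration.
uri = build_conn_uri("imap", "imap.example.com", "user@example.com", "p@ss/word", 993)

# "mail_conn_id" is the connection id used in the DAG later in this article.
os.environ["AIRFLOW_CONN_MAIL_CONN_ID"] = uri
print(uri)
```

Percent-encoding matters here because mail logins usually contain "@", which would otherwise be read as the separator between the credentials and the host.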

Sensor for waiting for data

Since we already know how to connect and receive data from mail, we can now write a sensor to wait for it. In my case it did not work to write an operator right away that would process the data, if any, because other processes run on the basis of the data received from the mail, including those that take related data from other sources (API, telephony, web metrics, and so on). Here is an example. A new user appears in the CRM system, and we do not yet know their UUID. Then, when trying to receive data from SIP telephony, we will get calls tied to that UUID, but we will not be able to store and use them correctly. In such matters it is important to keep data dependencies in mind, especially when the data comes from different sources. These are, of course, insufficient measures for preserving data integrity, but in some cases they are necessary. Besides, occupying resources idly is also wasteful.

Thus, our sensor will launch the subsequent vertices of the graph if there is fresh information in the mail, and will also mark the earlier information as irrelevant.
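
For readers new to sensors, the waiting behaviour can be modelled in a few lines of plain Python. This is a simplified model, not Airflow's implementation: wait_for is a hypothetical function, and it returns False on timeout, whereas in Airflow a timed-out sensor fails the task, or skips it when soft_fail is set.

```python
import time


def wait_for(poke, poke_interval, timeout, sleep=time.sleep, clock=time.monotonic):
    """Call poke() until it returns True or `timeout` seconds have elapsed.

    Returns True if the condition was met, False on timeout.
    """
    started = clock()
    while True:
        if poke():
            return True
        if clock() - started >= timeout:
            return False
        sleep(poke_interval)


# Simulated mailbox checks: nothing on the first two checks, a message on the third.
answers = iter([False, False, True])
arrived = wait_for(lambda: next(answers), poke_interval=0, timeout=10, sleep=lambda s: None)

# A mailbox that never receives anything, with a zero timeout.
timed_out = wait_for(lambda: False, poke_interval=0, timeout=0, sleep=lambda s: None)
print(arrived, timed_out)
```

The sensor class only has to supply the poke method; the waiting loop, the interval, and the timeout are handled by the base class.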

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        hook = IMAPHook(self.conn_id)
        mail_id = hook.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        # the condition is met as soon as a matching message exists
        return mail_id is not None

Receiving and using the data

To receive and process the data you can write a separate operator or use a ready-made one. Since the logic is still trivial (just fetching the data from the message), I suggest the standard PythonOperator.

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard graph configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["user@example.com"],  # placeholder; the address was obscured in the source
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    # keep the latest message unread so that the next task can still find it
    check_seen=False,
    dag=dag,
    mode="poke",
)

# Function for fetching the data from the message
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Definitions of the remaining graph vertices
...

# Set the dependencies in the graph
mail_check_sensor >> prepare_mail_data
prepare_mail_data >> ...
# Definitions of the remaining control flows

By the way, if your mail is also on mail.ru, you will not be able to search for messages by subject, sender, and so on. Back in 2016 they promised to introduce this, but apparently changed their minds. I solved this problem by creating a separate folder for the required messages and setting up a filter for them in the mail web interface. Thus, only the required messages get into this folder, and the search condition in my case is simply (UNSEEN).
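
On a server that does support searching, the folder workaround can be replaced by a richer condition string. The sketch below builds one from the standard IMAP search keys UNSEEN, FROM, and SUBJECT. build_search_criteria is a hypothetical helper, the sender and subject are made up, and values containing double quotes or non-ASCII characters are not handled.

```python
def build_search_criteria(unseen=True, sender=None, subject=None):
    """Build an IMAP SEARCH condition string from optional filters."""
    parts = []
    if unseen:
        parts.append("UNSEEN")
    if sender:
        parts.append(f'FROM "{sender}"')
    if subject:
        parts.append(f'SUBJECT "{subject}"')
    # with no filters at all, match every message
    return "(" + " ".join(parts or ["ALL"]) + ")"


default_condition = build_search_criteria()
filtered_condition = build_search_criteria(sender="crm@example.com", subject="Export")
print(default_condition)
print(filtered_condition)
```

The resulting string can be passed as the condition argument of get_last_mail or of the sensor.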

To summarize, we have the following sequence: we check whether there are new messages that meet the conditions, and if there are, we download the archive using the link from the latest message.
Behind the final ellipsis I have omitted that this archive will be unpacked, the data from it cleaned and processed, and as a result the whole thing will go further down the ETL pipeline, but that is already beyond the scope of this article. If it turned out interesting and useful, I will gladly continue describing ETL solutions and their parts for Apache Airflow.
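
As a pointer to the first of the omitted steps, here is a minimal sketch of unpacking the downloaded archive with the standard library. It assumes the archive is a zip file, as the ./path.zip name in the DAG suggests. unpack_archive is a hypothetical helper, and the demo archive with its users.csv file is made up.

```python
import tempfile
import zipfile
from pathlib import Path


def unpack_archive(archive_path, target_dir):
    """Extract a zip archive and return the sorted list of extracted file paths."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(target)
        names = archive.namelist()
    # directory entries end with "/" and are not files
    return sorted(target / name for name in names if not name.endswith("/"))


# Demo with a throwaway archive in a temporary directory.
workdir = Path(tempfile.mkdtemp())
archive_file = workdir / "path.zip"
with zipfile.ZipFile(archive_file, "w") as demo:
    demo.writestr("users.csv", "uuid,name\n")

extracted = unpack_archive(archive_file, workdir / "unpacked")
print([p.name for p in extracted])
```

In a DAG this would typically be the callable of the task that follows prepare_mail_data.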

source: www.habr.com