Texnologiya qanchalik rivojlanmasin, bir qator eskirgan yondashuvlar har doim rivojlanishdan orqada qoladi. Buning sababi silliq o'tish, inson omillari, texnologik ehtiyojlar yoki boshqa narsa bo'lishi mumkin. Ma'lumotlarni qayta ishlash sohasida ma'lumotlar manbalari ushbu qismda eng ko'p ochiladi. Biz bundan xalos bo'lishni qanchalik orzu qilmasin, lekin hozirgacha ma'lumotlarning bir qismi lahzali xabarchilar va elektron pochta xabarlarida yuboriladi, bu ko'proq arxaik formatlarni eslatib o'tmaydi. Men sizni Apache Airflow opsiyalaridan birini demontaj qilishni taklif qilaman, bunda siz qanday qilib elektron pochtadan ma'lumotlarni olishingiz mumkinligi tasvirlangan.
Sana oldin
Ko'pgina ma'lumotlar hali ham elektron pochta orqali, shaxslararo muloqotdan tortib, kompaniyalar o'rtasidagi o'zaro munosabatlar standartlariga qadar uzatiladi. Ma'lumot olish uchun interfeys yozish yoki ushbu ma'lumotni qulayroq manbalarga kiritadigan odamlarni ofisga joylashtirish mumkin bo'lsa yaxshi, lekin ko'pincha bu shunchaki imkonsiz bo'lishi mumkin. Men duch kelgan aniq vazifa taniqli CRM tizimini ma'lumotlar omboriga, keyin esa OLAP tizimiga ulash edi. Tarixan shunday bo'ldiki, bizning kompaniyamiz uchun ushbu tizimdan foydalanish biznesning ma'lum bir sohasida qulay edi. Shuning uchun, har bir kishi haqiqatan ham ushbu uchinchi tomon tizimining ma'lumotlari bilan ishlashni xohladi. Albatta, birinchi navbatda, ochiq API dan ma'lumotlarni olish imkoniyati o'rganildi. Afsuski, API barcha kerakli ma'lumotlarni olishni qamrab olmadi va oddiy qilib aytganda, u ko'p jihatdan egri edi va texnik yordam yanada kengroq funksionallikni ta'minlashni xohlamadi yoki yarmini kutib ololmadi. Ammo bu tizim vaqti-vaqti bilan etishmayotgan ma'lumotlarni arxivni tushirish uchun havola ko'rinishida pochta orqali olish imkoniyatini berdi.
Shuni ta'kidlash kerakki, bu biznes elektron pochta yoki messenjerlardan ma'lumotlarni yig'moqchi bo'lgan yagona holat emas edi. Biroq, bu holda, biz ma'lumotlarning bir qismini faqat shu tarzda taqdim etadigan uchinchi tomon kompaniyasiga ta'sir qila olmadik.
Apache havo oqimi
ETL jarayonlarini qurish uchun biz ko'pincha Apache Airflow dan foydalanamiz. Ushbu texnologiya bilan tanish bo'lmagan o'quvchi uning kontekstda va umuman olganda qanday ko'rinishini yaxshiroq tushunishi uchun men bir nechta kirish qismini tasvirlab beraman.
Apache Airflow - bu Python-da ETL (Extract-Transform-Loading) jarayonlarini qurish, bajarish va kuzatish uchun foydalaniladigan bepul platforma. Havo oqimidagi asosiy tushuncha yo'naltirilgan asiklik grafik bo'lib, u erda grafikning uchlari o'ziga xos jarayonlar, grafikning qirralari esa boshqaruv yoki axborot oqimidir. Jarayon shunchaki har qanday Python funktsiyasini chaqirishi mumkin yoki u sinf kontekstida bir nechta funktsiyalarni ketma-ket chaqirishdan murakkabroq mantiqqa ega bo'lishi mumkin. Eng tez-tez bajariladigan operatsiyalar uchun jarayon sifatida ishlatilishi mumkin bo'lgan ko'plab tayyor ishlanmalar mavjud. Bunday rivojlanishlarga quyidagilar kiradi:
- operatorlar - ma'lumotlarni bir joydan ikkinchi joyga, masalan, ma'lumotlar bazasi jadvalidan ma'lumotlar omboriga o'tkazish uchun;
- datchiklar - ma'lum bir hodisaning sodir bo'lishini kutish va boshqaruv oqimini grafikning keyingi cho'qqilariga yo'naltirish uchun;
- ilgaklar - quyi darajadagi operatsiyalar uchun, masalan, ma'lumotlar bazasi jadvalidan ma'lumotlarni olish uchun (bayonotlarda ishlatiladi);
- va hokazo.
Ushbu maqolada Apache Airflow-ni batafsil tavsiflash o'rinsiz bo'lar edi. Qisqacha tanishuvlarni ko'rish mumkin
Ma'lumot olish uchun ilgak
Avvalo, muammoni hal qilish uchun biz kanca yozishimiz kerak, uning yordamida biz:
- elektron pochtaga ulanish
- to'g'ri harfni toping
- xatdan ma'lumotlarni olish.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Mantiq shunday: biz ulanamiz, oxirgi eng mos harfni topamiz, agar boshqalar bo'lsa, biz ularga e'tibor bermaymiz. Ushbu funktsiyadan foydalaniladi, chunki keyingi harflar avvalgilarining barcha ma'lumotlarini o'z ichiga oladi. Agar bunday bo'lmasa, siz barcha harflar qatorini qaytarishingiz yoki birinchisini, qolganini keyingi o'tishda qayta ishlashingiz mumkin. Umuman olganda, hamma narsa, har doimgidek, vazifaga bog'liq.
Biz kancaga ikkita yordamchi funktsiyani qo'shamiz: faylni yuklab olish va elektron pochtadan havola yordamida faylni yuklab olish uchun. Aytgancha, ular operatorga ko'chirilishi mumkin, bu ushbu funksiyadan foydalanish chastotasiga bog'liq. Kancaga yana nima qo'shish kerak, yana vazifaga bog'liq: agar fayllar xatda darhol qabul qilinsa, siz xatga qo'shimchalarni yuklab olishingiz mumkin, agar ma'lumotlar xatda olingan bo'lsa, unda siz xatni tahlil qilishingiz kerak, va boshqalar. Mening holimda, xat arxivga bitta havola bilan birga keladi, uni ma'lum bir joyga qo'yishim va keyingi ishlov berish jarayonini boshlashim kerak.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Kod oddiy, shuning uchun unga qo'shimcha tushuntirish kerak emas. Men sizga imap_conn_id sehrli chizig'i haqida gapirib beraman. Apache Airflow ulanish parametrlarini (login, parol, manzil va boshqa parametrlar) saqlaydi, ularga string identifikatori orqali kirish mumkin. Vizual ravishda ulanishni boshqarish shunday ko'rinadi
Ma'lumotni kutish uchun sensor
Pochtadan ma'lumotlarni qanday ulash va qabul qilishni allaqachon bilganimiz sababli, endi ularni kutish uchun sensor yozishimiz mumkin. Mening holatimda, agar mavjud bo'lsa, ma'lumotlarni qayta ishlaydigan operatorni darhol yozish ishlamadi, chunki boshqa jarayonlar pochtadan olingan ma'lumotlar, shu jumladan boshqa manbalardan (API, telefoniya) tegishli ma'lumotlarni oladiganlar asosida ishlaydi. , veb ko'rsatkichlari va boshqalar). va hokazo). Men sizga bir misol keltiraman. CRM tizimida yangi foydalanuvchi paydo bo'ldi va biz uning UUID haqida hali ham bilmaymiz. Keyin, SIP telefoniyasidan ma'lumotlarni olishga harakat qilganda, biz uning UUID-ga bog'langan qo'ng'iroqlarni qabul qilamiz, lekin biz ularni saqlay va to'g'ri ishlata olmaymiz. Bunday masalalarda, ayniqsa, ular turli manbalardan olingan bo'lsa, ma'lumotlarning bog'liqligini yodda tutish kerak. Bu, albatta, ma'lumotlar yaxlitligini saqlash uchun etarli choralar, lekin ba'zi hollarda ular zarur. Ha, va resurslarni egallash uchun bo'sh turish ham mantiqiy emas.
Shunday qilib, bizning sensorimiz pochtada yangi ma'lumot bo'lsa, grafikning keyingi uchlarini ishga tushiradi va oldingi ma'lumotni ahamiyatsiz deb belgilaydi.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Biz ma'lumotlarni qabul qilamiz va foydalanamiz
Ma'lumotlarni qabul qilish va qayta ishlash uchun siz alohida operator yozishingiz mumkin, siz tayyor bo'lganlardan foydalanishingiz mumkin. Mantiq hali ham ahamiyatsiz bo'lgani uchun - masalan, xatdan ma'lumotlarni olish uchun men standart PythonOperatorni taklif qilaman.
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
Aytgancha, agar sizning korporativ pochtangiz ham mail.ru da bo'lsa, unda siz xatlarni mavzu, jo'natuvchi va boshqalar bo'yicha qidira olmaysiz. 2016 yilda ular buni joriy etishga va'da berishdi, lekin aftidan, fikrlarini o'zgartirdilar. Men bu muammoni kerakli harflar uchun alohida papka yaratish va pochta veb-interfeysida kerakli harflar uchun filtr o'rnatish orqali hal qildim. Shunday qilib, faqat kerakli harflar va qidiruv uchun shartlar, mening holimda, oddiygina (UNSEEN) ushbu papkaga kiradi.
Xulosa qilib aytganda, biz quyidagi ketma-ketlikka egamiz: shartlarga javob beradigan yangi harflar mavjudligini tekshiramiz, agar mavjud bo'lsa, arxivni oxirgi harfdagi havoladan foydalanib yuklab olamiz.
Oxirgi nuqtalar ostida ushbu arxiv ochiladi, arxiv ma'lumotlari tozalanadi va qayta ishlanadi va natijada hamma narsa ETL jarayonining quvur liniyasiga o'tadi, ammo bu allaqachon tugagan. maqola doirasi. Agar u qiziqarli va foydali bo'lib chiqsa, men mamnuniyat bilan Apache Airflow uchun ETL echimlari va ularning qismlarini tasvirlashni davom ettiraman.
Manba: www.habr.com