Tidak peduli seberapa besar perkembangan teknologi, serangkaian pendekatan yang sudah ketinggalan zaman selalu tertinggal di belakang perkembangannya. Hal ini mungkin disebabkan oleh kelancaran transisi, faktor manusia, kebutuhan teknologi, atau hal lainnya. Dalam bidang pengolahan data, sumber data merupakan hal yang paling terbuka pada bagian ini. Tidak peduli seberapa besar impian kita untuk menghilangkannya, namun sejauh ini sebagian data dikirim dalam pesan instan dan email, belum lagi format yang lebih kuno. Saya mengundang Anda untuk membongkar salah satu opsi untuk Apache Airflow, yang mengilustrasikan bagaimana Anda dapat mengambil data dari email.
prasejarah
Banyak data yang masih ditransfer melalui email, mulai dari komunikasi interpersonal hingga standar interaksi antar perusahaan. Ada baiknya jika memungkinkan untuk menulis antarmuka untuk memperoleh data atau menempatkan orang di kantor yang akan memasukkan informasi ini ke sumber yang lebih nyaman, tetapi seringkali hal ini tidak dapat dilakukan. Tugas khusus yang saya hadapi adalah menghubungkan sistem CRM yang terkenal buruk ke gudang data, dan kemudian ke sistem OLAP. Secara historis, bagi perusahaan kami, penggunaan sistem ini nyaman dalam bidang bisnis tertentu. Oleh karena itu, semua orang sangat ingin dapat beroperasi dengan data dari sistem pihak ketiga ini juga. Pertama-tama, tentu saja, kemungkinan memperoleh data dari API terbuka telah dipelajari. Sayangnya, API tidak mencakup perolehan semua data yang diperlukan, dan, secara sederhana, API tersebut dalam banyak hal tidak benar, dan dukungan teknis tidak ingin atau tidak dapat memenuhi setengah jalan untuk menyediakan fungsionalitas yang lebih komprehensif. Namun sistem ini memberikan kesempatan untuk menerima data yang hilang secara berkala melalui surat dalam bentuk link untuk membongkar arsip.
Perlu dicatat bahwa ini bukan satu-satunya kasus di mana bisnis ingin mengumpulkan data dari email atau pesan instan. Namun, dalam kasus ini, kami tidak dapat mempengaruhi perusahaan pihak ketiga yang menyediakan sebagian data hanya dengan cara ini.
Aliran Udara Apache
Untuk membangun proses ETL, kami paling sering menggunakan Apache Airflow. Agar pembaca yang belum familiar dengan teknologi ini dapat lebih memahami tampilannya dalam konteks dan secara umum, saya akan menjelaskan beberapa pengantar.
Apache Airflow adalah platform gratis yang digunakan untuk membangun, mengeksekusi, dan memantau proses ETL (Extract-Transform-Loading) dengan Python. Konsep utama dalam Aliran Udara adalah grafik asiklik berarah, di mana simpul-simpul grafik adalah proses tertentu, dan tepi-tepi grafik adalah aliran kontrol atau informasi. Suatu proses dapat dengan mudah memanggil fungsi Python apa pun, atau dapat memiliki logika yang lebih kompleks dengan memanggil beberapa fungsi secara berurutan dalam konteks suatu kelas. Untuk pengoperasian yang paling sering, sudah banyak pengembangan siap pakai yang dapat digunakan sebagai proses. Perkembangan tersebut meliputi:
- operator - untuk mentransfer data dari satu tempat ke tempat lain, misalnya, dari tabel database ke gudang data;
- sensor - untuk menunggu terjadinya peristiwa tertentu dan mengarahkan aliran kontrol ke simpul berikutnya pada grafik;
- kait - untuk operasi tingkat rendah, misalnya, untuk mendapatkan data dari tabel database (digunakan dalam pernyataan);
- dan lain-lain
Tidak tepat untuk menjelaskan Apache Airflow secara detail di artikel ini. Perkenalan singkat dapat dilihat
Kait untuk mendapatkan data
Pertama-tama, untuk menyelesaikan masalah ini, kita perlu menulis sebuah pengait yang dapat kita gunakan:
- terhubung ke email
- menemukan huruf yang tepat
- menerima data dari surat itu.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Logikanya begini: kita sambung, cari huruf terakhir yang paling relevan, kalau ada yang lain kita abaikan. Fungsi ini digunakan karena huruf-huruf selanjutnya berisi semua data huruf-huruf sebelumnya. Jika hal ini tidak terjadi, maka Anda dapat mengembalikan array yang berisi semua huruf, atau memproses huruf pertama, dan sisanya pada lintasan berikutnya. Secara umum, semuanya, seperti biasa, bergantung pada tugasnya.
Kami menambahkan dua fungsi tambahan ke hook: untuk mengunduh file dan untuk mengunduh file menggunakan tautan dari email. Omong-omong, mereka dapat dipindahkan ke operator, itu tergantung pada frekuensi penggunaan fungsi ini. Apa lagi yang harus ditambahkan ke hook, sekali lagi, tergantung pada tugasnya: jika file langsung diterima dalam surat, maka Anda dapat mengunduh lampiran pada surat itu, jika data diterima dalam surat itu, maka Anda perlu menguraikan surat itu, dll. Dalam kasus saya, surat itu dilengkapi dengan satu tautan ke arsip, yang harus saya letakkan di tempat tertentu dan memulai proses pemrosesan lebih lanjut.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Kodenya sederhana, sehingga tidak memerlukan penjelasan lebih lanjut. Langsung saja saya ceritakan tentang garis ajaib imap_conn_id. Apache Airflow menyimpan parameter koneksi (login, kata sandi, alamat, dan parameter lainnya) yang dapat diakses oleh pengidentifikasi string. Secara visual, manajemen koneksi terlihat seperti ini
Sensor untuk menunggu data
Karena kita sudah mengetahui cara menyambung dan menerima data dari email, sekarang kita dapat menulis sensor untuk menunggunya. Dalam kasus saya, tidak mungkin untuk langsung menulis operator yang akan memproses data, jika ada, karena proses lain bekerja berdasarkan data yang diterima dari surat, termasuk proses yang mengambil data terkait dari sumber lain (API, telepon , metrik web, dll.).dll). Saya akan memberi Anda sebuah contoh. Pengguna baru telah muncul di sistem CRM, dan kami masih belum mengetahui tentang UUID-nya. Kemudian, ketika mencoba menerima data dari telepon SIP, kami akan menerima panggilan yang terkait dengan UUID-nya, tetapi kami tidak dapat menyimpan dan menggunakannya dengan benar. Dalam hal ini, penting untuk mengingat ketergantungan data, terutama jika data tersebut berasal dari sumber yang berbeda. Tentu saja langkah-langkah ini tidak cukup untuk menjaga integritas data, namun dalam beberapa kasus hal ini diperlukan. Ya, dan bermalas-malasan untuk menduduki sumber daya juga tidak rasional.
Dengan demikian, sensor kami akan meluncurkan simpul grafik berikutnya jika ada informasi baru dalam email, dan juga menandai informasi sebelumnya sebagai tidak relevan.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Kami menerima dan menggunakan data
Untuk menerima dan mengolah data, Anda dapat menulis operator terpisah, Anda dapat menggunakan yang sudah jadi. Karena sejauh ini logikanya sepele - untuk mengambil data dari surat itu, misalnya, saya menyarankan PythonOperator standar
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
Omong-omong, jika surat perusahaan Anda juga ada di mail.ru, Anda tidak akan bisa mencari surat berdasarkan subjek, pengirim, dll. Pada tahun 2016 lalu, mereka berjanji untuk memperkenalkannya, namun ternyata berubah pikiran. Saya memecahkan masalah ini dengan membuat folder terpisah untuk surat-surat yang diperlukan dan menyiapkan filter untuk surat-surat yang diperlukan di antarmuka web surat. Jadi, hanya huruf-huruf yang diperlukan dan ketentuan pencarian, dalam kasus saya, cukup (TIDAK DILIHAT) yang masuk ke folder ini.
Ringkasnya, kita punya urutannya sebagai berikut: kita cek apakah ada surat baru yang memenuhi syarat, jika ada maka kita download arsipnya menggunakan link dari surat terakhir.
Di bawah titik terakhir, dihilangkan bahwa arsip ini akan dibongkar, data dari arsip akan dibersihkan dan diproses, dan sebagai hasilnya, semuanya akan berlanjut ke jalur proses ETL, tetapi ini sudah melampaui ruang lingkup artikel. Jika ternyata menarik dan bermanfaat, saya dengan senang hati akan terus menjelaskan solusi ETL dan bagian-bagiannya untuk Apache Airflow.
Sumber: www.habr.com