Tidak kira berapa banyak teknologi berkembang, pembangunan sentiasa diikuti oleh rentetan pendekatan yang ketinggalan zaman. Ini mungkin disebabkan oleh peralihan yang lancar, faktor manusia, keperluan teknologi, atau sesuatu yang lain. Dalam bidang pemprosesan data, yang paling mendedahkan dalam bahagian ini ialah sumber data. Tidak kira betapa kita bermimpi untuk menyingkirkan ini, buat masa ini beberapa data dihantar dalam pemesej segera dan e-mel, apatah lagi format yang lebih kuno. Saya menjemput anda untuk melihat salah satu pilihan untuk Apache Airflow, menggambarkan cara anda boleh mengumpul data daripada e-mel.
prasejarah
Banyak data masih dihantar melalui e-mel, daripada komunikasi interpersonal kepada standard interaksi antara syarikat. Adalah baik jika anda boleh menulis antara muka untuk menerima data atau meletakkan orang di pejabat yang akan memasukkan maklumat ini ke sumber yang lebih mudah, tetapi selalunya kemungkinan ini mungkin tidak ada. Tugas khusus yang saya hadapi ialah menyambungkan sistem CRM yang terkenal ke gudang data, dan kemudian ke sistem OLAP. Dari segi sejarah, untuk syarikat kami, penggunaan sistem ini adalah mudah dalam bidang perniagaan tertentu. Oleh itu, semua orang benar-benar mahu dapat beroperasi dengan data daripada sistem pihak ketiga ini juga. Pertama sekali, sudah tentu, kemungkinan mendapatkan data daripada API terbuka telah diterokai. Malangnya, API tidak meliputi mendapatkan semua data yang diperlukan, dan, secara ringkas, ia sebahagian besarnya bengkok, dan sokongan teknikal tidak mahu atau tidak dapat memenuhi separuh jalan untuk menyediakan fungsi yang lebih komprehensif. Tetapi sistem ini memberi peluang untuk menerima data yang hilang secara berkala melalui e-mel dalam bentuk pautan untuk memuat turun arkib.
Perlu diingatkan bahawa ini bukan satu-satunya kes di mana perniagaan ingin mengumpul data daripada e-mel atau utusan segera. Walau bagaimanapun, dalam kes ini, kami tidak boleh mempengaruhi syarikat pihak ketiga yang menyediakan sebahagian daripada data hanya dengan cara ini.
Aliran Udara Apache
Untuk membina proses ETL, kami paling kerap menggunakan Apache Airflow. Agar pembaca yang tidak biasa dengan teknologi ini untuk lebih memahami bagaimana ia kelihatan dalam konteks dan secara umum, saya akan menerangkan beberapa pengenalan.
Apache Airflow ialah platform percuma yang digunakan untuk membina, menjalankan dan memantau proses ETL (Extract-Transform-Loading) dalam Python. Konsep utama dalam Aliran Udara ialah graf akiklik terarah, di mana bucu graf adalah proses tertentu, dan tepi graf ialah aliran kawalan atau maklumat. Proses boleh memanggil mana-mana fungsi Python, atau ia boleh mempunyai logik yang lebih kompleks untuk memanggil beberapa fungsi secara berurutan dalam konteks kelas. Untuk operasi yang paling biasa, sudah terdapat banyak pembangunan siap sedia yang boleh digunakan sebagai proses. Perkembangan tersebut termasuk:
- pengendali - untuk memindahkan data dari satu tempat ke tempat lain, sebagai contoh, dari jadual pangkalan data ke gudang data;
- penderia - untuk menunggu kejadian tertentu dan mengarahkan aliran kawalan ke bucu graf berikutnya;
- cangkuk - untuk operasi peringkat rendah, contohnya, untuk mendapatkan semula data daripada jadual pangkalan data (digunakan dalam penyata);
- dan lain-lain
Adalah tidak sesuai untuk menerangkan Apache Airflow secara terperinci dalam artikel ini. Pengenalan ringkas boleh dilihat
Cangkuk untuk mendapatkan data
Pertama sekali, untuk menyelesaikan masalah kita perlu menulis cangkuk yang kita boleh:
- sambung ke e-mel;
- cari surat yang anda perlukan;
- menerima data daripada surat.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Logiknya begini: kita sambung, cari huruf terakhir yang paling relevan, jika ada yang lain, kita abaikan mereka. Fungsi ini digunakan kerana mesej kemudian mengandungi semua data daripada yang terdahulu. Jika ini tidak berlaku, maka anda boleh mengembalikan tatasusunan semua huruf atau memproses yang pertama dan yang lain pada pas seterusnya. Secara umum, seperti biasa, semuanya bergantung pada tugas.
Kami menambah dua fungsi tambahan pada cangkuk: untuk memuat turun fail dan untuk memuat turun fail menggunakan pautan dari surat. Dengan cara ini, mereka boleh dimasukkan ke dalam pengendali, ia bergantung pada kekerapan penggunaan fungsi ini. Apa lagi yang perlu ditambahkan pada cangkuk, sekali lagi, bergantung pada tugas: jika fail diterima serta-merta dalam surat, maka anda boleh memuat turun lampiran pada surat itu, jika data datang dalam surat, maka anda perlu menghuraikan surat itu, dsb. Dalam kes saya, surat itu tiba dengan satu pautan ke arkib, yang perlu saya letakkan di tempat tertentu dan mula memproses selanjutnya.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Kod ini mudah, jadi ia tidak memerlukan sebarang penjelasan tambahan. Saya hanya akan memberitahu anda tentang garis ajaib imap_conn_id. Apache Airflow menyimpan parameter sambungan (log masuk, kata laluan, alamat dan parameter lain), yang boleh diakses oleh pengecam rentetan. Secara visual, pengurusan sambungan kelihatan seperti ini
Sensor untuk menunggu data
Memandangkan kami sudah tahu cara menyambung dan menerima data daripada mel, kami kini boleh menulis penderia untuk menunggunya. Dalam kes saya, menulis dengan segera pengendali yang akan memproses data, jika ada, tidak berfungsi, kerana proses lain berfungsi berdasarkan data yang diterima daripada mel, termasuk yang mengambil data berkaitan daripada sumber lain (API, telefon, metrik web, dll.) dll.). Biar saya berikan satu contoh. Pengguna baharu telah muncul dalam sistem CRM dan kami masih belum mengetahui tentang UUIDnya. Kemudian, apabila kami cuba menerima data daripada telefon SIP, kami akan menerima panggilan yang terikat pada UUIDnya, tetapi kami tidak akan dapat menyimpan dan menggunakannya dengan betul. Dalam soalan sedemikian, adalah penting untuk mengingati pergantungan data, terutamanya jika ia adalah daripada sumber yang berbeza. Ini, sudah tentu, langkah-langkah yang tidak mencukupi untuk mengekalkan integriti data, tetapi dalam beberapa kes ia adalah perlu. Dan meminjam sumber semasa terbiar juga tidak rasional.
Oleh itu, penderia kami akan melancarkan bucu graf berikutnya jika terdapat maklumat baharu dalam mel, dan juga menandakan maklumat sebelumnya sebagai tidak berkaitan.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Menerima dan menggunakan data
Untuk menerima dan memproses data, anda boleh menulis operator yang berasingan, atau anda boleh menggunakan yang sudah siap. Oleh kerana logiknya masih remeh - untuk mengambil data dari surat, maka sebagai contoh saya mencadangkan PythonOperator standard
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
Dengan cara ini, jika e-mel korporat anda juga berada di mail.ru, maka anda tidak akan dapat mencari surat mengikut subjek, pengirim, dsb. Mereka berjanji untuk memperkenalkannya kembali pada 2016, tetapi nampaknya mengubah fikiran mereka. Saya menyelesaikan masalah ini dengan mencipta folder berasingan untuk huruf yang diperlukan dan menyediakan penapis dalam antara muka web mel untuk huruf yang diperlukan. Oleh itu, hanya huruf yang diperlukan dan syarat carian, dalam kes saya, hanya (GHAIB) masuk ke dalam folder ini.
Untuk meringkaskan, kami mempunyai urutan berikut: kami menyemak sama ada terdapat surat baru yang memenuhi syarat; jika ada, maka kami memuat turun arkib menggunakan pautan dari huruf terakhir.
Di bawah elips terakhir, arkib ini ditinggalkan akan dibongkar, data daripada arkib akan dibersihkan dan diproses, dan akibatnya, semuanya akan pergi lebih jauh ke saluran paip proses ETL, tetapi ini sudah di luar skop daripada artikel itu. Jika ia ternyata menarik dan berguna, maka saya dengan senang hati akan terus menerangkan penyelesaian ETL dan bahagiannya untuk Apache Airflow.
Sumber: www.habr.com