Merhaba Habr! Bu makalede, örneğin kurumsal bir DWH'nin veya DataLake'inizin altyapısında toplu veri işleme süreçleri geliştirmeye yönelik harika bir araçtan bahsetmek istiyorum. Apache Airflow (bundan sonra Airflow olarak anılacaktır) hakkında konuşacağız. Habré'de haksız yere dikkatlerden mahrum bırakılıyor ve esas olarak sizi ETL/ELT süreçleriniz için bir zamanlayıcı seçerken en azından Airflow'un dikkate alınmaya değer olduğuna ikna etmeye çalışacağım.
Daha önce Tinkoff Bank'ta çalışırken DWH konusuyla ilgili bir dizi makale yazmıştım. Artık Mail.Ru Grup ekibinin bir parçası oldum ve oyun alanında veri analizi için bir platform geliştiriyorum. Aslında haberler ve ilginç çözümler ortaya çıktıkça ekibim ve ben burada veri analitiği platformumuz hakkında konuşacağız.
prolog
Öyleyse başlayalım. Hava Akışı Nedir? Bu bir kütüphanedir (veya
Şimdi Airflow'un ana öğelerine bakalım. Bunların özünü ve amacını anlayarak süreç mimarinizi en iyi şekilde organize edebilirsiniz. Belki de ana varlık Yönlendirilmiş Asiklik Grafiktir (bundan sonra DAG olarak anılacaktır).
DAG
DAG, belirli bir programa göre kesin olarak tanımlanmış bir sırayla tamamlamak istediğiniz görevlerinizin anlamlı bir birleşimidir. Airflow, DAG'ler ve diğer varlıklarla çalışmak için kullanışlı bir web arayüzü sağlar:
DAG şöyle görünebilir:
Geliştirici, bir DAG tasarlarken, DAG içindeki görevlerin üzerine inşa edileceği bir dizi operatör belirler. Burada bir başka önemli varlığa geliyoruz: Hava Akış Operatörü.
operatörler
Operatör, bir iş örneğinin yürütülmesi sırasında ne olacağını açıklayan, hangi iş örneklerinin oluşturulduğunu temel alan bir varlıktır.
- BashOperator - bash komutunu yürütmek için kullanılan operatör.
- PythonOperator - Python kodunu çağırmak için kullanılan operatör.
- EmailOperator — e-posta gönderme operatörü.
- HTTPOperator - http istekleriyle çalışmak için kullanılan operatör.
- SqlOperator - SQL kodunu yürütmek için operatör.
- Sensör, bir olayı beklemek için kullanılan bir operatördür (gerekli zamanın gelmesi, gerekli dosyanın görünümü, veritabanındaki bir satır, API'den bir yanıt vb.).
Daha spesifik operatörler vardır: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Siz de kendi özelliklerinize göre operatörler geliştirip projenizde kullanabilirsiniz. Örneğin, belgeleri MongoDB'den Hive'a aktarmak için bir operatör olan MongoDBToHiveViaHdfsTransfer'ı ve MongoDB ile çalışmak için çeşitli operatörleri oluşturduk.
Daha sonra, tüm bu görev örneklerinin yürütülmesi gerekiyor ve şimdi zamanlayıcı hakkında konuşacağız.
zamanlayıcı
Airflow'un görev zamanlayıcısı temel alınmıştır
Her havuzun slot sayısında bir sınırı vardır. DAG oluştururken ona bir havuz verilir:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
DAG düzeyinde tanımlanan bir havuz, görev düzeyinde geçersiz kılınabilir.
Ayrı bir süreç olan Zamanlayıcı, Airflow'taki tüm görevlerin zamanlanmasından sorumludur. Aslında Zamanlayıcı, görevlerin yürütülmesi için ayarlanmasına ilişkin tüm mekanizmalarla ilgilenir. Görev yürütülmeden önce birkaç aşamadan geçer:
- DAG'da önceki görevler tamamlandı; yenisi sıraya alınabilir.
- Sıra, görevlerin önceliğine göre sıralanır (öncelikler de kontrol edilebilir) ve havuzda boş slot varsa görev devreye alınabilir.
- Ücretsiz işçi kereviz varsa görev ona gönderilir; problemde programladığınız çalışma şu veya bu operatörü kullanarak başlar.
Yeterince basit.
Zamanlayıcı, tüm DAG'lerin ve DAG'lerin içindeki tüm görevlerin kümesinde çalışır.
Zamanlayıcının DAG ile çalışmaya başlaması için DAG'ın bir zamanlama ayarlaması gerekir:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Bir dizi hazır ön ayar vardır: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Cron ifadelerini de kullanabilirsiniz:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Gerçekleşme tarihi
Hava Akışının nasıl çalıştığını anlamak için bir DAG'nin Yürütme Tarihinin ne olduğunu anlamak önemlidir. Airflow'ta DAG'nin bir Yürütme Tarihi boyutu vardır; yani DAG'ın çalışma programına bağlı olarak her Yürütme Tarihi için görev örnekleri oluşturulur. Ve her Yürütme Tarihi için görevler yeniden yürütülebilir veya örneğin bir DAG, birden fazla Yürütme Tarihinde aynı anda çalışabilir. Bu burada açıkça gösterilmektedir:
Ne yazık ki (veya belki de neyse ki: duruma bağlıdır), eğer görevin DAG'deki uygulaması düzeltilirse, önceki Yürütme Tarihindeki yürütme, ayarlamalar dikkate alınarak devam edecektir. Yeni bir algoritma kullanarak geçmiş dönemlerdeki verileri yeniden hesaplamanız gerekiyorsa bu iyidir, ancak sonucun tekrarlanabilirliği kaybolduğu için kötüdür (tabii ki kimse kaynak kodun gerekli sürümünü Git'ten döndürmeniz ve ne olacağını hesaplamanız konusunda sizi rahatsız etmez) bir kereye ihtiyacın var, ihtiyacın olduğu şekilde).
Görevler oluşturuluyor
DAG'ın uygulanması Python'daki koddur; bu nedenle, örneğin parçalanmış kaynaklarla çalışırken kod miktarını azaltmanın çok uygun bir yoluna sahibiz. Diyelim ki kaynak olarak üç MySQL parçanız var, her birine girip bazı veriler almanız gerekiyor. Üstelik bağımsız ve paralel olarak. DAG'daki Python kodu şöyle görünebilir:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG şuna benzer:
Bu durumda, yalnızca ayarları düzenleyerek ve DAG'ı güncelleyerek bir parçayı ekleyebilir veya kaldırabilirsiniz. Rahat!
Ayrıca daha karmaşık kod oluşturmayı da kullanabilirsiniz, örneğin, bir veritabanı biçimindeki kaynaklarla çalışabilir veya bir tablo yapısını, bir tabloyla çalışmak için bir algoritmayı tanımlayabilir ve DWH altyapısının özelliklerini dikkate alarak bir süreç oluşturabilirsiniz. N tabloyu depolama alanınıza yüklemek için. Veya, örneğin liste biçimindeki bir parametreyle çalışmayı desteklemeyen bir API ile çalışırken, bu listeden bir DAG'da N görev oluşturabilir, API'deki isteklerin paralelliğini bir havuzla sınırlayabilir ve kazıyabilirsiniz. API'den gerekli veriler. Esnek!
depo
Airflow'un kendi arka uç deposu, görevlerin durumlarını, DAG'leri, bağlantı ayarlarını, global değişkenleri vb. saklayan bir veritabanı (MySQL veya Postgres olabilir, bizde Postgres var) vardır. Burada şunu söyleyebilirim ki Airflow'daki depo çok basittir (yaklaşık 20 tablo) ve bunun üzerine kendi süreçlerinizden herhangi birini oluşturmak istiyorsanız kullanışlıdır. Informatica deposundaki, nasıl sorgu oluşturulacağını anlamadan önce uzun süre üzerinde çalışılması gereken 100500 tabloyu hatırlıyorum.
İzleme
Havuzun basitliği göz önüne alındığında, size uygun bir görev izleme süreci oluşturabilirsiniz. Zeppelin'de görevlerin durumuna baktığımız bir not defteri kullanıyoruz:
Bu aynı zamanda Airflow'un web arayüzü de olabilir:
Airflow kodu açık kaynak olduğundan Telegram'a uyarı ekledik. Bir görevin çalışan her örneği, bir hata meydana gelirse, tüm geliştirme ve destek ekibinin bulunduğu Telegram'daki gruba spam gönderir.
Telegram aracılığıyla (gerekirse) anında yanıt alıyoruz ve Zeppelin aracılığıyla Airflow'taki görevlerin genel bir resmini alıyoruz.
Toplam
Hava akışı öncelikle açık kaynaktır ve ondan mucizeler beklememelisiniz. İşe yarayan bir çözüm oluşturmak için zaman ve çaba harcamaya hazır olun. Hedefe ulaşılabilir, inanın bana, buna değer. Geliştirme hızı, esneklik, yeni süreçler ekleme kolaylığı - beğeneceksiniz. Elbette projenin organizasyonuna, Hava Akışının stabilitesine çok dikkat etmeniz gerekiyor: mucizeler olmaz.
Artık Airflow günlük olarak çalışıyor yaklaşık 6,5 bin görev. Karakter olarak oldukça farklılar. Ana DWH'ye birçok farklı ve çok özel kaynaktan veri yükleme görevleri vardır, ana DWH içindeki vitrinleri hesaplama görevleri vardır, verileri hızlı bir DWH'ye yayınlama görevleri vardır, pek çok farklı görev vardır - ve Airflow gün be gün hepsini çiğniyor. Rakamlarla konuşursak, bu 2,3 binlerce DWH (Hadoop) içerisinde değişen karmaşıklığa sahip ELT görevleri, yaklaşık. 2,5 yüz veritabanı kaynaklar, bu bir ekip 4 ETL geliştiricisiDWH'de ETL veri işleme ve DWH içinde ELT veri işleme ve elbette daha fazlası olarak ikiye ayrılır bir yönetici, hizmetin altyapısıyla ilgilenen kişi.
Gelecek için planlar
Süreçlerin sayısı kaçınılmaz olarak artıyor ve Airflow altyapısı açısından yapacağımız en önemli şey ölçeklendirme olacak. Bir Airflow kümesi oluşturmak, Kereviz işçilerine bir çift bacak tahsis etmek ve iş planlama süreçleri ve bir depo ile kendi kendini kopyalayan bir kafa yapmak istiyoruz.
Sonuç bölümü
Elbette Airflow hakkında anlatmak istediğim her şey bu değil ama ana noktaları vurgulamaya çalıştım. İştah yemekle gelir, deneyin, beğeneceksiniz :)
Kaynak: habr.com