Airflow, toplu veri işleme süreçlerini rahat ve hızlı bir şekilde geliştirmeye ve sürdürmeye yönelik bir araçtır

Airflow, toplu veri işleme süreçlerini rahat ve hızlı bir şekilde geliştirmeye ve sürdürmeye yönelik bir araçtır

Merhaba Habr! Bu makalede, örneğin kurumsal bir DWH'nin veya DataLake'inizin altyapısında toplu veri işleme süreçleri geliştirmeye yönelik harika bir araçtan bahsetmek istiyorum. Apache Airflow (bundan sonra Airflow olarak anılacaktır) hakkında konuşacağız. Habré'de haksız yere dikkatlerden mahrum bırakılıyor ve esas olarak sizi ETL/ELT süreçleriniz için bir zamanlayıcı seçerken en azından Airflow'un dikkate alınmaya değer olduğuna ikna etmeye çalışacağım.

Daha önce Tinkoff Bank'ta çalışırken DWH konusuyla ilgili bir dizi makale yazmıştım. Artık Mail.Ru Grup ekibinin bir parçası oldum ve oyun alanında veri analizi için bir platform geliştiriyorum. Aslında haberler ve ilginç çözümler ortaya çıktıkça ekibim ve ben burada veri analitiği platformumuz hakkında konuşacağız.

prolog

Öyleyse başlayalım. Hava Akışı Nedir? Bu bir kütüphanedir (veya kütüphane seti) iş süreçlerini geliştirmek, planlamak ve izlemek. Airflow'un ana özelliği: Python kodu süreçleri tanımlamak (geliştirmek) için kullanılır. Bunun, projenizi ve geliştirmenizi organize etmek için birçok avantajı vardır: özünde, (örneğin) ETL projeniz sadece bir Python projesidir ve bunu altyapının özelliklerini, ekip boyutunu ve özelliklerini dikkate alarak istediğiniz gibi organize edebilirsiniz. diğer gereklilikler. Enstrümantal olarak her şey basittir. Örneğin PyCharm + Git'i kullanın. Harika ve çok kullanışlı!

Şimdi Airflow'un ana öğelerine bakalım. Bunların özünü ve amacını anlayarak süreç mimarinizi en iyi şekilde organize edebilirsiniz. Belki de ana varlık Yönlendirilmiş Asiklik Grafiktir (bundan sonra DAG olarak anılacaktır).

DAG

DAG, belirli bir programa göre kesin olarak tanımlanmış bir sırayla tamamlamak istediğiniz görevlerinizin anlamlı bir birleşimidir. Airflow, DAG'ler ve diğer varlıklarla çalışmak için kullanışlı bir web arayüzü sağlar:

Airflow, toplu veri işleme süreçlerini rahat ve hızlı bir şekilde geliştirmeye ve sürdürmeye yönelik bir araçtır

DAG şöyle görünebilir:

Airflow, toplu veri işleme süreçlerini rahat ve hızlı bir şekilde geliştirmeye ve sürdürmeye yönelik bir araçtır

Geliştirici, bir DAG tasarlarken, DAG içindeki görevlerin üzerine inşa edileceği bir dizi operatör belirler. Burada bir başka önemli varlığa geliyoruz: Hava Akış Operatörü.

operatörler

Operatör, bir iş örneğinin yürütülmesi sırasında ne olacağını açıklayan, hangi iş örneklerinin oluşturulduğunu temel alan bir varlıktır. GitHub'dan Airflow yayınları zaten kullanıma hazır bir dizi operatör içerir. Örnekler:

  • BashOperator - bash komutunu yürütmek için kullanılan operatör.
  • PythonOperator - Python kodunu çağırmak için kullanılan operatör.
  • EmailOperator — e-posta gönderme operatörü.
  • HTTPOperator - http istekleriyle çalışmak için kullanılan operatör.
  • SqlOperator - SQL kodunu yürütmek için operatör.
  • Sensör, bir olayı beklemek için kullanılan bir operatördür (gerekli zamanın gelmesi, gerekli dosyanın görünümü, veritabanındaki bir satır, API'den bir yanıt vb.).

Daha spesifik operatörler vardır: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Siz de kendi özelliklerinize göre operatörler geliştirip projenizde kullanabilirsiniz. Örneğin, belgeleri MongoDB'den Hive'a aktarmak için bir operatör olan MongoDBToHiveViaHdfsTransfer'ı ve MongoDB ile çalışmak için çeşitli operatörleri oluşturduk. Tıklama Evi: CHLoadFromHiveOperator ve CHTableLoaderOperator. Temel olarak, bir proje temel ifadeler üzerine kurulu kodu sıklıkla kullanmaya başladıktan sonra onu yeni bir ifadeye dönüştürmeyi düşünebilirsiniz. Bu, daha fazla geliştirmeyi kolaylaştıracak ve projedeki operatör kütüphanenizi genişleteceksiniz.

Daha sonra, tüm bu görev örneklerinin yürütülmesi gerekiyor ve şimdi zamanlayıcı hakkında konuşacağız.

zamanlayıcı

Airflow'un görev zamanlayıcısı temel alınmıştır Kereviz. Kereviz, bir kuyruğu organize etmenize ve görevlerin asenkron ve dağıtılmış yürütülmesine olanak tanıyan bir Python kütüphanesidir. Hava Akışı tarafında tüm görevler havuzlara bölünmüştür. Havuzlar manuel olarak oluşturulur. Tipik olarak amaçları, kaynakla çalışmanın iş yükünü sınırlamak veya DWH içindeki görevleri belirlemektir. Havuzlar web arayüzü üzerinden yönetilebilir:

Airflow, toplu veri işleme süreçlerini rahat ve hızlı bir şekilde geliştirmeye ve sürdürmeye yönelik bir araçtır

Her havuzun slot sayısında bir sınırı vardır. DAG oluştururken ona bir havuz verilir:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG düzeyinde tanımlanan bir havuz, görev düzeyinde geçersiz kılınabilir.
Ayrı bir süreç olan Zamanlayıcı, Airflow'taki tüm görevlerin zamanlanmasından sorumludur. Aslında Zamanlayıcı, görevlerin yürütülmesi için ayarlanmasına ilişkin tüm mekanizmalarla ilgilenir. Görev yürütülmeden önce birkaç aşamadan geçer:

  1. DAG'da önceki görevler tamamlandı; yenisi sıraya alınabilir.
  2. Sıra, görevlerin önceliğine göre sıralanır (öncelikler de kontrol edilebilir) ve havuzda boş slot varsa görev devreye alınabilir.
  3. Ücretsiz işçi kereviz varsa görev ona gönderilir; problemde programladığınız çalışma şu veya bu operatörü kullanarak başlar.

Yeterince basit.

Zamanlayıcı, tüm DAG'lerin ve DAG'lerin içindeki tüm görevlerin kümesinde çalışır.

Zamanlayıcının DAG ile çalışmaya başlaması için DAG'ın bir zamanlama ayarlaması gerekir:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Bir dizi hazır ön ayar vardır: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Cron ifadelerini de kullanabilirsiniz:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Gerçekleşme tarihi

Hava Akışının nasıl çalıştığını anlamak için bir DAG'nin Yürütme Tarihinin ne olduğunu anlamak önemlidir. Airflow'ta DAG'nin bir Yürütme Tarihi boyutu vardır; yani DAG'ın çalışma programına bağlı olarak her Yürütme Tarihi için görev örnekleri oluşturulur. Ve her Yürütme Tarihi için görevler yeniden yürütülebilir veya örneğin bir DAG, birden fazla Yürütme Tarihinde aynı anda çalışabilir. Bu burada açıkça gösterilmektedir:

Airflow, toplu veri işleme süreçlerini rahat ve hızlı bir şekilde geliştirmeye ve sürdürmeye yönelik bir araçtır

Ne yazık ki (veya belki de neyse ki: duruma bağlıdır), eğer görevin DAG'deki uygulaması düzeltilirse, önceki Yürütme Tarihindeki yürütme, ayarlamalar dikkate alınarak devam edecektir. Yeni bir algoritma kullanarak geçmiş dönemlerdeki verileri yeniden hesaplamanız gerekiyorsa bu iyidir, ancak sonucun tekrarlanabilirliği kaybolduğu için kötüdür (tabii ki kimse kaynak kodun gerekli sürümünü Git'ten döndürmeniz ve ne olacağını hesaplamanız konusunda sizi rahatsız etmez) bir kereye ihtiyacın var, ihtiyacın olduğu şekilde).

Görevler oluşturuluyor

DAG'ın uygulanması Python'daki koddur; bu nedenle, örneğin parçalanmış kaynaklarla çalışırken kod miktarını azaltmanın çok uygun bir yoluna sahibiz. Diyelim ki kaynak olarak üç MySQL parçanız var, her birine girip bazı veriler almanız gerekiyor. Üstelik bağımsız ve paralel olarak. DAG'daki Python kodu şöyle görünebilir:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG şuna benzer:

Airflow, toplu veri işleme süreçlerini rahat ve hızlı bir şekilde geliştirmeye ve sürdürmeye yönelik bir araçtır

Bu durumda, yalnızca ayarları düzenleyerek ve DAG'ı güncelleyerek bir parçayı ekleyebilir veya kaldırabilirsiniz. Rahat!

Ayrıca daha karmaşık kod oluşturmayı da kullanabilirsiniz, örneğin, bir veritabanı biçimindeki kaynaklarla çalışabilir veya bir tablo yapısını, bir tabloyla çalışmak için bir algoritmayı tanımlayabilir ve DWH altyapısının özelliklerini dikkate alarak bir süreç oluşturabilirsiniz. N tabloyu depolama alanınıza yüklemek için. Veya, örneğin liste biçimindeki bir parametreyle çalışmayı desteklemeyen bir API ile çalışırken, bu listeden bir DAG'da N görev oluşturabilir, API'deki isteklerin paralelliğini bir havuzla sınırlayabilir ve kazıyabilirsiniz. API'den gerekli veriler. Esnek!

depo

Airflow'un kendi arka uç deposu, görevlerin durumlarını, DAG'leri, bağlantı ayarlarını, global değişkenleri vb. saklayan bir veritabanı (MySQL veya Postgres olabilir, bizde Postgres var) vardır. Burada şunu söyleyebilirim ki Airflow'daki depo çok basittir (yaklaşık 20 tablo) ve bunun üzerine kendi süreçlerinizden herhangi birini oluşturmak istiyorsanız kullanışlıdır. Informatica deposundaki, nasıl sorgu oluşturulacağını anlamadan önce uzun süre üzerinde çalışılması gereken 100500 tabloyu hatırlıyorum.

İzleme

Havuzun basitliği göz önüne alındığında, size uygun bir görev izleme süreci oluşturabilirsiniz. Zeppelin'de görevlerin durumuna baktığımız bir not defteri kullanıyoruz:

Airflow, toplu veri işleme süreçlerini rahat ve hızlı bir şekilde geliştirmeye ve sürdürmeye yönelik bir araçtır

Bu aynı zamanda Airflow'un web arayüzü de olabilir:

Airflow, toplu veri işleme süreçlerini rahat ve hızlı bir şekilde geliştirmeye ve sürdürmeye yönelik bir araçtır

Airflow kodu açık kaynak olduğundan Telegram'a uyarı ekledik. Bir görevin çalışan her örneği, bir hata meydana gelirse, tüm geliştirme ve destek ekibinin bulunduğu Telegram'daki gruba spam gönderir.

Telegram aracılığıyla (gerekirse) anında yanıt alıyoruz ve Zeppelin aracılığıyla Airflow'taki görevlerin genel bir resmini alıyoruz.

Toplam

Hava akışı öncelikle açık kaynaktır ve ondan mucizeler beklememelisiniz. İşe yarayan bir çözüm oluşturmak için zaman ve çaba harcamaya hazır olun. Hedefe ulaşılabilir, inanın bana, buna değer. Geliştirme hızı, esneklik, yeni süreçler ekleme kolaylığı - beğeneceksiniz. Elbette projenin organizasyonuna, Hava Akışının stabilitesine çok dikkat etmeniz gerekiyor: mucizeler olmaz.

Artık Airflow günlük olarak çalışıyor yaklaşık 6,5 bin görev. Karakter olarak oldukça farklılar. Ana DWH'ye birçok farklı ve çok özel kaynaktan veri yükleme görevleri vardır, ana DWH içindeki vitrinleri hesaplama görevleri vardır, verileri hızlı bir DWH'ye yayınlama görevleri vardır, pek çok farklı görev vardır - ve Airflow gün be gün hepsini çiğniyor. Rakamlarla konuşursak, bu 2,3 binlerce DWH (Hadoop) içerisinde değişen karmaşıklığa sahip ELT görevleri, yaklaşık. 2,5 yüz veritabanı kaynaklar, bu bir ekip 4 ETL geliştiricisiDWH'de ETL veri işleme ve DWH içinde ELT veri işleme ve elbette daha fazlası olarak ikiye ayrılır bir yönetici, hizmetin altyapısıyla ilgilenen kişi.

Gelecek için planlar

Süreçlerin sayısı kaçınılmaz olarak artıyor ve Airflow altyapısı açısından yapacağımız en önemli şey ölçeklendirme olacak. Bir Airflow kümesi oluşturmak, Kereviz işçilerine bir çift bacak tahsis etmek ve iş planlama süreçleri ve bir depo ile kendi kendini kopyalayan bir kafa yapmak istiyoruz.

Sonuç bölümü

Elbette Airflow hakkında anlatmak istediğim her şey bu değil ama ana noktaları vurgulamaya çalıştım. İştah yemekle gelir, deneyin, beğeneceksiniz :)

Kaynak: habr.com

Yorum ekle