Python SDK ilə Check Point API-nin sadələşdirilməsi

Python SDK ilə Check Point API-nin sadələşdirilməsiAPI ilə qarşılıqlı əlaqənin tam gücü proqram kodu ilə birlikdə istifadə edildikdə, API sorğularını və API cavablarını təhlil etmək üçün alətləri dinamik şəkildə yaratmaq mümkün olduqda ortaya çıxır. Bununla belə, hələ də gözə dəyməz olaraq qalır Python Proqram İnkişafı Kiti (bundan sonra Python SDK adlandırılacaq) üçün Check Point Management API, amma boş yerə. O, tərtibatçıların və avtomatlaşdırma həvəskarlarının həyatını əhəmiyyətli dərəcədə asanlaşdırır. Python son zamanlarda böyük populyarlıq qazandı və mən boşluğu doldurmaq və əsas xüsusiyyətləri nəzərdən keçirmək qərarına gəldim. Check Point API Python İnkişaf Dəsti. Bu məqalə Habré haqqında başqa bir məqaləyə əla əlavə kimi xidmət edir Check Point R80.10 API. CLI, skriptlər və s. vasitəsilə nəzarət. Python SDK-dan istifadə edərək skriptlərin necə yazılmasına baxacağıq və 1.6 versiyasında (R80.40-dan başlayaraq dəstəklənir) yeni İdarəetmə API funksionallığına daha yaxından nəzər salacağıq. Məqaləni başa düşmək üçün sizə API və Python ilə işləmək üçün əsas bilik lazımdır.

Check Point API-ni fəal şəkildə inkişaf etdirir və hazırda aşağıdakılar buraxılmışdır:

Python SDK hazırda yalnız İdarəetmə API ilə qarşılıqlı əlaqəni dəstəkləyir və Gaia API. Bu modulda ən vacib siniflərə, metodlara və dəyişənlərə baxacağıq.

Python SDK ilə Check Point API-nin sadələşdirilməsi

Modulun quraşdırılması

Modul cpapi tez və asanlıqla quraşdırır github-da rəsmi Check Point deposu köməyi ilə çəyirdək. Ətraflı quraşdırma təlimatları burada mövcuddur README.md. Bu modul Python 2.7 və 3.7 versiyaları ilə işləmək üçün uyğunlaşdırılmışdır. Bu yazıda Python 3.7 istifadə edərək nümunələr veriləcək. Bununla belə, Python SDK birbaşa Check Point İdarəetmə Serverindən (Smart Management) işə salına bilər, lakin onlar yalnız Python 2.7-ni dəstəkləyir, ona görə də sonuncu bölmə 2.7 versiyası üçün kod təqdim edəcək. Modulu quraşdırdıqdan dərhal sonra kataloqlardakı nümunələrə baxmağı məsləhət görürəm misallar_python2 и misallar_python3.

Başlarken

Cpapi modulunun komponentləri ilə işləyə bilməyimiz üçün moduldan idxal etməliyik. cpapi ən azı iki tələb olunan sinif:

APIClient и APIClientArgs

from cpapi import APIClient, APIClientArgs

Sinif APIClientArgs API serverinə və sinfə qoşulma parametrlərinə cavabdehdir APIClient API ilə qarşılıqlı əlaqəyə cavabdehdir.

Bağlantı parametrlərinin müəyyən edilməsi

API-yə qoşulmaq üçün müxtəlif parametrləri müəyyən etmək üçün siz sinfin nümunəsini yaratmalısınız APIClientArgs. Prinsipcə, onun parametrləri əvvəlcədən müəyyən edilir və skripti idarəetmə serverində işləyərkən onların dəqiqləşdirilməsinə ehtiyac yoxdur.

client_args = APIClientArgs()

Ancaq üçüncü tərəf hostunda işləyərkən ən azı API serverinin IP ünvanını və ya host adını (həmçinin idarəetmə serveri kimi tanınır) göstərməlisiniz. Aşağıdakı nümunədə biz server bağlantısı parametrini təyin edirik və ona idarəetmə serverinin IP ünvanını sətir kimi təyin edirik.

client_args = APIClientArgs(server='192.168.47.241')

API serverinə qoşulduqda istifadə edilə bilən bütün parametrlərə və onların standart dəyərlərinə baxaq:

APIClientArgs sinfinin __init__ metodunun arqumentləri

class APIClientArgs:
    """
    This class provides arguments for APIClient configuration.
    All the arguments are configured with their default values.
    """

    # port is set to None by default, but it gets replaced with 443 if not specified
    # context possible values - web_api (default) or gaia_api
    def __init__(self, port=None, fingerprint=None, sid=None, server="127.0.0.1", http_debug_level=0,
                 api_calls=None, debug_file="", proxy_host=None, proxy_port=8080,
                 api_version=None, unsafe=False, unsafe_auto_accept=False, context="web_api"):
        self.port = port
        # management server fingerprint
        self.fingerprint = fingerprint
        # session-id.
        self.sid = sid
        # management server name or IP-address
        self.server = server
        # debug level
        self.http_debug_level = http_debug_level
        # an array with all the api calls (for debug purposes)
        self.api_calls = api_calls if api_calls else []
        # name of debug file. If left empty, debug data will not be saved to disk.
        self.debug_file = debug_file
        # HTTP proxy server address (without "http://")
        self.proxy_host = proxy_host
        # HTTP proxy port
        self.proxy_port = proxy_port
        # Management server's API version
        self.api_version = api_version
        # Indicates that the client should not check the server's certificate
        self.unsafe = unsafe
        # Indicates that the client should automatically accept and save the server's certificate
        self.unsafe_auto_accept = unsafe_auto_accept
        # The context of using the client - defaults to web_api
        self.context = context

Mən inanıram ki, APIClientArgs sinfinin nümunələrində istifadə oluna bilən arqumentlər Check Point administratorları üçün intuitivdir və əlavə şərhlər tələb etmir.

APIClient və kontekst meneceri vasitəsilə əlaqə

Sinif APIClient Onu istifadə etməyin ən əlverişli yolu kontekst meneceri vasitəsilədir. APIClient sinifinin nümunəsinə ötürülməsi lazım olanların hamısı əvvəlki addımda müəyyən edilmiş əlaqə parametrləridir.

with APIClient(client_args) as client:

Kontekst meneceri avtomatik olaraq API serverinə giriş zəngi etməyəcək, lakin ondan çıxan zaman çıxış zəngi edəcək. API zəngləri ilə işi bitirdikdən sonra nədənsə çıxış tələb olunmursa, kontekst menecerindən istifadə etmədən işə başlamalısınız:

client = APIClient(clieng_args)

Bağlantı yoxlanılır

Bağlantının göstərilən parametrlərə uyğun olub olmadığını yoxlamağın ən asan yolu metoddan istifadə etməkdir barmaq izini yoxlayın. Server API sertifikatının barmaq izi üçün sha1 hash məbləğinin yoxlanılması uğursuz olarsa (metod qaytarıldı Saxta), onda bu adətən əlaqə problemlərindən qaynaqlanır və biz proqramın icrasını dayandıra bilərik (və ya istifadəçiyə əlaqə məlumatlarını düzəltmək imkanı verə bilərik):

    if client.check_fingerprint() is False:
        print("Could not get the server's fingerprint - Check connectivity with the server.")
        exit(1)

Nəzərə alın ki, gələcəkdə sinif APIClient hər API çağırışını yoxlayacaq (metod api_call и api_query, biz onlar haqqında bir az daha danışacağıq) API serverində sha1 barmaq izi sertifikatı. Lakin API server sertifikatının sha1 barmaq izini yoxlayarkən xəta aşkar edilərsə (sertifikat naməlumdur və ya dəyişdirilib), metod barmaq izini yoxlayın bu barədə məlumatı avtomatik olaraq yerli maşına əlavə etmək/dəyişiklik etmək imkanı verəcək. APIClientArgs arqumentindən istifadə edərək, bu yoxlama tamamilə deaktiv edilə bilər (lakin bu, yalnız skriptlər API serverinin özündə, 127.0.0.1-ə qoşulduqda işə salındıqda tövsiyə oluna bilər) - təhlükəsiz_avtomatik_qəbul edin (əvvəllər “Bağlantı parametrlərinin müəyyən edilməsi” bölməsində APIClientArgs haqqında daha çox məlumat əldə edin).

client_args = APIClientArgs(unsafe_auto_accept=True)

API serverinə daxil olun

У APIClient API serverinə daxil olmaq üçün 3 üsul var və onların hər biri mənasını başa düşür sid(sessiya-id), başlıqdakı hər bir sonrakı API çağırışında avtomatik olaraq istifadə olunur (bu parametrin başlığında olan ad X-chkp-sid), buna görə də bu parametri əlavə emal etməyə ehtiyac yoxdur.

giriş üsulu

Giriş və paroldan istifadə edən seçim (məsələn, istifadəçi adı admin və parol 1q2w3e mövqe arqumentləri kimi ötürülür):

     login = client.login('admin', '1q2w3e')  

Giriş metodunda əlavə isteğe bağlı parametrlər də mövcuddur; onların adları və standart dəyərləri bunlardır:

continue_last_session=False, domain=None, read_only=False, payload=None

Metod login_with_api_key

Api açarından istifadə edən seçim (idarəetmə versiyası R80.40/Management API v1.6-dan başlayaraq dəstəklənir, "3TsbPJ8ZKjaJGvFyoFqHFA==" bu, API açarı icazə metodu ilə idarəetmə serverindəki istifadəçilərdən biri üçün API açarı dəyəridir):

     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 

Metodda api_açarı ilə daxil olun metodda olduğu kimi eyni isteğe bağlı parametrlər mövcuddur daxil ol.

login_as_root metodu

API serveri ilə yerli maşına daxil olmaq üçün seçim:

     login = client.login_as_root()

Bu üsul üçün yalnız iki əlavə parametr mövcuddur:

domain=None, payload=None

Və nəhayət, API özlərini çağırır

Metodlar vasitəsilə API zəngləri etmək üçün iki seçimimiz var api_call и api_query. Onların arasındakı fərqin nə olduğunu anlayaq.

api_call

Bu üsul istənilən zənglər üçün uyğundur. Lazım gələrsə, tələb orqanında api çağırışı və faydalı yük üçün son hissəni keçməliyik. Faydalı yük boşdursa, o, ümumiyyətlə ötürülə bilməz:

api_versions = client.api_call('show-api-versions') 

Kesimin altındakı bu sorğu üçün çıxış:

In [23]: api_versions                                                           
Out[23]: 
APIResponse({
    "data": {
        "current-version": "1.6",
        "supported-versions": [
            "1",
            "1.1",
            "1.2",
            "1.3",
            "1.4",
            "1.5",
            "1.6"
        ]
    },
    "res_obj": {
        "data": {
            "current-version": "1.6",
            "supported-versions": [
                "1",
                "1.1",
                "1.2",
                "1.3",
                "1.4",
                "1.5",
                "1.6"
            ]
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})
show_host = client.api_call('show-host', {'name' : 'h_8.8.8.8'})

Kesimin altındakı bu sorğu üçün çıxış:

In [25]: show_host                                                              
Out[25]: 
APIResponse({
    "data": {
        "color": "black",
        "comments": "",
        "domain": {
            "domain-type": "domain",
            "name": "SMC User",
            "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
        },
        "groups": [],
        "icon": "Objects/host",
        "interfaces": [],
        "ipv4-address": "8.8.8.8",
        "meta-info": {
            "creation-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "creator": "admin",
            "last-modifier": "admin",
            "last-modify-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "lock": "unlocked",
            "validation-state": "ok"
        },
        "name": "h_8.8.8.8",
        "nat-settings": {
            "auto-rule": false
        },
        "read-only": false,
        "tags": [],
        "type": "host",
        "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
    },
    "res_obj": {
        "data": {
            "color": "black",
            "comments": "",
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "groups": [],
            "icon": "Objects/host",
            "interfaces": [],
            "ipv4-address": "8.8.8.8",
            "meta-info": {
                "creation-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "creator": "admin",
                "last-modifier": "admin",
                "last-modify-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "lock": "unlocked",
                "validation-state": "ok"
            },
            "name": "h_8.8.8.8",
            "nat-settings": {
                "auto-rule": false
            },
            "read-only": false,
            "tags": [],
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

api_query

İcazə verin, dərhal qeyd edim ki, bu metod yalnız çıxışı ofsetdən ibarət olan zənglər üçün tətbiq oluna bilər. Belə bir nəticə o zaman baş verir ki, o, böyük miqdarda məlumatı ehtiva edir və ya ehtiva edir. Məsələn, bu, idarəetmə serverində yaradılmış bütün host obyektlərinin siyahısı üçün sorğu ola bilər. Bu cür sorğular üçün API standart olaraq 50 obyektin siyahısını qaytarır (cavabda limiti 500 obyektə qədər artıra bilərsiniz). API sorğusunda ofset parametrini dəyişdirərək məlumatı bir neçə dəfə çəkməmək üçün bu işi avtomatik edən api_query metodu var. Bu metodun lazım olduğu çağırışların nümunələri: şou-sessiyalar, şou-aparıcılar, şou-şəbəkələr, şou-wildcards, şou-qruplar, şou-ünvan diapazonları, şou-sadə şlüzlər, şou-sadə-klasterlər, şou-giriş-rollar, şou-etibarlı müştərilər, şou-paketlər. Əslində, biz bu API zənglərinin adında cəm sözləri görürük, ona görə də bu zəngləri idarə etmək daha asan olacaq api_query

show_hosts = client.api_query('show-hosts') 

Kesimin altındakı bu sorğu üçün çıxış:

In [21]: show_hosts                                                             
Out[21]: 
APIResponse({
    "data": [
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "192.168.47.1",
            "name": "h_192.168.47.1",
            "type": "host",
            "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
        },
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "8.8.8.8",
            "name": "h_8.8.8.8",
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        }
    ],
    "res_obj": {
        "data": {
            "from": 1,
            "objects": [
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "192.168.47.1",
                    "name": "h_192.168.47.1",
                    "type": "host",
                    "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
                },
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "8.8.8.8",
                    "name": "h_8.8.8.8",
                    "type": "host",
                    "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
                }
            ],
            "to": 2,
            "total": 2
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

API zənglərinin nəticələrinin işlənməsi

Bundan sonra siz sinfin dəyişənlərindən və metodlarından istifadə edə bilərsiniz APIResponse(həm kontekst meneceri daxilində, həm də xaricdə). Sinifdə APIResponse 4 üsul və 5 dəyişən əvvəlcədən müəyyən edilmişdir, ən vacibləri üzərində daha ətraflı dayanacağıq.

Python SDK ilə Check Point API-nin sadələşdirilməsi

müvəffəqiyyət

Başlamaq üçün, API çağırışının uğurlu olduğuna və nəticə verdiyinə əmin olmaq yaxşı olardı. Bunun üçün bir üsul var müvəffəqiyyət:

In [49]: api_versions.success                                                   
Out[49]: True

API çağırışı uğurlu olarsa True (cavab kodu - 200) və uğursuz olarsa False (hər hansı digər cavab kodu) qaytarır. Cavab kodundan asılı olaraq müxtəlif məlumatları göstərmək üçün API çağırışından dərhal sonra istifadə etmək rahatdır.

if api_ver.success: 
    print(api_versions.data) 
else: 
    print(api_versions.err_message) 

status kodu

API çağırışı edildikdən sonra cavab kodunu qaytarır.

In [62]: api_versions.status_code                                               
Out[62]: 400

Mümkün cavab kodları: 200,400,401,403,404,409,500,501.

uğur_statusunu təyin edin

Bu halda, müvəffəqiyyət statusunun dəyərini dəyişdirmək lazım ola bilər. Texniki cəhətdən oraya hər hansı bir şeyi, hətta adi bir simi də qoya bilərsiniz. Ancaq real nümunə, müəyyən müşayiət olunan şərtlər altında bu parametrin False-ə sıfırlanması ola bilər. Aşağıda, idarəetmə serverində işləyən tapşırıqlar olduqda nümunəyə diqqət yetirin, lakin biz bu sorğunu uğursuz hesab edəcəyik (uğur dəyişənini təyin edəcəyik Saxta, API çağırışının uğurlu olmasına və 200 kodunu qaytarmasına baxmayaraq).

for task in task_result.data["tasks"]:
    if task["status"] == "failed" or task["status"] == "partially succeeded":
        task_result.set_success_status(False)
        break

cavab()

Cavab metodu lüğətə cavab kodu (status_code) və cavab orqanı (bədən) ilə baxmağa imkan verir.

In [94]: api_versions.response()                                                
Out[94]: 
{'status_code': 200,
 'data': {'current-version': '1.6',
  'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}}

məlumat

Lazımsız məlumat olmadan yalnız cavabın gövdəsini (bədənini) görməyə imkan verir.

In [93]: api_versions.data                                                      
Out[93]: 
{'current-version': '1.6',
 'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}

xəta_mesajı

Bu məlumat yalnız API sorğusunu emal edərkən xəta baş verdikdə mövcuddur (cavab kodu heç bir 200). Nümunə çıxışı

In [107]: api_versions.error_message                                            
Out[107]: 'code: generic_err_invalid_parameter_namenmessage: Unrecognized parameter [1]n'

Faydalı nümunələr

Aşağıda İdarəetmə API 1.6-da əlavə edilmiş API zənglərindən istifadə edən nümunələr verilmişdir.

Əvvəlcə zənglərin necə işlədiyinə baxaq əlavə host и əlavə ünvan diapazonu. Deyək ki, son oktet 192.168.0.0 olan 24/5 alt şəbəkəsinin bütün İP ünvanlarını host tipli obyektlər kimi yaratmalı və bütün digər IP ünvanlarını ünvan diapazonu tipli obyektlər kimi yazmalıyıq. Bu halda, alt şəbəkə ünvanını və yayım ünvanını istisna edin.

Beləliklə, aşağıda bu problemi həll edən və host tipli 50 obyekt və ünvan diapazonu tipli 51 obyekt yaradan bir skript var. Problemi həll etmək üçün 101 API çağırışı tələb olunur (son nəşr çağırışı nəzərə alınmadan). Həmçinin timeit modulundan istifadə edərək dəyişikliklər dərc olunana qədər skriptin icrası üçün lazım olan vaxtı hesablayırıq.

Əlavə host və əlavə ünvan diapazonundan istifadə edərək skript

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

first_ip = 1
last_ip = 4

client_args = APIClientArgs(server="192.168.47.240")

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     for ip in range(5,255,5):
         add_host = client.api_call("add-host", {"name" : f"h_192.168.0.{ip}", "ip-address": f'192.168.0.{ip}'})
     while last_ip < 255:
         add_range = client.api_call("add-address-range", {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"})
         first_ip+=5
         last_ip+=5
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

Laboratoriya mühitimdə bu skriptin icrası idarəetmə serverindəki yükdən asılı olaraq 30 ilə 50 saniyə arasında vaxt aparır.

İndi API çağırışından istifadə edərək eyni problemi necə həll edəcəyimizi görək əlavə-obyekt-top, dəstək API 1.6 versiyasında əlavə edilmişdir. Bu zəng sizə bir API sorğusunda birdən çox obyekt yaratmağa imkan verir. Bundan əlavə, bunlar müxtəlif növ obyektlər ola bilər (məsələn, hostlar, alt şəbəkələr və ünvan diapazonları). Beləliklə, tapşırığımız bir API çağırışı çərçivəsində həll edilə bilər.

Add-objects-batch istifadə edərək skript

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}', "ip-address": f'192.168.0.{ip}'}
    objects_list_ip.append(data)
    
first_ip = 1
last_ip = 4


while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}


with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_objects_batch = client.api_call("add-objects-batch", data_for_batch)
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

Və bu skripti mənim laboratoriya mühitimdə işlətmək idarəetmə serverindəki yükdən asılı olaraq 3 ilə 7 saniyə çəkir. Yəni orta hesabla 101 API obyektində toplu tipli zəng 10 dəfə daha sürətli işləyir. Daha çox sayda obyektdə fərq daha da təsir edici olacaqdır.

İndi gəlin onunla necə işləməyə baxaq dəst-obyektlər-top. Bu API çağırışından istifadə edərək istənilən parametri toplu şəkildə dəyişə bilərik. Gəlin əvvəlki misaldakı ünvanların birinci yarısını (.124 host və diapazona qədər) sienna rənginə təyin edək və ünvanların ikinci yarısına xaki rəngini təyin edək.

Əvvəlki nümunədə yaradılmış obyektlərin rənginin dəyişdirilməsi

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip_first = []
objects_list_range_first = []
objects_list_ip_second = []
objects_list_range_second = []

for ip in range(5,125,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "sienna"}
    objects_list_ip_first.append(data)
    
for ip in range(125,255,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "khaki"}
    objects_list_ip_second.append(data)
    
first_ip = 1
last_ip = 4
while last_ip < 125:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "sienna"}
    objects_list_range_first.append(data)
    first_ip+=5
    last_ip+=5
    
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "khaki"}
    objects_list_range_second.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch_first  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_first
}, {
    "type" : "address-range",
    "list" : objects_list_range_first
  }]
}

data_for_batch_second  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_second
}, {
    "type" : "address-range",
    "list" : objects_list_range_second
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 
     set_objects_batch_first = client.api_call("set-objects-batch", data_for_batch_first)
     set_objects_batch_second = client.api_call("set-objects-batch", data_for_batch_second)
     publish = client.api_call("publish")

Bir API çağırışında birdən çox obyekti silə bilərsiniz sil-obyektləri-top. İndi əvvəllər vasitəsilə yaradılmış bütün hostları silən kod nümunəsinə baxaq əlavə-obyekt-top.

Delete-objects-batch istifadə edərək obyektlərin silinməsi

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}'}
    objects_list_ip.append(data)

first_ip = 1
last_ip = 4
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     delete_objects_batch = client.api_call("delete-objects-batch", data_for_batch)
     publish = client.api_call("publish")

print(delete_objects_batch.data)

Check Point proqramının yeni buraxılışlarında görünən bütün funksiyalar dərhal API zənglərini əldə edir. Beləliklə, R80.40-da Revizyona qayıt və Smart Task kimi "xüsusiyyətlər" meydana çıxdı və onlar üçün dərhal müvafiq API zəngləri hazırlanmışdı. Bundan əlavə, köhnə konsollardan Vahid Siyasət rejiminə keçərkən bütün funksionallıq API dəstəyini də əldə edir. Məsələn, R80.40 proqram versiyasında çoxdan gözlənilən yeniləmə HTTPS Təftiş siyasətinin Legacy rejimindən Vahid Siyasət rejiminə keçməsi idi və bu funksionallıq dərhal API zənglərini qəbul etdi. Budur, bir sıra ölkələrdə qanuna uyğun olaraq yoxlaması qadağan edilən 3 kateqoriyanı (Səhiyyə, Maliyyə, Dövlət Xidmətləri) yoxlamadan kənarlaşdıran HTTPS Təftiş siyasətinin yuxarı mövqeyinə qayda əlavə edən kod nümunəsi.

HTTPS Təftiş siyasətinə qayda əlavə edin

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

data = {
  "layer" : "Default Layer",
  "position" : "top",
  "name" : "Legal Requirements",
  "action": "bypass",
  "site-category": ["Health", "Government / Military", "Financial Services"]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_https_rule = client.api_call("add-https-rule", data)
     publish = client.api_call("publish")

Check Point idarəetmə serverində Python skriptlərinin işlədilməsi

Hər şey eynidir README.md Python skriptlərini birbaşa idarəetmə serverindən necə işə salmaq barədə məlumatları ehtiva edir. Başqa maşından API serverinə qoşula bilməyəndə bu, rahat ola bilər. Mən modulun quraşdırılmasına baxdığım altı dəqiqəlik bir video yazdım cpapi və idarəetmə serverində Python skriptlərinin işlədilməsi xüsusiyyətləri. Nümunə olaraq, şəbəkə auditi kimi tapşırıq üçün yeni şlüzün konfiqurasiyasını avtomatlaşdıran skript işə salınır. Təhlükəsizlik Yoxlanışı. Mənim həll etməli olduğum xüsusiyyətlər arasında: funksiya hələ Python 2.7-də görünməyib giriş, buna görə də istifadəçinin daxil etdiyi məlumatı emal etmək üçün funksiyadan istifadə olunur xam_giriş. Əks halda, kod digər maşınlardan işə salmaqla eynidir, yalnız funksiyadan istifadə etmək daha rahatdır login_as_root, öz istifadəçi adınızı, parolunuzu və idarəetmə serverinin IP ünvanını bir daha göstərməmək üçün.

Təhlükəsizlik Yoxlanışının sürətli qurulması üçün skript

from __future__ import print_function
import getpass
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from cpapi import APIClient, APIClientArgs

def main():
    with APIClient() as client:
       # if client.check_fingerprint() is False:
       #     print("Could not get the server's fingerprint - Check connectivity with the server.")
       #     exit(1)
        login_res = client.login_as_root()

        if login_res.success is False:
            print("Login failed:n{}".format(login_res.error_message))
            exit(1)

        gw_name = raw_input("Enter the gateway name:")
        gw_ip = raw_input("Enter the gateway IP address:")
        if sys.stdin.isatty():
            sic = getpass.getpass("Enter one-time password for the gateway(SIC): ")
        else:
            print("Attention! Your password will be shown on the screen!")
            sic = raw_input("Enter one-time password for the gateway(SIC): ")
        version = raw_input("Enter the gateway version(like RXX.YY):")
        add_gw = client.api_call("add-simple-gateway", {'name' : gw_name, 'ipv4-address' : gw_ip, 'one-time-password' : sic, 'version': version.capitalize(), 'application-control' : 'true', 'url-filtering' : 'true', 'ips' : 'true', 'anti-bot' : 'true', 'anti-virus' : 'true', 'threat-emulation' : 'true'})
        if add_gw.success and add_gw.data['sic-state'] != "communicating":
            print("Secure connection with the gateway hasn't established!")
            exit(1)
        elif add_gw.success:
            print("The gateway was added successfully.")
            gw_uid = add_gw.data['uid']
            gw_name = add_gw.data['name']
        else:
            print("Failed to add the gateway - {}".format(add_gw.error_message))
            exit(1)

        change_policy = client.api_call("set-access-layer", {"name" : "Network", "applications-and-url-filtering": "true", "content-awareness": "true"})
        if change_policy.success:
            print("The policy has been changed successfully")
        else:
            print("Failed to change the policy- {}".format(change_policy.error_message))
        change_rule = client.api_call("set-access-rule", {"name" : "Cleanup rule", "layer" : "Network", "action": "Accept", "track": {"type": "Detailed Log", "accounting": "true"}})
        if change_rule.success:
            print("The cleanup rule has been changed successfully")
        else:
            print("Failed to change the cleanup rule- {}".format(change_rule.error_message))

        # publish the result
        publish_res = client.api_call("publish", {})
        if publish_res.success:
            print("The changes were published successfully.")
        else:
                print("Failed to publish the changes - {}".format(install_tp_policy.error_message))

        install_access_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'true',  "threat-prevention" : 'false', "targets" : gw_uid})
        if install_access_policy.success:
            print("The access policy has been installed")
        else:
                print("Failed to install access policy - {}".format(install_tp_policy.error_message))

        install_tp_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'false',  "threat-prevention" : 'true', "targets" : gw_uid})
        if install_tp_policy.success:
            print("The threat prevention policy has been installed")
        else:
            print("Failed to install threat prevention policy - {}".format(install_tp_policy.error_message))
        
        # add passwords and passphrases to dictionary
        with open('additional_pass.conf') as f:
            line_num = 0
            for line in f:
                line_num += 1
                add_password_dictionary = client.api_call("run-script", {"script-name" : "Add passwords and passphrases", "script" : "printf "{}" >> $FWDIR/conf/additional_pass.conf".format(line), "targets" : gw_name})
                if add_password_dictionary.success:
                    print("The password dictionary line {} was added successfully".format(line_num))
                else:
                    print("Failed to add the dictionary - {}".format(add_password_dictionary.error_message))

main()

Əlavə_pass.conf parol lüğəti ilə nümunə fayl
{
"passwords" : ["malware","malicious","infected","Infected"],
"phrases" : ["password","Password","Pass","pass","codigo","key","pwd","пароль","Пароль","Ключ","ключ","шифр","Шифр"] }

Nəticə

Bu məqalə yalnız işin əsas imkanlarını araşdırır Python SDK və modul cpapi(təxmin etdiyiniz kimi, bunlar əslində sinonimlərdir) və bu modulda kodu öyrənməklə siz onunla işləmək üçün daha çox imkanlar kəşf edəcəksiniz. Mümkündür ki, siz onu öz sinifləriniz, funksiyalarınız, metodlarınız və dəyişənlərinizlə əlavə etmək istəyəcəksiniz. Bölmədə hər zaman işinizi paylaşa və Check Point üçün digər skriptlərə baxa bilərsiniz CodeHub cəmiyyətdə CheckMates, həm məhsul tərtibatçılarını, həm də istifadəçiləri bir araya gətirir.

Xoşbəxt kodlaşdırma və sona qədər oxuduğunuz üçün təşəkkür edirik!

Mənbə: www.habr.com

Добавить комментарий