Simplificarea API-ului Check Point cu SDK-ul Python

Simplificarea API-ului Check Point cu SDK-ul PythonÎntreaga putere a interacțiunii cu API-urile este dezvăluită atunci când este utilizat împreună cu codul programului, atunci când devine posibilă generarea dinamică a solicitărilor API și a instrumentelor pentru analiza răspunsurilor API. Cu toate acestea, rămâne încă de neobservat Kit de dezvoltare software Python (denumit în continuare Python SDK) pentru Check Point Management API, dar în zadar. Simplifică semnificativ viața dezvoltatorilor și a pasionaților de automatizare. Python a câștigat o popularitate enormă în ultima vreme și am decis să umplu golul și să revizuiesc principalele caracteristici. Check Point API Python Development Kit. Acest articol servește ca o completare excelentă la un alt articol despre Habré API Check Point R80.10. Management prin CLI, scripturi și multe altele. Vom analiza cum să scriem scripturi folosind SDK-ul Python și vom arunca o privire mai atentă asupra noii funcționalități Management API din versiunea 1.6 (acceptată începând de la R80.40). Pentru a înțelege articolul, veți avea nevoie de cunoștințe de bază despre lucrul cu API-uri și Python.

Check Point dezvoltă activ API-ul și în acest moment au fost lansate următoarele:

SDK-ul Python acceptă în prezent doar interacțiunea cu API-ul de gestionare și Gaia API. Ne vom uita la cele mai importante clase, metode și variabile din acest modul.

Simplificarea API-ului Check Point cu SDK-ul Python

Instalarea modulului

Modul cpapi se instaleaza rapid si usor din depozitul oficial Check Point pe github via țâfnă. Instrucțiuni detaliate de instalare sunt disponibile în README.md. Acest modul este adaptat să funcționeze cu versiunile Python 2.7 și 3.7. În acest articol, vor fi date exemple folosind Python 3.7. Cu toate acestea, SDK-ul Python poate fi rulat direct de pe serverul Check Point Management (Smart Management), dar acceptă doar Python 2.7, astfel că ultima secțiune va furniza cod pentru versiunea 2.7. Imediat după instalarea modulului, vă recomand să vă uitați la exemplele din directoare exemple_python2 и exemple_python3.

Noțiuni de bază

Pentru a putea lucra cu componentele modulului cpapi, trebuie să importăm din modul cpapi cel puțin două clase obligatorii:

APIClient и APIClientArgs

from cpapi import APIClient, APIClientArgs

clasă APIClientArgs este responsabil pentru parametrii de conectare la serverul API și la clasă APIClient este responsabil pentru interacțiunea cu API-ul.

Determinarea parametrilor de conectare

Pentru a defini diferiți parametri pentru conectarea la API, trebuie să creați o instanță a clasei APIClientArgs. În principiu, parametrii săi sunt predefiniți și atunci când rulează scriptul pe serverul de control, nu trebuie să fie specificați.

client_args = APIClientArgs()

Dar atunci când rulați pe o gazdă terță parte, trebuie să specificați cel puțin adresa IP sau numele de gazdă al serverului API (cunoscut și ca server de management). În exemplul de mai jos, definim parametrul de conectare la server și îi atribuim adresa IP a serverului de management ca șir.

client_args = APIClientArgs(server='192.168.47.241')

Să ne uităm la toți parametrii și valorile lor implicite care pot fi utilizați la conectarea la serverul API:

Argumente ale metodei __init__ a clasei APIClientArgs

class APIClientArgs:
    """
    This class provides arguments for APIClient configuration.
    All the arguments are configured with their default values.
    """

    # port is set to None by default, but it gets replaced with 443 if not specified
    # context possible values - web_api (default) or gaia_api
    def __init__(self, port=None, fingerprint=None, sid=None, server="127.0.0.1", http_debug_level=0,
                 api_calls=None, debug_file="", proxy_host=None, proxy_port=8080,
                 api_version=None, unsafe=False, unsafe_auto_accept=False, context="web_api"):
        self.port = port
        # management server fingerprint
        self.fingerprint = fingerprint
        # session-id.
        self.sid = sid
        # management server name or IP-address
        self.server = server
        # debug level
        self.http_debug_level = http_debug_level
        # an array with all the api calls (for debug purposes)
        self.api_calls = api_calls if api_calls else []
        # name of debug file. If left empty, debug data will not be saved to disk.
        self.debug_file = debug_file
        # HTTP proxy server address (without "http://")
        self.proxy_host = proxy_host
        # HTTP proxy port
        self.proxy_port = proxy_port
        # Management server's API version
        self.api_version = api_version
        # Indicates that the client should not check the server's certificate
        self.unsafe = unsafe
        # Indicates that the client should automatically accept and save the server's certificate
        self.unsafe_auto_accept = unsafe_auto_accept
        # The context of using the client - defaults to web_api
        self.context = context

Consider că argumentele care pot fi folosite în instanțe ale clasei APIClientArgs sunt intuitive pentru administratorii Check Point și nu necesită comentarii suplimentare.

Conectarea prin APIClient și manager de context

clasă APIClient Cel mai convenabil mod de a-l folosi este prin intermediul managerului de context. Tot ceea ce trebuie să fie transmis unei instanțe a clasei APIClient sunt parametrii de conexiune care au fost definiți în pasul anterior.

with APIClient(client_args) as client:

Managerul de context nu va efectua automat un apel de conectare la serverul API, dar va efectua un apel de deconectare la ieșirea din acesta. Dacă din anumite motive nu este necesară deconectarea după ce ați terminat de lucrat cu apelurile API, trebuie să începeți să lucrați fără a utiliza managerul de context:

client = APIClient(clieng_args)

Verificarea conexiunii

Cel mai simplu mod de a verifica dacă conexiunea îndeplinește parametrii specificați este utilizarea metodei verifica_amprenta. Dacă verificarea sumei hash sha1 pentru amprenta certificatului API al serverului eșuează (metoda returnată Fals), atunci acest lucru este cauzat de obicei de probleme de conexiune și putem opri execuția programului (sau oferim utilizatorului posibilitatea de a corecta datele de conectare):

    if client.check_fingerprint() is False:
        print("Could not get the server's fingerprint - Check connectivity with the server.")
        exit(1)

Vă rugăm să rețineți că în viitor clasa APIClient va verifica fiecare apel API (metode api_call и api_query, vom vorbi despre ele puțin mai departe) sha1 certificat de amprentă pe serverul API. Dar dacă, la verificarea amprentei sha1 a certificatului serverului API, este detectată o eroare (certificatul este necunoscut sau a fost modificat), metoda verifica_amprenta va oferi posibilitatea de a adăuga/modifica automat informații despre acesta pe mașina locală. Această verificare poate fi dezactivată complet (dar aceasta poate fi recomandată numai dacă scripturile sunt executate pe serverul API însuși, atunci când vă conectați la 127.0.0.1), folosind argumentul APIClientArgs - unsafe_auto_accept (vezi mai multe despre APIClientArgs mai devreme în „Definirea parametrilor de conexiune”).

client_args = APIClientArgs(unsafe_auto_accept=True)

Conectați-vă la serverul API

У APIClient există până la 3 metode de conectare la serverul API și fiecare dintre ele înțelege semnificația sid(session-id), care este utilizat automat în fiecare apel API ulterior din antet (numele din antetul acestui parametru este X-chkp-sid), deci nu este nevoie să procesați în continuare acest parametru.

metoda de conectare

Opțiune care utilizează autentificare și parola (în exemplu, numele de utilizator admin și parola 1q2w3e sunt transmise ca argumente poziționale):

     login = client.login('admin', '1q2w3e')  

Parametrii opționali suplimentari sunt, de asemenea, disponibili în metoda de conectare, aici sunt numele și valorile implicite:

continue_last_session=False, domain=None, read_only=False, payload=None

Metoda Login_with_api_key

Opțiune care utilizează o cheie API (acceptată începând cu versiunea de management R80.40/Management API v1.6, "3TsbPJ8ZKjaJGvFyoFqHFA==" aceasta este valoarea cheii API pentru unul dintre utilizatorii de pe serverul de management cu metoda de autorizare a cheii API):

     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 

In metoda login_with_api_key sunt disponibili aceiași parametri opționali ca și în metodă Logare.

metoda login_as_root

Opțiunea de conectare la o mașină locală cu un server API:

     login = client.login_as_root()

Există doar doi parametri opționali disponibili pentru această metodă:

domain=None, payload=None

Și în cele din urmă API-ul se numește

Avem două opțiuni pentru a efectua apeluri API prin metode api_call и api_query. Să ne dăm seama care este diferența dintre ele.

api_call

Această metodă este aplicabilă pentru orice apel. Trebuie să trecem ultima parte pentru apelul API și încărcarea utilă în corpul cererii, dacă este necesar. Dacă sarcina utilă este goală, atunci nu poate fi transmisă deloc:

api_versions = client.api_call('show-api-versions') 

Ieșire pentru această solicitare sub tăietură:

In [23]: api_versions                                                           
Out[23]: 
APIResponse({
    "data": {
        "current-version": "1.6",
        "supported-versions": [
            "1",
            "1.1",
            "1.2",
            "1.3",
            "1.4",
            "1.5",
            "1.6"
        ]
    },
    "res_obj": {
        "data": {
            "current-version": "1.6",
            "supported-versions": [
                "1",
                "1.1",
                "1.2",
                "1.3",
                "1.4",
                "1.5",
                "1.6"
            ]
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})
show_host = client.api_call('show-host', {'name' : 'h_8.8.8.8'})

Ieșire pentru această solicitare sub tăietură:

In [25]: show_host                                                              
Out[25]: 
APIResponse({
    "data": {
        "color": "black",
        "comments": "",
        "domain": {
            "domain-type": "domain",
            "name": "SMC User",
            "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
        },
        "groups": [],
        "icon": "Objects/host",
        "interfaces": [],
        "ipv4-address": "8.8.8.8",
        "meta-info": {
            "creation-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "creator": "admin",
            "last-modifier": "admin",
            "last-modify-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "lock": "unlocked",
            "validation-state": "ok"
        },
        "name": "h_8.8.8.8",
        "nat-settings": {
            "auto-rule": false
        },
        "read-only": false,
        "tags": [],
        "type": "host",
        "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
    },
    "res_obj": {
        "data": {
            "color": "black",
            "comments": "",
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "groups": [],
            "icon": "Objects/host",
            "interfaces": [],
            "ipv4-address": "8.8.8.8",
            "meta-info": {
                "creation-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "creator": "admin",
                "last-modifier": "admin",
                "last-modify-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "lock": "unlocked",
                "validation-state": "ok"
            },
            "name": "h_8.8.8.8",
            "nat-settings": {
                "auto-rule": false
            },
            "read-only": false,
            "tags": [],
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

api_query

Permiteți-mi să fac imediat o rezervare că această metodă este aplicabilă numai pentru apelurile a căror ieșire implică compensare. O astfel de inferență apare atunci când conține sau poate conține o cantitate mare de informații. De exemplu, aceasta ar putea fi o solicitare pentru o listă a tuturor obiectelor gazdă create pe serverul de management. Pentru astfel de solicitări, API-ul returnează o listă de 50 de obiecte în mod implicit (puteți crește limita la 500 de obiecte în răspuns). Și pentru a nu extrage informațiile de mai multe ori, schimbând parametrul de offset din cererea API, există o metodă api_query care face acest lucru automat. Exemple de apeluri în care este necesară această metodă: show-sessions, show-hosts, show-networks, show-wildcards, show-groups, show-adress-ranges, show-simple-gateways, show-simple-clusters, show-access-roles, show-trusted-clients, pachete de prezentare. De fapt, vedem cuvinte la plural în numele acestor apeluri API, astfel încât aceste apeluri vor fi mai ușor de gestionat. api_query

show_hosts = client.api_query('show-hosts') 

Ieșire pentru această solicitare sub tăietură:

In [21]: show_hosts                                                             
Out[21]: 
APIResponse({
    "data": [
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "192.168.47.1",
            "name": "h_192.168.47.1",
            "type": "host",
            "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
        },
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "8.8.8.8",
            "name": "h_8.8.8.8",
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        }
    ],
    "res_obj": {
        "data": {
            "from": 1,
            "objects": [
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "192.168.47.1",
                    "name": "h_192.168.47.1",
                    "type": "host",
                    "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
                },
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "8.8.8.8",
                    "name": "h_8.8.8.8",
                    "type": "host",
                    "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
                }
            ],
            "to": 2,
            "total": 2
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

Procesarea rezultatelor apelurilor API

După aceasta puteți utiliza variabilele și metodele clasei APIRăspuns(atât în ​​interiorul managerului de context, cât și în exterior). La clasa APIRăspuns Sunt predefinite 4 metode și 5 variabile, ne vom opri mai detaliat asupra celor mai importante.

Simplificarea API-ului Check Point cu SDK-ul Python

succes

Pentru început, ar fi o idee bună să vă asigurați că apelul API a avut succes și a returnat un rezultat. Există o metodă pentru asta succes:

In [49]: api_versions.success                                                   
Out[49]: True

Returnează True dacă apelul API a avut succes (codul de răspuns - 200) și False dacă nu a reușit (orice alt cod de răspuns). Este convenabil să utilizați imediat după un apel API pentru a afișa informații diferite în funcție de codul de răspuns.

if api_ver.success: 
    print(api_versions.data) 
else: 
    print(api_versions.err_message) 

cod de stare

Returnează codul de răspuns după ce a fost efectuat un apel API.

In [62]: api_versions.status_code                                               
Out[62]: 400

Coduri de răspuns posibile: 200,400,401,403,404,409,500,501.

set_success_status

În acest caz, poate fi necesară modificarea valorii stării de succes. Din punct de vedere tehnic, puteți pune orice acolo, chiar și un șir obișnuit. Dar un exemplu real ar fi resetarea acestui parametru la Fals în anumite condiții însoțitoare. Mai jos, acordați atenție exemplului când există sarcini care rulează pe serverul de management, dar vom considera această solicitare nereușită (vom seta variabila de succes la Fals, în ciuda faptului că apelul API a avut succes și a returnat codul 200).

for task in task_result.data["tasks"]:
    if task["status"] == "failed" or task["status"] == "partially succeeded":
        task_result.set_success_status(False)
        break

raspuns()

Metoda de răspuns vă permite să vizualizați dicționarul cu codul de răspuns (status_code) și corpul răspunsului (body).

In [94]: api_versions.response()                                                
Out[94]: 
{'status_code': 200,
 'data': {'current-version': '1.6',
  'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}}

de date

Vă permite să vedeți numai corpul răspunsului (corpul) fără informații inutile.

In [93]: api_versions.data                                                      
Out[93]: 
{'current-version': '1.6',
 'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}

mesaj de eroare

Aceste informații sunt disponibile numai atunci când a apărut o eroare în timpul procesării solicitării API (codul de răspuns nu 200). Exemplu de ieșire

In [107]: api_versions.error_message                                            
Out[107]: 'code: generic_err_invalid_parameter_namenmessage: Unrecognized parameter [1]n'

Exemple utile

Următoarele sunt exemple care utilizează apelurile API care au fost adăugate în Management API 1.6.

Mai întâi, să vedem cum funcționează apelurile add-host и add-address-range. Să presupunem că trebuie să creăm toate adresele IP ale subrețelei 192.168.0.0/24, din care ultimul octet este 5, ca obiecte de tip gazdă și să scriem toate celelalte adrese IP ca obiecte de tipul intervalului de adrese. În acest caz, excludeți adresa de subrețea și adresa de difuzare.

Deci, mai jos este un script care rezolvă această problemă și creează 50 de obiecte de tip gazdă și 51 de obiecte de tip interval de adrese. Pentru a rezolva problema, sunt necesare 101 apeluri API (fără a lua în calcul apelul de publicare final). De asemenea, folosind modulul timeit, calculăm timpul necesar pentru a executa scriptul până când modificările sunt publicate.

Script folosind add-host și add-address-range

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

first_ip = 1
last_ip = 4

client_args = APIClientArgs(server="192.168.47.240")

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     for ip in range(5,255,5):
         add_host = client.api_call("add-host", {"name" : f"h_192.168.0.{ip}", "ip-address": f'192.168.0.{ip}'})
     while last_ip < 255:
         add_range = client.api_call("add-address-range", {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"})
         first_ip+=5
         last_ip+=5
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

În mediul meu de laborator, acest script durează între 30 și 50 de secunde pentru a se executa, în funcție de încărcarea serverului de management.

Acum să vedem cum să rezolvăm aceeași problemă folosind un apel API adaugă-obiecte-lot, suport pentru care a fost adăugat în versiunea API 1.6. Acest apel vă permite să creați mai multe obiecte simultan într-o singură solicitare API. Mai mult, acestea pot fi obiecte de diferite tipuri (de exemplu, gazde, subrețele și intervale de adrese). Astfel, sarcina noastră poate fi rezolvată în cadrul unui apel API.

Script folosind add-objects-batch

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}', "ip-address": f'192.168.0.{ip}'}
    objects_list_ip.append(data)
    
first_ip = 1
last_ip = 4


while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}


with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_objects_batch = client.api_call("add-objects-batch", data_for_batch)
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

Și rularea acestui script în mediul meu de laborator durează de la 3 la 7 secunde, în funcție de încărcarea serverului de management. Adică, în medie, pe 101 obiecte API, un apel de tip lot rulează de 10 ori mai repede. Pe un număr mai mare de obiecte diferența va fi și mai impresionantă.

Acum să vedem cum să lucrăm cu set-obiecte-lot. Folosind acest apel API, putem schimba în bloc orice parametru. Să setăm prima jumătate a adreselor din exemplul anterior (până la .124 de gazde și de asemenea intervale) la culoarea sienna și să atribuim culoarea kaki celei de-a doua jumătate a adreselor.

Schimbarea culorii obiectelor create în exemplul anterior

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip_first = []
objects_list_range_first = []
objects_list_ip_second = []
objects_list_range_second = []

for ip in range(5,125,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "sienna"}
    objects_list_ip_first.append(data)
    
for ip in range(125,255,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "khaki"}
    objects_list_ip_second.append(data)
    
first_ip = 1
last_ip = 4
while last_ip < 125:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "sienna"}
    objects_list_range_first.append(data)
    first_ip+=5
    last_ip+=5
    
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "khaki"}
    objects_list_range_second.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch_first  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_first
}, {
    "type" : "address-range",
    "list" : objects_list_range_first
  }]
}

data_for_batch_second  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_second
}, {
    "type" : "address-range",
    "list" : objects_list_range_second
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 
     set_objects_batch_first = client.api_call("set-objects-batch", data_for_batch_first)
     set_objects_batch_second = client.api_call("set-objects-batch", data_for_batch_second)
     publish = client.api_call("publish")

Puteți șterge mai multe obiecte într-un apel API folosind şterge-obiecte-lot. Acum să ne uităm la un exemplu de cod care șterge toate gazdele create anterior prin adaugă-obiecte-lot.

Ștergerea obiectelor folosind delete-objects-batch

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}'}
    objects_list_ip.append(data)

first_ip = 1
last_ip = 4
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     delete_objects_batch = client.api_call("delete-objects-batch", data_for_batch)
     publish = client.api_call("publish")

print(delete_objects_batch.data)

Toate funcțiile care apar în noile versiuni ale software-ului Check Point primesc imediat apeluri API. Astfel, în R80.40 au apărut astfel de „funcții” precum Revenire la revizuire și Sarcină inteligentă, iar apelurile API corespunzătoare au fost pregătite imediat pentru ele. În plus, toate funcționalitățile la trecerea de la consolele vechi în modul Politică unificată dobândesc, de asemenea, suport API. De exemplu, actualizarea mult așteptată în versiunea software R80.40 a fost mutarea politicii de inspecție HTTPS din modul Legacy în modul Politică unificată, iar această funcționalitate a primit imediat apeluri API. Iată un exemplu de cod care adaugă o regulă la poziția de sus a politicii de inspecție HTTPS care exclude 3 categorii de la inspecție (Sănătate, Finanțe, Servicii guvernamentale), cărora le este interzisă inspecția în conformitate cu legea într-o serie de țări.

Adăugați o regulă la politica de inspecție HTTPS

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

data = {
  "layer" : "Default Layer",
  "position" : "top",
  "name" : "Legal Requirements",
  "action": "bypass",
  "site-category": ["Health", "Government / Military", "Financial Services"]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_https_rule = client.api_call("add-https-rule", data)
     publish = client.api_call("publish")

Rularea scripturilor Python pe serverul de management Check Point

Totul este la fel README.md conține informații despre cum să rulați scripturi Python direct de pe serverul de control. Acest lucru poate fi convenabil atunci când nu vă puteți conecta la serverul API de pe o altă mașină. Am înregistrat un videoclip de șase minute în care mă uit la instalarea modulului cpapi și caracteristici de rulare a scripturilor Python pe serverul de control. De exemplu, se rulează un script care automatizează configurarea unui nou gateway pentru o sarcină precum auditarea rețelei Verificare de securitate. Printre caracteristicile cu care a trebuit să mă ocup: funcția nu a apărut încă în Python 2.7 intrare, deci pentru a procesa informațiile pe care utilizatorul le introduce, se folosește o funcție raw_input. În caz contrar, codul este același ca pentru lansarea de pe alte mașini, doar că este mai convenabil să utilizați funcția login_as_root, pentru a nu specifica din nou propriul nume de utilizator, parola și adresa IP a serverului de management.

Rulează video

Script pentru configurarea rapidă a Security CheckUp

from __future__ import print_function
import getpass
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from cpapi import APIClient, APIClientArgs

def main():
    with APIClient() as client:
       # if client.check_fingerprint() is False:
       #     print("Could not get the server's fingerprint - Check connectivity with the server.")
       #     exit(1)
        login_res = client.login_as_root()

        if login_res.success is False:
            print("Login failed:n{}".format(login_res.error_message))
            exit(1)

        gw_name = raw_input("Enter the gateway name:")
        gw_ip = raw_input("Enter the gateway IP address:")
        if sys.stdin.isatty():
            sic = getpass.getpass("Enter one-time password for the gateway(SIC): ")
        else:
            print("Attention! Your password will be shown on the screen!")
            sic = raw_input("Enter one-time password for the gateway(SIC): ")
        version = raw_input("Enter the gateway version(like RXX.YY):")
        add_gw = client.api_call("add-simple-gateway", {'name' : gw_name, 'ipv4-address' : gw_ip, 'one-time-password' : sic, 'version': version.capitalize(), 'application-control' : 'true', 'url-filtering' : 'true', 'ips' : 'true', 'anti-bot' : 'true', 'anti-virus' : 'true', 'threat-emulation' : 'true'})
        if add_gw.success and add_gw.data['sic-state'] != "communicating":
            print("Secure connection with the gateway hasn't established!")
            exit(1)
        elif add_gw.success:
            print("The gateway was added successfully.")
            gw_uid = add_gw.data['uid']
            gw_name = add_gw.data['name']
        else:
            print("Failed to add the gateway - {}".format(add_gw.error_message))
            exit(1)

        change_policy = client.api_call("set-access-layer", {"name" : "Network", "applications-and-url-filtering": "true", "content-awareness": "true"})
        if change_policy.success:
            print("The policy has been changed successfully")
        else:
            print("Failed to change the policy- {}".format(change_policy.error_message))
        change_rule = client.api_call("set-access-rule", {"name" : "Cleanup rule", "layer" : "Network", "action": "Accept", "track": {"type": "Detailed Log", "accounting": "true"}})
        if change_rule.success:
            print("The cleanup rule has been changed successfully")
        else:
            print("Failed to change the cleanup rule- {}".format(change_rule.error_message))

        # publish the result
        publish_res = client.api_call("publish", {})
        if publish_res.success:
            print("The changes were published successfully.")
        else:
                print("Failed to publish the changes - {}".format(install_tp_policy.error_message))

        install_access_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'true',  "threat-prevention" : 'false', "targets" : gw_uid})
        if install_access_policy.success:
            print("The access policy has been installed")
        else:
                print("Failed to install access policy - {}".format(install_tp_policy.error_message))

        install_tp_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'false',  "threat-prevention" : 'true', "targets" : gw_uid})
        if install_tp_policy.success:
            print("The threat prevention policy has been installed")
        else:
            print("Failed to install threat prevention policy - {}".format(install_tp_policy.error_message))
        
        # add passwords and passphrases to dictionary
        with open('additional_pass.conf') as f:
            line_num = 0
            for line in f:
                line_num += 1
                add_password_dictionary = client.api_call("run-script", {"script-name" : "Add passwords and passphrases", "script" : "printf "{}" >> $FWDIR/conf/additional_pass.conf".format(line), "targets" : gw_name})
                if add_password_dictionary.success:
                    print("The password dictionary line {} was added successfully".format(line_num))
                else:
                    print("Failed to add the dictionary - {}".format(add_password_dictionary.error_message))

main()

Un exemplu de fișier cu un dicționar de parole additional_pass.conf
{
"passwords" : ["malware","malicious","infected","Infected"],
"phrases" : ["password","Password","Pass","pass","codigo","key","pwd","пароль","Пароль","Ключ","ключ","шифр","Шифр"]
}

Concluzie

Acest articol examinează numai posibilitățile de bază de muncă Python SDK și modul cpapi(după cum probabil ați ghicit, acestea sunt de fapt sinonime), iar studiind codul din acest modul veți descoperi și mai multe oportunități de a lucra cu el. Este posibil să doriți să îl completați cu propriile clase, funcții, metode și variabile. Puteți oricând să vă împărtășiți munca și să vizualizați alte scripturi pentru Check Point în secțiune CodeHub in comunitate CheckMates, care reunește atât dezvoltatorii de produse, cât și utilizatorii.

Codare fericită și mulțumesc pentru citit până la sfârșit!

Sursa: www.habr.com

Cumpărați găzduire de încredere pentru site-uri cu protecție DDoS, servere VPS VDS 🔥 Cumpără găzduire web fiabilă cu protecție DDoS, servere VPS VDS | ProHoster