Semplificazione dell'API Check Point con Python SDK

Semplificazione dell'API Check Point con Python SDKTutta la potenza dell'interazione con le API si rivela se utilizzata insieme al codice del programma, quando diventa possibile generare dinamicamente richieste API e strumenti per analizzare le risposte API. Tuttavia, rimane ancora impercettibile Kit di sviluppo software Python (di seguito denominato Python SDK) per API di gestione dei punti di controllo, ma invano. Semplifica notevolmente la vita degli sviluppatori e degli appassionati di automazione. Python ha guadagnato un'enorme popolarità ultimamente e ho deciso di colmare la lacuna e rivedere le caratteristiche principali. Kit di sviluppo Python API Check Point. Questo articolo costituisce un'eccellente aggiunta a un altro articolo su Habré API Check Point R80.10. Gestione tramite CLI, script e altro. Vedremo come scrivere script utilizzando Python SDK e daremo uno sguardo più da vicino alla nuova funzionalità Management API nella versione 1.6 (supportata a partire dalla R80.40). Per comprendere l'articolo, avrai bisogno di conoscenze di base su come lavorare con API e Python.

Check Point sta sviluppando attivamente le API e al momento sono state rilasciate:

L'SDK Python attualmente supporta solo l'interazione con l'API di gestione e API Gaia. In questo modulo esamineremo le classi, i metodi e le variabili più importanti.

Semplificazione dell'API Check Point con Python SDK

Installazione del modulo

modulo cpapi si installa rapidamente e facilmente da repository ufficiale di Check Point su github via seme. Le istruzioni dettagliate per l'installazione sono disponibili in README.md. Questo modulo è adattato per funzionare con le versioni Python 2.7 e 3.7. In questo articolo verranno forniti esempi utilizzando Python 3.7. Tuttavia, Python SDK può essere eseguito direttamente da Check Point Management Server (Smart Management), ma supporta solo Python 2.7, quindi l'ultima sezione fornirà il codice per la versione 2.7. Subito dopo aver installato il modulo, consiglio di guardare gli esempi nelle directory esempi_python2 и esempi_python3.

Guida introduttiva

Per poter lavorare con i componenti del modulo cpapi, dobbiamo importare dal modulo cpapi almeno due classi obbligatorie:

APIClient и APIClientArgs

from cpapi import APIClient, APIClientArgs

classe APIClientArgs è responsabile dei parametri di connessione al server API e della classe APIClient è responsabile dell'interazione con l'API.

Determinazione dei parametri di connessione

Per definire vari parametri per la connessione all'API, è necessario creare un'istanza della classe APIClientArgs. In linea di principio i suoi parametri sono predefiniti e non è necessario specificarli durante l'esecuzione dello script sul server di controllo.

client_args = APIClientArgs()

Ma quando si esegue su un host di terze parti, è necessario specificare almeno l'indirizzo IP o il nome host del server API (noto anche come server di gestione). Nell'esempio seguente definiamo il parametro di connessione al server e gli assegniamo l'indirizzo IP del server di gestione come stringa.

client_args = APIClientArgs(server='192.168.47.241')

Diamo un'occhiata a tutti i parametri e ai loro valori predefiniti che possono essere utilizzati durante la connessione al server API:

Argomenti del metodo __init__ della classe APIClientArgs

class APIClientArgs:
    """
    This class provides arguments for APIClient configuration.
    All the arguments are configured with their default values.
    """

    # port is set to None by default, but it gets replaced with 443 if not specified
    # context possible values - web_api (default) or gaia_api
    def __init__(self, port=None, fingerprint=None, sid=None, server="127.0.0.1", http_debug_level=0,
                 api_calls=None, debug_file="", proxy_host=None, proxy_port=8080,
                 api_version=None, unsafe=False, unsafe_auto_accept=False, context="web_api"):
        self.port = port
        # management server fingerprint
        self.fingerprint = fingerprint
        # session-id.
        self.sid = sid
        # management server name or IP-address
        self.server = server
        # debug level
        self.http_debug_level = http_debug_level
        # an array with all the api calls (for debug purposes)
        self.api_calls = api_calls if api_calls else []
        # name of debug file. If left empty, debug data will not be saved to disk.
        self.debug_file = debug_file
        # HTTP proxy server address (without "http://")
        self.proxy_host = proxy_host
        # HTTP proxy port
        self.proxy_port = proxy_port
        # Management server's API version
        self.api_version = api_version
        # Indicates that the client should not check the server's certificate
        self.unsafe = unsafe
        # Indicates that the client should automatically accept and save the server's certificate
        self.unsafe_auto_accept = unsafe_auto_accept
        # The context of using the client - defaults to web_api
        self.context = context

Credo che gli argomenti che possono essere utilizzati nelle istanze della classe APIClientArgs siano intuitivi per gli amministratori di Check Point e non richiedano commenti aggiuntivi.

Connessione tramite APIClient e gestore del contesto

classe APIClient Il modo più conveniente per usarlo è tramite il gestore del contesto. Tutto ciò che deve essere passato a un'istanza della classe APIClient sono i parametri di connessione definiti nel passaggio precedente.

with APIClient(client_args) as client:

Il gestore del contesto non effettuerà automaticamente una chiamata di accesso al server API, ma effettuerà una chiamata di disconnessione all'uscita da esso. Se per qualche motivo non è richiesto il logout dopo aver finito di lavorare con le chiamate API, è necessario iniziare a lavorare senza utilizzare il gestore contesto:

client = APIClient(clieng_args)

Controllo della connessione

Il modo più semplice per verificare se la connessione soddisfa i parametri specificati è utilizzare il metodo controlla_impronta digitale. Se la verifica della somma hash sha1 per l'impronta digitale del certificato API del server fallisce (il metodo restituito Falso), allora questo è solitamente causato da problemi di connessione e possiamo interrompere l'esecuzione del programma (o dare all'utente la possibilità di correggere i dati di connessione):

    if client.check_fingerprint() is False:
        print("Could not get the server's fingerprint - Check connectivity with the server.")
        exit(1)

Si prega di notare che in futuro la classe APIClient controllerà ogni chiamata API (metodi api_call и api_query, ne parleremo ancora un po') certificato di impronta digitale sha1 sul server API. Ma se, durante il controllo dell'impronta digitale sha1 del certificato del server API, viene rilevato un errore (il certificato è sconosciuto o è stato modificato), il metodo controlla_impronta digitale fornirà l'opportunità di aggiungere/modificare automaticamente le informazioni al riguardo sul computer locale. Questo controllo può essere disabilitato completamente (ma questo può essere consigliato solo se gli script vengono eseguiti sul server API stesso, quando ci si connette a 127.0.0.1), utilizzando l'argomento APIClientArgs - unsafe_auto_accept (vedere ulteriori informazioni su APIClientArgs in precedenza in "Definizione dei parametri di connessione").

client_args = APIClientArgs(unsafe_auto_accept=True)

Accedi al server API

У APIClient esistono fino a 3 metodi per accedere al server API e ognuno di essi ne comprende il significato sid(session-id), che viene utilizzato automaticamente in ogni successiva chiamata API nell'intestazione (il nome nell'intestazione di questo parametro è X-chkp-sid), quindi non è necessario elaborare ulteriormente questo parametro.

metodo di accesso

Opzione che utilizza login e password (nell'esempio, il nome utente admin e la password 1q2w3e vengono passati come argomenti posizionali):

     login = client.login('admin', '1q2w3e')  

Ulteriori parametri opzionali sono disponibili anche nel metodo di accesso; ecco i loro nomi e valori predefiniti:

continue_last_session=False, domain=None, read_only=False, payload=None

Metodo Login_with_api_key

Opzione tramite chiave API (supportata a partire dalla versione gestionale R80.40/Management API v1.6, "3TsbPJ8ZKjaJGvFyoFqHFA==" questo è il valore della chiave API per uno degli utenti sul server di gestione con il metodo di autorizzazione della chiave API):

     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 

Nel metodo login_con_api_key sono disponibili gli stessi parametri opzionali del metodo accesso.

metodo login_as_root

Opzione per accedere a un computer locale con un server API:

     login = client.login_as_root()

Sono disponibili solo due parametri facoltativi per questo metodo:

domain=None, payload=None

E infine l'API si chiama da sola

Abbiamo due opzioni per effettuare chiamate API tramite metodi api_call и api_query. Scopriamo qual è la differenza tra loro.

api_call

Questo metodo è applicabile a qualsiasi chiamata. Se necessario, dobbiamo passare l'ultima parte per la chiamata API e il payload nel corpo della richiesta. Se il carico utile è vuoto, non può essere trasferito affatto:

api_versions = client.api_call('show-api-versions') 

Output per questa richiesta sotto il taglio:

In [23]: api_versions                                                           
Out[23]: 
APIResponse({
    "data": {
        "current-version": "1.6",
        "supported-versions": [
            "1",
            "1.1",
            "1.2",
            "1.3",
            "1.4",
            "1.5",
            "1.6"
        ]
    },
    "res_obj": {
        "data": {
            "current-version": "1.6",
            "supported-versions": [
                "1",
                "1.1",
                "1.2",
                "1.3",
                "1.4",
                "1.5",
                "1.6"
            ]
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})
show_host = client.api_call('show-host', {'name' : 'h_8.8.8.8'})

Output per questa richiesta sotto il taglio:

In [25]: show_host                                                              
Out[25]: 
APIResponse({
    "data": {
        "color": "black",
        "comments": "",
        "domain": {
            "domain-type": "domain",
            "name": "SMC User",
            "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
        },
        "groups": [],
        "icon": "Objects/host",
        "interfaces": [],
        "ipv4-address": "8.8.8.8",
        "meta-info": {
            "creation-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "creator": "admin",
            "last-modifier": "admin",
            "last-modify-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "lock": "unlocked",
            "validation-state": "ok"
        },
        "name": "h_8.8.8.8",
        "nat-settings": {
            "auto-rule": false
        },
        "read-only": false,
        "tags": [],
        "type": "host",
        "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
    },
    "res_obj": {
        "data": {
            "color": "black",
            "comments": "",
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "groups": [],
            "icon": "Objects/host",
            "interfaces": [],
            "ipv4-address": "8.8.8.8",
            "meta-info": {
                "creation-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "creator": "admin",
                "last-modifier": "admin",
                "last-modify-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "lock": "unlocked",
                "validation-state": "ok"
            },
            "name": "h_8.8.8.8",
            "nat-settings": {
                "auto-rule": false
            },
            "read-only": false,
            "tags": [],
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

api_query

Vorrei subito riservare che questo metodo sia applicabile solo per le chiamate il cui output comporta un offset. Tale inferenza si verifica quando contiene o può contenere una grande quantità di informazioni. Ad esempio, potrebbe trattarsi di una richiesta per un elenco di tutti gli oggetti host creati sul server di gestione. Per tali richieste, l'API restituisce un elenco di 50 oggetti per impostazione predefinita (puoi aumentare il limite a 500 oggetti nella risposta). E per non estrarre le informazioni più volte, modificando il parametro offset nella richiesta API, esiste un metodo api_query che fa questo lavoro automaticamente. Esempi di chiamate in cui è necessario questo metodo: mostra sessioni, mostra host, mostra reti, mostra caratteri jolly, mostra gruppi, mostra intervalli di indirizzi, mostra gateway semplici, mostra cluster semplici, mostra ruoli di accesso, mostra clienti attendibili, pacchetti-spettacolo. In effetti, vediamo parole plurali nel nome di queste chiamate API, quindi queste chiamate saranno più facili da gestire api_query

show_hosts = client.api_query('show-hosts') 

Output per questa richiesta sotto il taglio:

In [21]: show_hosts                                                             
Out[21]: 
APIResponse({
    "data": [
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "192.168.47.1",
            "name": "h_192.168.47.1",
            "type": "host",
            "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
        },
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "8.8.8.8",
            "name": "h_8.8.8.8",
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        }
    ],
    "res_obj": {
        "data": {
            "from": 1,
            "objects": [
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "192.168.47.1",
                    "name": "h_192.168.47.1",
                    "type": "host",
                    "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
                },
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "8.8.8.8",
                    "name": "h_8.8.8.8",
                    "type": "host",
                    "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
                }
            ],
            "to": 2,
            "total": 2
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

Elaborazione dei risultati delle chiamate API

Dopodiché puoi utilizzare le variabili e i metodi della classe APIResponse(sia all'interno del gestore del contesto che all'esterno). Alla lezione APIResponse Sono predefiniti 4 metodi e 5 variabili; ci soffermeremo più in dettaglio su quelle più importanti.

Semplificazione dell'API Check Point con Python SDK

il successo

Per cominciare, sarebbe una buona idea assicurarsi che la chiamata API abbia avuto successo e abbia restituito un risultato. C'è un metodo per questo il successo:

In [49]: api_versions.success                                                   
Out[49]: True

Restituisce True se la chiamata API ha avuto successo (codice di risposta - 200) e False se non ha avuto successo (qualsiasi altro codice di risposta). È conveniente utilizzarlo immediatamente dopo una chiamata API per visualizzare informazioni diverse a seconda del codice di risposta.

if api_ver.success: 
    print(api_versions.data) 
else: 
    print(api_versions.err_message) 

codice di stato

Restituisce il codice di risposta dopo che è stata effettuata una chiamata API.

In [62]: api_versions.status_code                                               
Out[62]: 400

Possibili codici di risposta: 200,400,401,403,404,409,500,501.

set_success_status

In questo caso potrebbe essere necessario modificare il valore dello stato di successo. Tecnicamente, puoi metterci qualsiasi cosa, anche una stringa normale. Ma un esempio reale potrebbe essere la reimpostazione di questo parametro su False in determinate condizioni concomitanti. Di seguito, presta attenzione all'esempio in cui ci sono attività in esecuzione sul server di gestione, ma considereremo questa richiesta non riuscita (imposteremo la variabile success su Falso, nonostante il fatto che la chiamata API abbia avuto successo e abbia restituito il codice 200).

for task in task_result.data["tasks"]:
    if task["status"] == "failed" or task["status"] == "partially succeeded":
        task_result.set_success_status(False)
        break

risposta()

Il metodo di risposta consente di visualizzare il dizionario con il codice della risposta (status_code) e il corpo della risposta (body).

In [94]: api_versions.response()                                                
Out[94]: 
{'status_code': 200,
 'data': {'current-version': '1.6',
  'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}}

dati

Permette di vedere solo il corpo della risposta (body) senza informazioni non necessarie.

In [93]: api_versions.data                                                      
Out[93]: 
{'current-version': '1.6',
 'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}

messaggio di errore

Queste informazioni sono disponibili solo quando si è verificato un errore durante l'elaborazione della richiesta API (codice di risposta no 200). Uscita di esempio

In [107]: api_versions.error_message                                            
Out[107]: 'code: generic_err_invalid_parameter_namenmessage: Unrecognized parameter [1]n'

Esempi utili

Di seguito sono riportati esempi che utilizzano le chiamate API aggiunte in Management API 1.6.

Innanzitutto, diamo un'occhiata a come funzionano le chiamate aggiungi-host и aggiungi-intervallo di indirizzi. Diciamo che dobbiamo creare tutti gli indirizzi IP della sottorete 192.168.0.0/24, il cui ultimo ottetto è 5, come oggetti di tipo host, e scrivere tutti gli altri indirizzi IP come oggetti di tipo intervallo di indirizzi. In questo caso, escludere l'indirizzo di sottorete e l'indirizzo di broadcast.

Quindi, di seguito è riportato uno script che risolve questo problema e crea 50 oggetti del tipo host e 51 oggetti del tipo intervallo di indirizzi. Per risolvere il problema sono necessarie 101 chiamate API (senza contare la chiamata di pubblicazione finale). Inoltre, utilizzando il modulo timeit, calcoliamo il tempo necessario per eseguire lo script fino alla pubblicazione delle modifiche.

Script che utilizza add-host e add-address-range

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

first_ip = 1
last_ip = 4

client_args = APIClientArgs(server="192.168.47.240")

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     for ip in range(5,255,5):
         add_host = client.api_call("add-host", {"name" : f"h_192.168.0.{ip}", "ip-address": f'192.168.0.{ip}'})
     while last_ip < 255:
         add_range = client.api_call("add-address-range", {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"})
         first_ip+=5
         last_ip+=5
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

Nel mio ambiente di laboratorio, l'esecuzione di questo script richiede dai 30 ai 50 secondi, a seconda del carico sul server di gestione.

Vediamo ora come risolvere lo stesso problema utilizzando una chiamata API aggiungi-oggetti-batch, il cui supporto è stato aggiunto nella versione API 1.6. Questa chiamata ti consente di creare più oggetti contemporaneamente in una richiesta API. Inoltre, questi possono essere oggetti di diverso tipo (ad esempio host, sottoreti e intervalli di indirizzi). Pertanto, il nostro compito può essere risolto nell'ambito di una chiamata API.

Script che utilizza add-objects-batch

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}', "ip-address": f'192.168.0.{ip}'}
    objects_list_ip.append(data)
    
first_ip = 1
last_ip = 4


while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}


with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_objects_batch = client.api_call("add-objects-batch", data_for_batch)
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

Inoltre, l'esecuzione di questo script nel mio ambiente di laboratorio richiede dai 3 ai 7 secondi, a seconda del carico sul server di gestione. Cioè, in media, su 101 oggetti API, una chiamata di tipo batch viene eseguita 10 volte più velocemente. Su un numero maggiore di oggetti la differenza sarà ancora più impressionante.

Ora vediamo come lavorare con set-oggetti-batch. Utilizzando questa chiamata API, possiamo modificare in blocco qualsiasi parametro. Impostiamo la prima metà degli indirizzi dell'esempio precedente (fino a .124 host e anche intervalli) sul colore terra di Siena e assegniamo il colore kaki alla seconda metà degli indirizzi.

Modifica del colore degli oggetti creati nell'esempio precedente

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip_first = []
objects_list_range_first = []
objects_list_ip_second = []
objects_list_range_second = []

for ip in range(5,125,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "sienna"}
    objects_list_ip_first.append(data)
    
for ip in range(125,255,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "khaki"}
    objects_list_ip_second.append(data)
    
first_ip = 1
last_ip = 4
while last_ip < 125:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "sienna"}
    objects_list_range_first.append(data)
    first_ip+=5
    last_ip+=5
    
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "khaki"}
    objects_list_range_second.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch_first  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_first
}, {
    "type" : "address-range",
    "list" : objects_list_range_first
  }]
}

data_for_batch_second  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_second
}, {
    "type" : "address-range",
    "list" : objects_list_range_second
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 
     set_objects_batch_first = client.api_call("set-objects-batch", data_for_batch_first)
     set_objects_batch_second = client.api_call("set-objects-batch", data_for_batch_second)
     publish = client.api_call("publish")

Puoi eliminare più oggetti in una chiamata API utilizzando elimina-oggetti-batch. Ora diamo un'occhiata a un esempio di codice che elimina tutti gli host creati in precedenza tramite aggiungi-oggetti-batch.

Eliminazione di oggetti utilizzando delete-objects-batch

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}'}
    objects_list_ip.append(data)

first_ip = 1
last_ip = 4
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     delete_objects_batch = client.api_call("delete-objects-batch", data_for_batch)
     publish = client.api_call("publish")

print(delete_objects_batch.data)

Tutte le funzioni presenti nelle nuove versioni del software Check Point acquisiscono immediatamente chiamate API. Pertanto, nella R80.40 sono apparse "funzionalità" come Ripristina revisione e Smart Task e per esse sono state immediatamente preparate le corrispondenti chiamate API. Inoltre, tutte le funzionalità quando si passa dalle console legacy alla modalità Unified Policy acquisiscono anche il supporto API. Ad esempio, l'aggiornamento tanto atteso nella versione software R80.40 è stato lo spostamento della policy di ispezione HTTPS dalla modalità Legacy alla modalità Unified Policy e questa funzionalità ha immediatamente ricevuto chiamate API. Ecco un esempio di codice che aggiunge una regola nella prima posizione della policy di ispezione HTTPS che esclude 3 categorie dall'ispezione (salute, finanza, servizi governativi), a cui è vietata l'ispezione in conformità con la legge in diversi paesi.

Aggiungi una regola alla policy di ispezione HTTPS

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

data = {
  "layer" : "Default Layer",
  "position" : "top",
  "name" : "Legal Requirements",
  "action": "bypass",
  "site-category": ["Health", "Government / Military", "Financial Services"]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_https_rule = client.api_call("add-https-rule", data)
     publish = client.api_call("publish")

Esecuzione di script Python sul server di gestione Check Point

Tutto è lo stesso README.md contiene informazioni su come eseguire gli script Python direttamente dal server di controllo. Ciò può essere utile quando non riesci a connetterti al server API da un'altra macchina. Ho registrato un video di sei minuti in cui guardo l'installazione del modulo cpapi e funzionalità di esecuzione di script Python sul server di controllo. Ad esempio, viene eseguito uno script che automatizza la configurazione di un nuovo gateway per un'attività come il controllo della rete Controllo di sicurezza. Tra le caratteristiche con cui ho dovuto occuparmi: la funzione non è ancora apparsa in Python 2.7 ingresso, quindi per elaborare le informazioni che l'utente inserisce, viene utilizzata una funzione input_grezzo. Per il resto il codice è lo stesso dell'avvio da altre macchine, solo che è più comodo utilizzare la funzione login_come_root, in modo da non specificare nuovamente il proprio nome utente, password e indirizzo IP del server di gestione.

Script per la configurazione rapida del Controllo sicurezza

from __future__ import print_function
import getpass
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from cpapi import APIClient, APIClientArgs

def main():
    with APIClient() as client:
       # if client.check_fingerprint() is False:
       #     print("Could not get the server's fingerprint - Check connectivity with the server.")
       #     exit(1)
        login_res = client.login_as_root()

        if login_res.success is False:
            print("Login failed:n{}".format(login_res.error_message))
            exit(1)

        gw_name = raw_input("Enter the gateway name:")
        gw_ip = raw_input("Enter the gateway IP address:")
        if sys.stdin.isatty():
            sic = getpass.getpass("Enter one-time password for the gateway(SIC): ")
        else:
            print("Attention! Your password will be shown on the screen!")
            sic = raw_input("Enter one-time password for the gateway(SIC): ")
        version = raw_input("Enter the gateway version(like RXX.YY):")
        add_gw = client.api_call("add-simple-gateway", {'name' : gw_name, 'ipv4-address' : gw_ip, 'one-time-password' : sic, 'version': version.capitalize(), 'application-control' : 'true', 'url-filtering' : 'true', 'ips' : 'true', 'anti-bot' : 'true', 'anti-virus' : 'true', 'threat-emulation' : 'true'})
        if add_gw.success and add_gw.data['sic-state'] != "communicating":
            print("Secure connection with the gateway hasn't established!")
            exit(1)
        elif add_gw.success:
            print("The gateway was added successfully.")
            gw_uid = add_gw.data['uid']
            gw_name = add_gw.data['name']
        else:
            print("Failed to add the gateway - {}".format(add_gw.error_message))
            exit(1)

        change_policy = client.api_call("set-access-layer", {"name" : "Network", "applications-and-url-filtering": "true", "content-awareness": "true"})
        if change_policy.success:
            print("The policy has been changed successfully")
        else:
            print("Failed to change the policy- {}".format(change_policy.error_message))
        change_rule = client.api_call("set-access-rule", {"name" : "Cleanup rule", "layer" : "Network", "action": "Accept", "track": {"type": "Detailed Log", "accounting": "true"}})
        if change_rule.success:
            print("The cleanup rule has been changed successfully")
        else:
            print("Failed to change the cleanup rule- {}".format(change_rule.error_message))

        # publish the result
        publish_res = client.api_call("publish", {})
        if publish_res.success:
            print("The changes were published successfully.")
        else:
                print("Failed to publish the changes - {}".format(install_tp_policy.error_message))

        install_access_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'true',  "threat-prevention" : 'false', "targets" : gw_uid})
        if install_access_policy.success:
            print("The access policy has been installed")
        else:
                print("Failed to install access policy - {}".format(install_tp_policy.error_message))

        install_tp_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'false',  "threat-prevention" : 'true', "targets" : gw_uid})
        if install_tp_policy.success:
            print("The threat prevention policy has been installed")
        else:
            print("Failed to install threat prevention policy - {}".format(install_tp_policy.error_message))
        
        # add passwords and passphrases to dictionary
        with open('additional_pass.conf') as f:
            line_num = 0
            for line in f:
                line_num += 1
                add_password_dictionary = client.api_call("run-script", {"script-name" : "Add passwords and passphrases", "script" : "printf "{}" >> $FWDIR/conf/additional_pass.conf".format(line), "targets" : gw_name})
                if add_password_dictionary.success:
                    print("The password dictionary line {} was added successfully".format(line_num))
                else:
                    print("Failed to add the dictionary - {}".format(add_password_dictionary.error_message))

main()

Un file di esempio con un dizionario delle password external_pass.conf
{
"passwords" : ["malware","malicious","infected","Infected"],
"phrases" : ["password","Password","Pass","pass","codigo","key","pwd","пароль","Пароль","Ключ","ключ","шифр","Шифр"] }

conclusione

Questo articolo esamina solo le possibilità di lavoro di base SDK Python e modulo cpapi(come avrai intuito, questi sono in realtà sinonimi) e studiando il codice in questo modulo scoprirai ancora più opportunità per lavorarci. È possibile che tu voglia integrarlo con le tue classi, funzioni, metodi e variabili. Puoi sempre condividere il tuo lavoro e visualizzare altri script per Check Point nella sezione CodiceHub nella comunità CheckMates, che riunisce sia gli sviluppatori di prodotti che gli utenti.

Buona programmazione e grazie per aver letto fino alla fine!

Fonte: habr.com

Aggiungi un commento