Απλοποίηση του Check Point API με το Python SDK

Απλοποίηση του Check Point API με το Python SDKΗ πλήρης δύναμη της αλληλεπίδρασης με τα API αποκαλύπτεται όταν χρησιμοποιείται μαζί με τον κώδικα προγράμματος, όταν καθίσταται δυνατή η δυναμική δημιουργία αιτημάτων API και εργαλείων για την ανάλυση των αποκρίσεων API. Ωστόσο, εξακολουθεί να παραμένει απαρατήρητο Κιτ ανάπτυξης λογισμικού Python (εφεξής θα αναφέρεται ως Python SDK) για Check Point Management API, αλλά μάταια. Απλοποιεί σημαντικά τη ζωή των προγραμματιστών και των λάτρεις του αυτοματισμού. Η Python έχει αποκτήσει τεράστια δημοτικότητα τον τελευταίο καιρό και αποφάσισα να καλύψω το κενό και να αναθεωρήσω τα κύρια χαρακτηριστικά. Check Point API Python Development Kit. Αυτό το άρθρο χρησιμεύει ως εξαιρετική προσθήκη σε ένα άλλο άρθρο για το Habré Check Point R80.10 API. Διαχείριση μέσω CLI, scripts και άλλα. Θα εξετάσουμε πώς να γράφουμε σενάρια χρησιμοποιώντας το Python SDK και θα ρίξουμε μια πιο προσεκτική ματιά στη νέα λειτουργικότητα του Management API στην έκδοση 1.6 (υποστηρίζεται από το R80.40). Για να κατανοήσετε το άρθρο, θα χρειαστείτε βασικές γνώσεις εργασίας με API και Python.

Το Check Point αναπτύσσει ενεργά το API και αυτή τη στιγμή έχουν κυκλοφορήσει τα ακόλουθα:

Το Python SDK προς το παρόν υποστηρίζει μόνο την αλληλεπίδραση με το Management API και Gaia API. Θα εξετάσουμε τις πιο σημαντικές κλάσεις, μεθόδους και μεταβλητές σε αυτήν την ενότητα.

Απλοποίηση του Check Point API με το Python SDK

Εγκατάσταση μονάδας

Ενότητα cpapi εγκαθίσταται γρήγορα και εύκολα από επίσημο αποθετήριο Check Point στο github μέσω κουκούτσι. Λεπτομερείς οδηγίες εγκατάστασης είναι διαθέσιμες στο README.md. Αυτή η ενότητα είναι προσαρμοσμένη για να λειτουργεί με τις εκδόσεις Python 2.7 και 3.7. Σε αυτό το άρθρο, θα δοθούν παραδείγματα χρησιμοποιώντας την Python 3.7. Ωστόσο, το Python SDK μπορεί να εκτελεστεί απευθείας από τον Check Point Management Server (Smart Management), αλλά υποστηρίζει μόνο την Python 2.7, επομένως η τελευταία ενότητα θα παρέχει κώδικα για την έκδοση 2.7. Αμέσως μετά την εγκατάσταση της λειτουργικής μονάδας, προτείνω να δείτε τα παραδείγματα στους καταλόγους παραδείγματα_python2 и παραδείγματα_python3.

Ξεκινώντας

Για να μπορέσουμε να εργαστούμε με τα στοιχεία της μονάδας cpapi, πρέπει να κάνουμε εισαγωγή από τη λειτουργική μονάδα cpapi τουλάχιστον δύο απαιτούμενες τάξεις:

APICclient и APIClientArgs

from cpapi import APIClient, APIClientArgs

Κατηγορία APIClientArgs είναι υπεύθυνος για τις παραμέτρους σύνδεσης με τον διακομιστή API και την κλάση APICclient είναι υπεύθυνος για την αλληλεπίδραση με το API.

Προσδιορισμός παραμέτρων σύνδεσης

Για να ορίσετε διάφορες παραμέτρους για τη σύνδεση στο API, πρέπει να δημιουργήσετε μια παρουσία της κλάσης APIClientArgs. Κατ 'αρχήν, οι παράμετροί του είναι προκαθορισμένες και κατά την εκτέλεση του σεναρίου στον διακομιστή ελέγχου, δεν χρειάζεται να καθοριστούν.

client_args = APIClientArgs()

Αλλά όταν εκτελείται σε έναν κεντρικό υπολογιστή τρίτου κατασκευαστή, πρέπει να καθορίσετε τουλάχιστον τη διεύθυνση IP ή το όνομα κεντρικού υπολογιστή του διακομιστή API (γνωστός και ως διακομιστής διαχείρισης). Στο παρακάτω παράδειγμα, ορίζουμε την παράμετρο σύνδεσης διακομιστή και της εκχωρούμε τη διεύθυνση IP του διακομιστή διαχείρισης ως συμβολοσειρά.

client_args = APIClientArgs(server='192.168.47.241')

Ας δούμε όλες τις παραμέτρους και τις προεπιλεγμένες τιμές τους που μπορούν να χρησιμοποιηθούν κατά τη σύνδεση στον διακομιστή API:

Ορίσματα της μεθόδου __init__ της κλάσης APIClientArgs

class APIClientArgs:
    """
    This class provides arguments for APIClient configuration.
    All the arguments are configured with their default values.
    """

    # port is set to None by default, but it gets replaced with 443 if not specified
    # context possible values - web_api (default) or gaia_api
    def __init__(self, port=None, fingerprint=None, sid=None, server="127.0.0.1", http_debug_level=0,
                 api_calls=None, debug_file="", proxy_host=None, proxy_port=8080,
                 api_version=None, unsafe=False, unsafe_auto_accept=False, context="web_api"):
        self.port = port
        # management server fingerprint
        self.fingerprint = fingerprint
        # session-id.
        self.sid = sid
        # management server name or IP-address
        self.server = server
        # debug level
        self.http_debug_level = http_debug_level
        # an array with all the api calls (for debug purposes)
        self.api_calls = api_calls if api_calls else []
        # name of debug file. If left empty, debug data will not be saved to disk.
        self.debug_file = debug_file
        # HTTP proxy server address (without "http://")
        self.proxy_host = proxy_host
        # HTTP proxy port
        self.proxy_port = proxy_port
        # Management server's API version
        self.api_version = api_version
        # Indicates that the client should not check the server's certificate
        self.unsafe = unsafe
        # Indicates that the client should automatically accept and save the server's certificate
        self.unsafe_auto_accept = unsafe_auto_accept
        # The context of using the client - defaults to web_api
        self.context = context

Πιστεύω ότι τα ορίσματα που μπορούν να χρησιμοποιηθούν σε παρουσίες της κλάσης APIClientArgs είναι διαισθητικά για τους διαχειριστές του Check Point και δεν απαιτούν πρόσθετα σχόλια.

Σύνδεση μέσω APIClient και διαχείρισης περιβάλλοντος

Κατηγορία APICclient Ο πιο βολικός τρόπος να το χρησιμοποιήσετε είναι μέσω του διαχειριστή περιβάλλοντος. Το μόνο που χρειάζεται να μεταβιβαστεί σε μια παρουσία της κλάσης APIClient είναι οι παράμετροι σύνδεσης που ορίστηκαν στο προηγούμενο βήμα.

with APIClient(client_args) as client:

Ο διαχειριστής περιβάλλοντος δεν θα πραγματοποιήσει αυτόματα κλήση σύνδεσης στον διακομιστή API, αλλά θα πραγματοποιήσει κλήση αποσύνδεσης κατά την έξοδο από αυτόν. Εάν για κάποιο λόγο δεν απαιτείται αποσύνδεση μετά την ολοκλήρωση της εργασίας με κλήσεις API, πρέπει να ξεκινήσετε να εργάζεστε χωρίς να χρησιμοποιείτε τη διαχείριση περιβάλλοντος:

client = APIClient(clieng_args)

Δοκιμή σύνδεσης

Ο ευκολότερος τρόπος για να ελέγξετε εάν η σύνδεση πληροί τις καθορισμένες παραμέτρους είναι να χρησιμοποιήσετε τη μέθοδο check_fingerprint. Εάν η επαλήθευση του αθροίσματος κατακερματισμού sha1 για το δακτυλικό αποτύπωμα του πιστοποιητικού API διακομιστή αποτύχει (επιστράφηκε η μέθοδος Ψευδής), τότε αυτό συνήθως προκαλείται από προβλήματα σύνδεσης και μπορούμε να σταματήσουμε την εκτέλεση του προγράμματος (ή να δώσουμε στον χρήστη την ευκαιρία να διορθώσει τα δεδομένα σύνδεσης):

    if client.check_fingerprint() is False:
        print("Could not get the server's fingerprint - Check connectivity with the server.")
        exit(1)

Σημειώστε ότι στο μέλλον η τάξη APICclient θα ελέγχει κάθε κλήση API (μέθοδοι api_call и api_query, θα μιλήσουμε για αυτά λίγο περαιτέρω) sha1 πιστοποιητικό δακτυλικών αποτυπωμάτων στον διακομιστή API. Αλλά εάν, κατά τον έλεγχο του δακτυλικού αποτυπώματος sha1 του πιστοποιητικού διακομιστή API, εντοπιστεί σφάλμα (το πιστοποιητικό είναι άγνωστο ή έχει αλλάξει), η μέθοδος check_fingerprint θα παρέχει την ευκαιρία να προσθέσετε/αλλάξετε πληροφορίες σχετικά με αυτό στο τοπικό μηχάνημα αυτόματα. Αυτός ο έλεγχος μπορεί να απενεργοποιηθεί εντελώς (αλλά αυτό μπορεί να προταθεί μόνο εάν εκτελούνται σενάρια στον ίδιο τον διακομιστή API, κατά τη σύνδεση στην 127.0.0.1), χρησιμοποιώντας το όρισμα APIClientArgs - unsafe_auto_accept (δείτε περισσότερα για το APIClientArgs νωρίτερα στην ενότητα «Ορισμός παραμέτρων σύνδεσης»).

client_args = APIClientArgs(unsafe_auto_accept=True)

Συνδεθείτε στον διακομιστή API

У APICclient υπάρχουν έως και 3 μέθοδοι για να συνδεθείτε στον διακομιστή API και καθεμία από αυτές κατανοεί το νόημα sid(session-id), το οποίο χρησιμοποιείται αυτόματα σε κάθε επόμενη κλήση API στην κεφαλίδα (το όνομα στην κεφαλίδα αυτής της παραμέτρου είναι X-chkp-sid), επομένως δεν χρειάζεται περαιτέρω επεξεργασία αυτής της παραμέτρου.

μέθοδος σύνδεσης

Επιλογή με χρήση σύνδεσης και κωδικού πρόσβασης (στο παράδειγμα, το όνομα χρήστη admin και ο κωδικός πρόσβασης 1q2w3e μεταβιβάζονται ως ορίσματα θέσης):

     login = client.login('admin', '1q2w3e')  

Πρόσθετες προαιρετικές παράμετροι είναι επίσης διαθέσιμες στη μέθοδο σύνδεσης· εδώ είναι τα ονόματα και οι προεπιλεγμένες τιμές τους:

continue_last_session=False, domain=None, read_only=False, payload=None

Μέθοδος Login_with_api_key

Επιλογή με χρήση κλειδιού api (υποστηρίζεται ξεκινώντας από την έκδοση διαχείρισης R80.40/Management API v1.6, "3TsbPJ8ZKjaJGvFyoFqHFA==" αυτή είναι η τιμή κλειδιού API για έναν από τους χρήστες στον διακομιστή διαχείρισης με τη μέθοδο εξουσιοδότησης κλειδιού API):

     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 

Στη μέθοδο login_with_api_key είναι διαθέσιμες οι ίδιες προαιρετικές παράμετροι όπως στη μέθοδο Σύνδεση.

μέθοδος login_as_root

Επιλογή σύνδεσης σε τοπικό μηχάνημα με διακομιστή API:

     login = client.login_as_root()

Υπάρχουν μόνο δύο προαιρετικές διαθέσιμες παράμετροι για αυτήν τη μέθοδο:

domain=None, payload=None

Και τελικά το API καλεί τον εαυτό τους

Έχουμε δύο επιλογές για να πραγματοποιήσουμε κλήσεις API μέσω μεθόδων api_call и api_query. Ας καταλάβουμε ποια είναι η διαφορά μεταξύ τους.

api_call

Αυτή η μέθοδος ισχύει για οποιεσδήποτε κλήσεις. Πρέπει να περάσουμε το τελευταίο μέρος για την κλήση του api και το ωφέλιμο φορτίο στο σώμα του αιτήματος εάν είναι απαραίτητο. Εάν το ωφέλιμο φορτίο είναι κενό, τότε δεν μπορεί να μεταφερθεί καθόλου:

api_versions = client.api_call('show-api-versions') 

Έξοδος για αυτό το αίτημα κάτω από την περικοπή:

In [23]: api_versions                                                           
Out[23]: 
APIResponse({
    "data": {
        "current-version": "1.6",
        "supported-versions": [
            "1",
            "1.1",
            "1.2",
            "1.3",
            "1.4",
            "1.5",
            "1.6"
        ]
    },
    "res_obj": {
        "data": {
            "current-version": "1.6",
            "supported-versions": [
                "1",
                "1.1",
                "1.2",
                "1.3",
                "1.4",
                "1.5",
                "1.6"
            ]
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})
show_host = client.api_call('show-host', {'name' : 'h_8.8.8.8'})

Έξοδος για αυτό το αίτημα κάτω από την περικοπή:

In [25]: show_host                                                              
Out[25]: 
APIResponse({
    "data": {
        "color": "black",
        "comments": "",
        "domain": {
            "domain-type": "domain",
            "name": "SMC User",
            "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
        },
        "groups": [],
        "icon": "Objects/host",
        "interfaces": [],
        "ipv4-address": "8.8.8.8",
        "meta-info": {
            "creation-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "creator": "admin",
            "last-modifier": "admin",
            "last-modify-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "lock": "unlocked",
            "validation-state": "ok"
        },
        "name": "h_8.8.8.8",
        "nat-settings": {
            "auto-rule": false
        },
        "read-only": false,
        "tags": [],
        "type": "host",
        "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
    },
    "res_obj": {
        "data": {
            "color": "black",
            "comments": "",
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "groups": [],
            "icon": "Objects/host",
            "interfaces": [],
            "ipv4-address": "8.8.8.8",
            "meta-info": {
                "creation-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "creator": "admin",
                "last-modifier": "admin",
                "last-modify-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "lock": "unlocked",
                "validation-state": "ok"
            },
            "name": "h_8.8.8.8",
            "nat-settings": {
                "auto-rule": false
            },
            "read-only": false,
            "tags": [],
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

api_query

Επιτρέψτε μου να κάνω μια κράτηση αμέσως ότι αυτή η μέθοδος ισχύει μόνο για κλήσεις των οποίων η έξοδος περιλαμβάνει μετατόπιση. Ένα τέτοιο συμπέρασμα προκύπτει όταν περιέχει ή μπορεί να περιέχει μεγάλο όγκο πληροφοριών. Για παράδειγμα, αυτό θα μπορούσε να είναι ένα αίτημα για μια λίστα με όλα τα δημιουργημένα αντικείμενα κεντρικού υπολογιστή στον διακομιστή διαχείρισης. Για τέτοια αιτήματα, το API επιστρέφει μια λίστα με 50 αντικείμενα από προεπιλογή (μπορείτε να αυξήσετε το όριο στα 500 αντικείμενα στην απάντηση). Και για να μην τραβήξετε τις πληροφορίες πολλές φορές, αλλάζοντας την παράμετρο offset στο αίτημα API, υπάρχει μια μέθοδος api_query που κάνει αυτό το έργο αυτόματα. Παραδείγματα κλήσεων όπου απαιτείται αυτή η μέθοδος: συνεδρίες εκπομπής, παρουσιαστές εκπομπών, δίκτυα εκπομπής, χαρακτήρες μπαλαντέρ εκπομπών, ομάδες εκπομπής, εύρη διευθύνσεων εκπομπής, απλές πύλες εκπομπής, συμπλέγματα εμφάνισης, πρόσβαση σε ρόλους, προβολή έμπιστων πελατών, εκθέσεις-πακέτα. Στην πραγματικότητα, βλέπουμε πληθυντικές λέξεις στο όνομα αυτών των κλήσεων API, επομένως αυτές οι κλήσεις θα είναι πιο εύκολο να χειριστούν api_query

show_hosts = client.api_query('show-hosts') 

Έξοδος για αυτό το αίτημα κάτω από την περικοπή:

In [21]: show_hosts                                                             
Out[21]: 
APIResponse({
    "data": [
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "192.168.47.1",
            "name": "h_192.168.47.1",
            "type": "host",
            "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
        },
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "8.8.8.8",
            "name": "h_8.8.8.8",
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        }
    ],
    "res_obj": {
        "data": {
            "from": 1,
            "objects": [
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "192.168.47.1",
                    "name": "h_192.168.47.1",
                    "type": "host",
                    "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
                },
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "8.8.8.8",
                    "name": "h_8.8.8.8",
                    "type": "host",
                    "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
                }
            ],
            "to": 2,
            "total": 2
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

Επεξεργασία των αποτελεσμάτων των κλήσεων API

Μετά από αυτό μπορείτε να χρησιμοποιήσετε τις μεταβλητές και τις μεθόδους της τάξης APIresponse(τόσο εντός του διαχειριστή περιβάλλοντος όσο και εκτός). Στην τάξη APIresponse 4 μέθοδοι και 5 μεταβλητές είναι προκαθορισμένες· θα σταθούμε στις πιο σημαντικές με περισσότερες λεπτομέρειες.

Απλοποίηση του Check Point API με το Python SDK

επιτυχία

Αρχικά, θα ήταν καλή ιδέα να βεβαιωθείτε ότι η κλήση API ήταν επιτυχής και επέστρεψε ένα αποτέλεσμα. Υπάρχει μια μέθοδος για αυτό επιτυχία:

In [49]: api_versions.success                                                   
Out[49]: True

Επιστρέφει True εάν η κλήση API ήταν επιτυχής (κωδικός απόκρισης - 200) και False εάν δεν ήταν επιτυχής (οποιοσδήποτε άλλος κωδικός απόκρισης). Είναι βολικό να χρησιμοποιείται αμέσως μετά από μια κλήση API για την εμφάνιση διαφορετικών πληροφοριών ανάλογα με τον κωδικό απόκρισης.

if api_ver.success: 
    print(api_versions.data) 
else: 
    print(api_versions.err_message) 

κωδικός κατάστασης

Επιστρέφει τον κωδικό απόκρισης μετά την πραγματοποίηση μιας κλήσης API.

In [62]: api_versions.status_code                                               
Out[62]: 400

Πιθανοί κωδικοί απάντησης: 200,400,401,403,404,409,500,501.

set_success_status

Σε αυτήν την περίπτωση, μπορεί να χρειαστεί να αλλάξετε την τιμή της κατάστασης επιτυχίας. Τεχνικά, μπορείτε να βάλετε οτιδήποτε, ακόμα και ένα κανονικό κορδόνι. Αλλά ένα πραγματικό παράδειγμα θα ήταν η επαναφορά αυτής της παραμέτρου σε False υπό ορισμένες συνοδευτικές συνθήκες. Παρακάτω, δώστε προσοχή στο παράδειγμα όταν υπάρχουν εργασίες που εκτελούνται στον διακομιστή διαχείρισης, αλλά θα θεωρήσουμε αυτό το αίτημα ανεπιτυχές (θα ορίσουμε τη μεταβλητή επιτυχίας σε Ψευδής, παρά το γεγονός ότι η κλήση API ήταν επιτυχής και επέστρεψε τον κωδικό 200).

for task in task_result.data["tasks"]:
    if task["status"] == "failed" or task["status"] == "partially succeeded":
        task_result.set_success_status(False)
        break

απάντηση()

Η μέθοδος απόκρισης σάς επιτρέπει να προβάλετε το λεξικό με τον κωδικό απόκρισης (status_code) και το σώμα απάντησης (body).

In [94]: api_versions.response()                                                
Out[94]: 
{'status_code': 200,
 'data': {'current-version': '1.6',
  'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}}

ημερομηνία

Σας επιτρέπει να βλέπετε μόνο το σώμα της απάντησης (σώμα) χωρίς περιττές πληροφορίες.

In [93]: api_versions.data                                                      
Out[93]: 
{'current-version': '1.6',
 'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}

μήνυμα λάθους

Αυτές οι πληροφορίες είναι διαθέσιμες μόνο όταν παρουσιάστηκε σφάλμα κατά την επεξεργασία του αιτήματος API (κωδικός απόκρισης όχι 200). Παράδειγμα εξόδου

In [107]: api_versions.error_message                                            
Out[107]: 'code: generic_err_invalid_parameter_namenmessage: Unrecognized parameter [1]n'

Χρήσιμα παραδείγματα

Τα παρακάτω είναι παραδείγματα που χρησιμοποιούν τις κλήσεις API που προστέθηκαν στο Management API 1.6.

Αρχικά, ας δούμε πώς λειτουργούν οι κλήσεις add-host и προσθήκη-διεύθυνση-εύρος. Ας υποθέσουμε ότι πρέπει να δημιουργήσουμε όλες τις διευθύνσεις IP του υποδικτύου 192.168.0.0/24, η τελευταία οκτάδα του οποίου είναι 5, ως αντικείμενα του τύπου κεντρικού υπολογιστή, και να γράψουμε όλες τις άλλες διευθύνσεις IP ως αντικείμενα του τύπου εύρους διευθύνσεων. Σε αυτήν την περίπτωση, εξαιρέστε τη διεύθυνση υποδικτύου και τη διεύθυνση εκπομπής.

Έτσι, παρακάτω είναι ένα σενάριο που επιλύει αυτό το πρόβλημα και δημιουργεί 50 αντικείμενα του τύπου κεντρικού υπολογιστή και 51 αντικείμενα του τύπου εύρους διευθύνσεων. Για να λυθεί το πρόβλημα, απαιτούνται 101 κλήσεις API (χωρίς να υπολογίζεται η τελική κλήση δημοσίευσης). Επίσης, χρησιμοποιώντας την ενότητα timeit, υπολογίζουμε το χρόνο που χρειάζεται για την εκτέλεση του σεναρίου μέχρι να δημοσιευτούν οι αλλαγές.

Σενάριο χρησιμοποιώντας add-host και add-address-range

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

first_ip = 1
last_ip = 4

client_args = APIClientArgs(server="192.168.47.240")

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     for ip in range(5,255,5):
         add_host = client.api_call("add-host", {"name" : f"h_192.168.0.{ip}", "ip-address": f'192.168.0.{ip}'})
     while last_ip < 255:
         add_range = client.api_call("add-address-range", {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"})
         first_ip+=5
         last_ip+=5
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

Στο περιβάλλον του εργαστηρίου μου, αυτό το σενάριο διαρκεί από 30 έως 50 δευτερόλεπτα για να εκτελεστεί, ανάλογα με το φόρτο του διακομιστή διαχείρισης.

Τώρα ας δούμε πώς να λύσετε το ίδιο πρόβλημα χρησιμοποιώντας μια κλήση API προσθήκη-αντικείμενα-παρτίδα, υποστήριξη για το οποίο προστέθηκε στην έκδοση API 1.6. Αυτή η κλήση σάς επιτρέπει να δημιουργήσετε πολλά αντικείμενα ταυτόχρονα σε ένα αίτημα API. Επιπλέον, αυτά μπορεί να είναι αντικείμενα διαφορετικών τύπων (για παράδειγμα, κεντρικοί υπολογιστές, υποδίκτυα και εύρη διευθύνσεων). Έτσι, η εργασία μας μπορεί να λυθεί στο πλαίσιο μιας κλήσης API.

Σενάριο χρησιμοποιώντας add-objects-batch

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}', "ip-address": f'192.168.0.{ip}'}
    objects_list_ip.append(data)
    
first_ip = 1
last_ip = 4


while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}


with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_objects_batch = client.api_call("add-objects-batch", data_for_batch)
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

Και η εκτέλεση αυτού του σεναρίου στο περιβάλλον του εργαστηρίου μου διαρκεί από 3 έως 7 δευτερόλεπτα, ανάλογα με το φορτίο στον διακομιστή διαχείρισης. Δηλαδή, κατά μέσο όρο, σε 101 αντικείμενα API, μια κλήση τύπου δέσμης εκτελείται 10 φορές πιο γρήγορα. Σε μεγαλύτερο αριθμό αντικειμένων η διαφορά θα είναι ακόμα πιο εντυπωσιακή.

Τώρα ας δούμε πώς να εργαστείτε με σύνολο-αντικείμενα-παρτίδα. Χρησιμοποιώντας αυτήν την κλήση API, μπορούμε να αλλάξουμε μαζικά οποιαδήποτε παράμετρο. Ας ορίσουμε το πρώτο μισό των διευθύνσεων από το προηγούμενο παράδειγμα (έως 124 κεντρικούς υπολογιστές και εύρη επίσης) στο χρώμα sienna και ας αντιστοιχίσουμε το χρώμα χακί στο δεύτερο μισό των διευθύνσεων.

Αλλαγή του χρώματος των αντικειμένων που δημιουργήθηκαν στο προηγούμενο παράδειγμα

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip_first = []
objects_list_range_first = []
objects_list_ip_second = []
objects_list_range_second = []

for ip in range(5,125,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "sienna"}
    objects_list_ip_first.append(data)
    
for ip in range(125,255,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "khaki"}
    objects_list_ip_second.append(data)
    
first_ip = 1
last_ip = 4
while last_ip < 125:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "sienna"}
    objects_list_range_first.append(data)
    first_ip+=5
    last_ip+=5
    
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "khaki"}
    objects_list_range_second.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch_first  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_first
}, {
    "type" : "address-range",
    "list" : objects_list_range_first
  }]
}

data_for_batch_second  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_second
}, {
    "type" : "address-range",
    "list" : objects_list_range_second
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 
     set_objects_batch_first = client.api_call("set-objects-batch", data_for_batch_first)
     set_objects_batch_second = client.api_call("set-objects-batch", data_for_batch_second)
     publish = client.api_call("publish")

Μπορείτε να διαγράψετε πολλά αντικείμενα σε μία κλήση API χρησιμοποιώντας διαγραφή-αντικείμενα-παρτίδα. Τώρα ας δούμε ένα παράδειγμα κώδικα που διαγράφει όλους τους κεντρικούς υπολογιστές που δημιουργήθηκαν προηγουμένως μέσω προσθήκη-αντικείμενα-παρτίδα.

Διαγραφή αντικειμένων χρησιμοποιώντας delete-objects-batch

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}'}
    objects_list_ip.append(data)

first_ip = 1
last_ip = 4
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     delete_objects_batch = client.api_call("delete-objects-batch", data_for_batch)
     publish = client.api_call("publish")

print(delete_objects_batch.data)

Όλες οι λειτουργίες που εμφανίζονται σε νέες εκδόσεις του λογισμικού Check Point αποκτούν αμέσως κλήσεις API. Έτσι, στο R80.40 εμφανίστηκαν «χαρακτηριστικά» όπως το Revert to revision και το Smart Task, και αμέσως προετοιμάστηκαν για αυτά αντίστοιχες κλήσεις API. Επιπλέον, όλες οι λειτουργίες κατά τη μετάβαση από κονσόλες παλαιού τύπου στη λειτουργία Ενοποιημένης πολιτικής αποκτούν επίσης υποστήριξη API. Για παράδειγμα, η πολυαναμενόμενη ενημέρωση στην έκδοση λογισμικού R80.40 ήταν η μετακίνηση της πολιτικής επιθεώρησης HTTPS από τη λειτουργία παλαιού τύπου στη λειτουργία Ενοποιημένης πολιτικής και αυτή η λειτουργία έλαβε αμέσως κλήσεις API. Ακολουθεί ένα παράδειγμα κώδικα που προσθέτει έναν κανόνα στην κορυφαία θέση της πολιτικής επιθεώρησης HTTPS που εξαιρεί 3 κατηγορίες από την επιθεώρηση (Υγεία, Οικονομικά, Κυβερνητικές Υπηρεσίες), οι οποίες απαγορεύεται να επιθεωρούνται σύμφωνα με τη νομοθεσία σε ορισμένες χώρες.

Προσθέστε έναν κανόνα στην πολιτική επιθεώρησης HTTPS

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

data = {
  "layer" : "Default Layer",
  "position" : "top",
  "name" : "Legal Requirements",
  "action": "bypass",
  "site-category": ["Health", "Government / Military", "Financial Services"]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_https_rule = client.api_call("add-https-rule", data)
     publish = client.api_call("publish")

Εκτέλεση σεναρίων Python στον διακομιστή διαχείρισης σημείου ελέγχου

Όλα είναι ίδια README.md περιέχει πληροφορίες σχετικά με τον τρόπο εκτέλεσης σεναρίων Python απευθείας από τον διακομιστή ελέγχου. Αυτό μπορεί να είναι βολικό όταν δεν μπορείτε να συνδεθείτε στον διακομιστή API από άλλο μηχάνημα. Ηχογράφω ένα βίντεο έξι λεπτών στο οποίο κοιτάζω την εγκατάσταση της μονάδας cpapi και δυνατότητες εκτέλεσης σεναρίων Python στον διακομιστή ελέγχου. Για παράδειγμα, εκτελείται ένα σενάριο που αυτοματοποιεί τη διαμόρφωση μιας νέας πύλης για μια εργασία όπως ο έλεγχος δικτύου Έλεγχος ασφαλείας. Μεταξύ των δυνατοτήτων που έπρεπε να αντιμετωπίσω: η συνάρτηση δεν έχει εμφανιστεί ακόμα στην Python 2.7 εισαγωγή, επομένως για την επεξεργασία των πληροφοριών που εισάγει ο χρήστης, χρησιμοποιείται μια συνάρτηση raw_input. Διαφορετικά, ο κωδικός είναι ο ίδιος όπως για την εκκίνηση από άλλα μηχανήματα, μόνο που είναι πιο βολικό να χρησιμοποιήσετε τη λειτουργία login_as_root, για να μην προσδιορίσετε ξανά το δικό σας όνομα χρήστη, κωδικό πρόσβασης και διεύθυνση IP του διακομιστή διαχείρισης.

Σενάριο για γρήγορη ρύθμιση του Security CheckUp

from __future__ import print_function
import getpass
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from cpapi import APIClient, APIClientArgs

def main():
    with APIClient() as client:
       # if client.check_fingerprint() is False:
       #     print("Could not get the server's fingerprint - Check connectivity with the server.")
       #     exit(1)
        login_res = client.login_as_root()

        if login_res.success is False:
            print("Login failed:n{}".format(login_res.error_message))
            exit(1)

        gw_name = raw_input("Enter the gateway name:")
        gw_ip = raw_input("Enter the gateway IP address:")
        if sys.stdin.isatty():
            sic = getpass.getpass("Enter one-time password for the gateway(SIC): ")
        else:
            print("Attention! Your password will be shown on the screen!")
            sic = raw_input("Enter one-time password for the gateway(SIC): ")
        version = raw_input("Enter the gateway version(like RXX.YY):")
        add_gw = client.api_call("add-simple-gateway", {'name' : gw_name, 'ipv4-address' : gw_ip, 'one-time-password' : sic, 'version': version.capitalize(), 'application-control' : 'true', 'url-filtering' : 'true', 'ips' : 'true', 'anti-bot' : 'true', 'anti-virus' : 'true', 'threat-emulation' : 'true'})
        if add_gw.success and add_gw.data['sic-state'] != "communicating":
            print("Secure connection with the gateway hasn't established!")
            exit(1)
        elif add_gw.success:
            print("The gateway was added successfully.")
            gw_uid = add_gw.data['uid']
            gw_name = add_gw.data['name']
        else:
            print("Failed to add the gateway - {}".format(add_gw.error_message))
            exit(1)

        change_policy = client.api_call("set-access-layer", {"name" : "Network", "applications-and-url-filtering": "true", "content-awareness": "true"})
        if change_policy.success:
            print("The policy has been changed successfully")
        else:
            print("Failed to change the policy- {}".format(change_policy.error_message))
        change_rule = client.api_call("set-access-rule", {"name" : "Cleanup rule", "layer" : "Network", "action": "Accept", "track": {"type": "Detailed Log", "accounting": "true"}})
        if change_rule.success:
            print("The cleanup rule has been changed successfully")
        else:
            print("Failed to change the cleanup rule- {}".format(change_rule.error_message))

        # publish the result
        publish_res = client.api_call("publish", {})
        if publish_res.success:
            print("The changes were published successfully.")
        else:
                print("Failed to publish the changes - {}".format(install_tp_policy.error_message))

        install_access_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'true',  "threat-prevention" : 'false', "targets" : gw_uid})
        if install_access_policy.success:
            print("The access policy has been installed")
        else:
                print("Failed to install access policy - {}".format(install_tp_policy.error_message))

        install_tp_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'false',  "threat-prevention" : 'true', "targets" : gw_uid})
        if install_tp_policy.success:
            print("The threat prevention policy has been installed")
        else:
            print("Failed to install threat prevention policy - {}".format(install_tp_policy.error_message))
        
        # add passwords and passphrases to dictionary
        with open('additional_pass.conf') as f:
            line_num = 0
            for line in f:
                line_num += 1
                add_password_dictionary = client.api_call("run-script", {"script-name" : "Add passwords and passphrases", "script" : "printf "{}" >> $FWDIR/conf/additional_pass.conf".format(line), "targets" : gw_name})
                if add_password_dictionary.success:
                    print("The password dictionary line {} was added successfully".format(line_num))
                else:
                    print("Failed to add the dictionary - {}".format(add_password_dictionary.error_message))

main()

Παράδειγμα αρχείου με λεξικό κωδικού πρόσβασης extra_pass.conf
{
"passwords" : ["malware","malicious","infected","Infected"],
"phrases" : ["password","Password","Pass","pass","codigo","key","pwd","пароль","Пароль","Ключ","ключ","шифр","Шифр"] }

Συμπέρασμα

Αυτό το άρθρο εξετάζει μόνο τις βασικές δυνατότητες εργασίας Python SDK και ενότητα cpapi(όπως ίσως μαντέψατε, αυτά είναι στην πραγματικότητα συνώνυμα) και μελετώντας τον κώδικα σε αυτήν την ενότητα θα ανακαλύψετε ακόμα περισσότερες ευκαιρίες για να εργαστείτε μαζί του. Είναι πιθανό να θέλετε να το συμπληρώσετε με τις δικές σας κλάσεις, συναρτήσεις, μεθόδους και μεταβλητές. Μπορείτε πάντα να μοιραστείτε την εργασία σας και να προβάλετε άλλα σενάρια για το Check Point στην ενότητα CodeHub στην κοινότητα CheckMates, το οποίο συγκεντρώνει τόσο τους προγραμματιστές προϊόντων όσο και τους χρήστες.

Καλή κωδικοποίηση και ευχαριστώ που διαβάσατε μέχρι το τέλος!

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο