ลดความซับซ้อนของ Check Point API ด้วย Python SDK

ลดความซับซ้อนของ Check Point API ด้วย Python SDKพลังเต็มของการโต้ตอบกับ API จะถูกเปิดเผยเมื่อใช้ร่วมกับโค้ดโปรแกรม เมื่อเป็นไปได้ที่จะสร้างคำขอ API และเครื่องมือสำหรับการวิเคราะห์การตอบสนองของ API แบบไดนามิก อย่างไรก็ตาม มันยังคงไม่มีใครสังเกตเห็น ชุดพัฒนาซอฟต์แวร์ Python (ต่อไปนี้จะเรียกว่า Python SDK) สำหรับ API การจัดการจุดตรวจสอบแต่เปล่าประโยชน์ มันทำให้ชีวิตของนักพัฒนาและผู้ชื่นชอบระบบอัตโนมัติง่ายขึ้นอย่างมาก Python ได้รับความนิยมอย่างมากเมื่อเร็ว ๆ นี้ และฉันตัดสินใจที่จะเติมเต็มช่องว่างและตรวจสอบคุณสมบัติหลัก ๆ ชุดพัฒนา Check Point API Python. บทความนี้เป็นส่วนเสริมที่ดีเยี่ยมสำหรับบทความอื่นเกี่ยวกับHabré ตรวจสอบจุด R80.10 API การจัดการผ่าน CLI สคริปต์ และอื่นๆ. เราจะมาดูวิธีเขียนสคริปต์โดยใช้ Python SDK และดูฟังก์ชัน Management API ใหม่ในเวอร์ชัน 1.6 ให้ละเอียดยิ่งขึ้น (รองรับตั้งแต่ R80.40 เป็นต้นไป) เพื่อให้เข้าใจบทความนี้ คุณจะต้องมีความรู้พื้นฐานเกี่ยวกับการทำงานกับ API และ Python

Check Point กำลังพัฒนา API อย่างแข็งขัน และในขณะนี้ ได้เผยแพร่สิ่งต่อไปนี้แล้ว:

ปัจจุบัน Python SDK รองรับเฉพาะการโต้ตอบกับ Management API และ Gaia API. เราจะดูคลาส วิธีการ และตัวแปรที่สำคัญที่สุดในโมดูลนี้

ลดความซับซ้อนของ Check Point API ด้วย Python SDK

การติดตั้งโมดูล

โมดึล คาปาปิ ติดตั้งได้อย่างรวดเร็วและง่ายดายจาก พื้นที่เก็บข้อมูล Check Point อย่างเป็นทางการบน GitHub ด้วย จุดเล็ก ๆ. คำแนะนำการติดตั้งโดยละเอียดมีอยู่ใน README.md. โมดูลนี้ได้รับการปรับให้ทำงานกับ Python เวอร์ชัน 2.7 และ 3.7 ในบทความนี้ เราจะยกตัวอย่างโดยใช้ Python 3.7 อย่างไรก็ตาม Python SDK สามารถเรียกใช้ได้โดยตรงจาก Check Point Management Server (การจัดการอัจฉริยะ) แต่รองรับเฉพาะ Python 2.7 เท่านั้น ดังนั้นส่วนสุดท้ายจะให้โค้ดสำหรับเวอร์ชัน 2.7 ทันทีหลังจากติดตั้งโมดูล ฉันขอแนะนำให้ดูตัวอย่างในไดเร็กทอรี examples_python2 и examples_python3.

เริ่มต้นใช้งาน

เพื่อให้เราสามารถทำงานกับส่วนประกอบของโมดูล cpapi ได้ เราจำเป็นต้องนำเข้าจากโมดูล คาปาปิ ต้องมีคลาสอย่างน้อยสองคลาส:

APIClient и APIClientArgs

from cpapi import APIClient, APIClientArgs

ชั้นเรียน APIClientArgs รับผิดชอบพารามิเตอร์การเชื่อมต่อกับเซิร์ฟเวอร์ API และคลาส APIClient มีหน้าที่รับผิดชอบในการโต้ตอบกับ API

การกำหนดพารามิเตอร์การเชื่อมต่อ

หากต้องการกำหนดพารามิเตอร์ต่างๆ สำหรับการเชื่อมต่อกับ API คุณต้องสร้างอินสแตนซ์ของคลาส APIClientArgs. โดยหลักการแล้ว พารามิเตอร์จะถูกกำหนดไว้ล่วงหน้า และเมื่อรันสคริปต์บนเซิร์ฟเวอร์ควบคุม ก็ไม่จำเป็นต้องระบุพารามิเตอร์เหล่านั้น

client_args = APIClientArgs()

แต่เมื่อทำงานบนโฮสต์ของบริษัทอื่น คุณจะต้องระบุที่อยู่ IP หรือชื่อโฮสต์ของเซิร์ฟเวอร์ API เป็นอย่างน้อย (หรือที่เรียกว่าเซิร์ฟเวอร์การจัดการ) ในตัวอย่างด้านล่าง เรากำหนดพารามิเตอร์การเชื่อมต่อเซิร์ฟเวอร์และกำหนดที่อยู่ IP ของเซิร์ฟเวอร์การจัดการเป็นสตริง

client_args = APIClientArgs(server='192.168.47.241')

ลองดูพารามิเตอร์ทั้งหมดและค่าเริ่มต้นที่สามารถใช้เมื่อเชื่อมต่อกับเซิร์ฟเวอร์ API:

อาร์กิวเมนต์ของเมธอด __init__ ของคลาส APIClientArgs

class APIClientArgs:
    """
    This class provides arguments for APIClient configuration.
    All the arguments are configured with their default values.
    """

    # port is set to None by default, but it gets replaced with 443 if not specified
    # context possible values - web_api (default) or gaia_api
    def __init__(self, port=None, fingerprint=None, sid=None, server="127.0.0.1", http_debug_level=0,
                 api_calls=None, debug_file="", proxy_host=None, proxy_port=8080,
                 api_version=None, unsafe=False, unsafe_auto_accept=False, context="web_api"):
        self.port = port
        # management server fingerprint
        self.fingerprint = fingerprint
        # session-id.
        self.sid = sid
        # management server name or IP-address
        self.server = server
        # debug level
        self.http_debug_level = http_debug_level
        # an array with all the api calls (for debug purposes)
        self.api_calls = api_calls if api_calls else []
        # name of debug file. If left empty, debug data will not be saved to disk.
        self.debug_file = debug_file
        # HTTP proxy server address (without "http://")
        self.proxy_host = proxy_host
        # HTTP proxy port
        self.proxy_port = proxy_port
        # Management server's API version
        self.api_version = api_version
        # Indicates that the client should not check the server's certificate
        self.unsafe = unsafe
        # Indicates that the client should automatically accept and save the server's certificate
        self.unsafe_auto_accept = unsafe_auto_accept
        # The context of using the client - defaults to web_api
        self.context = context

ฉันเชื่อว่าอาร์กิวเมนต์ที่สามารถใช้ได้ในกรณีของคลาส APIClientArgs นั้นใช้งานง่ายสำหรับผู้ดูแลระบบ Check Point และไม่ต้องการความคิดเห็นเพิ่มเติม

การเชื่อมต่อผ่าน APIClient และตัวจัดการบริบท

ชั้นเรียน APIClient วิธีที่สะดวกที่สุดในการใช้งานคือผ่านตัวจัดการบริบท สิ่งที่ต้องส่งผ่านไปยังอินสแตนซ์ของคลาส APIClient คือพารามิเตอร์การเชื่อมต่อที่กำหนดไว้ในขั้นตอนก่อนหน้า

with APIClient(client_args) as client:

ตัวจัดการบริบทจะไม่ทำการเรียกล็อกอินไปยังเซิร์ฟเวอร์ API โดยอัตโนมัติ แต่จะทำการเรียกออกจากระบบเมื่อออกจากระบบ หากไม่จำเป็นต้องออกจากระบบด้วยเหตุผลบางประการหลังจากทำงานกับการเรียก API เสร็จแล้ว คุณต้องเริ่มทำงานโดยไม่ต้องใช้ตัวจัดการบริบท:

client = APIClient(clieng_args)

การทดสอบการเชื่อมต่อ

วิธีที่ง่ายที่สุดในการตรวจสอบว่าการเชื่อมต่อตรงตามพารามิเตอร์ที่ระบุกำลังใช้วิธีนี้หรือไม่ ตรวจสอบ_ลายนิ้วมือ. หากการตรวจสอบผลรวมแฮช sha1 สำหรับลายนิ้วมือของใบรับรอง API ของเซิร์ฟเวอร์ล้มเหลว (วิธีการส่งคืน เท็จ) ซึ่งมักเกิดจากปัญหาการเชื่อมต่อและเราสามารถหยุดการทำงานของโปรแกรมได้ (หรือให้โอกาสผู้ใช้แก้ไขข้อมูลการเชื่อมต่อ):

    if client.check_fingerprint() is False:
        print("Could not get the server's fingerprint - Check connectivity with the server.")
        exit(1)

โปรดทราบว่าในอนาคตชั้นเรียน APIClient จะตรวจสอบทุกการเรียก API (methods api_call и api_queryเราจะพูดถึงพวกเขาเพิ่มเติมอีกเล็กน้อย) ใบรับรองลายนิ้วมือ sha1 บนเซิร์ฟเวอร์ API แต่หากเมื่อตรวจสอบลายนิ้วมือ sha1 ของใบรับรองเซิร์ฟเวอร์ API แล้วตรวจพบข้อผิดพลาด (ไม่ทราบใบรับรองหรือมีการเปลี่ยนแปลง) วิธีการ ตรวจสอบ_ลายนิ้วมือ จะให้โอกาสในการเพิ่ม/เปลี่ยนแปลงข้อมูลเกี่ยวกับมันบนเครื่องท้องถิ่นโดยอัตโนมัติ การตรวจสอบนี้สามารถปิดใช้งานได้อย่างสมบูรณ์ (แต่สามารถแนะนำได้เฉพาะในกรณีที่สคริปต์ทำงานบนเซิร์ฟเวอร์ API เมื่อเชื่อมต่อกับ 127.0.0.1) โดยใช้อาร์กิวเมนต์ APIClientArgs - unsafe_auto_accept (ดูเพิ่มเติมเกี่ยวกับ APIClientArgs ก่อนหน้านี้ใน “การกำหนดพารามิเตอร์การเชื่อมต่อ”)

client_args = APIClientArgs(unsafe_auto_accept=True)

เข้าสู่ระบบเซิร์ฟเวอร์ API

У APIClient มีมากถึง 3 วิธีในการเข้าสู่เซิร์ฟเวอร์ API และแต่ละวิธีก็เข้าใจความหมาย SID(session-id) ซึ่งใช้โดยอัตโนมัติในการเรียก API แต่ละครั้งในส่วนหัว (ชื่อในส่วนหัวของพารามิเตอร์นี้คือ X-chkp-sid) ดังนั้นจึงไม่จำเป็นต้องประมวลผลพารามิเตอร์นี้เพิ่มเติม

วิธีการเข้าสู่ระบบ

ตัวเลือกที่ใช้ล็อกอินและรหัสผ่าน (ในตัวอย่าง ชื่อผู้ใช้ ผู้ดูแลระบบ และรหัสผ่าน 1q2w3e ถูกส่งเป็นอาร์กิวเมนต์ตำแหน่ง):

     login = client.login('admin', '1q2w3e')  

พารามิเตอร์ทางเลือกเพิ่มเติมยังมีให้ใช้งานในวิธีการเข้าสู่ระบบ ต่อไปนี้คือชื่อและค่าเริ่มต้น:

continue_last_session=False, domain=None, read_only=False, payload=None

วิธีการ Login_with_api_key

ตัวเลือกที่ใช้คีย์ API (รองรับตั้งแต่เวอร์ชันการจัดการ R80.40/Management API v1.6, "3TsbPJ8ZKjaJGvFyoFqHFA==" นี่คือค่าคีย์ API สำหรับผู้ใช้รายหนึ่งบนเซิร์ฟเวอร์การจัดการที่มีวิธีการอนุญาตคีย์ API):

     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 

ในวิธีการ login_with_api_key มีพารามิเตอร์เสริมเหมือนกันเช่นเดียวกับในวิธีการ เข้าสู่ระบบ.

วิธีล็อกอิน_as_root

ตัวเลือกในการเข้าสู่ระบบเครื่องท้องถิ่นด้วยเซิร์ฟเวอร์ API:

     login = client.login_as_root()

มีพารามิเตอร์ทางเลือกเพียงสองตัวเท่านั้นสำหรับวิธีนี้:

domain=None, payload=None

และในที่สุด API ก็เรียกตัวเองว่า

เรามีสองตัวเลือกในการเรียก API ผ่านวิธีการต่างๆ api_call и api_query. เรามาดูกันว่าความแตกต่างระหว่างพวกเขาคืออะไร

api_call

วิธีการนี้ใช้ได้กับทุกการโทร เราจำเป็นต้องส่งส่วนสุดท้ายสำหรับการเรียก api และเพย์โหลดในเนื้อหาคำขอหากจำเป็น หากเพย์โหลดว่างเปล่า จะไม่สามารถถ่ายโอนได้เลย:

api_versions = client.api_call('show-api-versions') 

ผลลัพธ์สำหรับคำขอนี้อยู่ต่ำกว่าการตัด:

In [23]: api_versions                                                           
Out[23]: 
APIResponse({
    "data": {
        "current-version": "1.6",
        "supported-versions": [
            "1",
            "1.1",
            "1.2",
            "1.3",
            "1.4",
            "1.5",
            "1.6"
        ]
    },
    "res_obj": {
        "data": {
            "current-version": "1.6",
            "supported-versions": [
                "1",
                "1.1",
                "1.2",
                "1.3",
                "1.4",
                "1.5",
                "1.6"
            ]
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})
show_host = client.api_call('show-host', {'name' : 'h_8.8.8.8'})

ผลลัพธ์สำหรับคำขอนี้อยู่ต่ำกว่าการตัด:

In [25]: show_host                                                              
Out[25]: 
APIResponse({
    "data": {
        "color": "black",
        "comments": "",
        "domain": {
            "domain-type": "domain",
            "name": "SMC User",
            "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
        },
        "groups": [],
        "icon": "Objects/host",
        "interfaces": [],
        "ipv4-address": "8.8.8.8",
        "meta-info": {
            "creation-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "creator": "admin",
            "last-modifier": "admin",
            "last-modify-time": {
                "iso-8601": "2020-05-01T21:49+0300",
                "posix": 1588358973517
            },
            "lock": "unlocked",
            "validation-state": "ok"
        },
        "name": "h_8.8.8.8",
        "nat-settings": {
            "auto-rule": false
        },
        "read-only": false,
        "tags": [],
        "type": "host",
        "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
    },
    "res_obj": {
        "data": {
            "color": "black",
            "comments": "",
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "groups": [],
            "icon": "Objects/host",
            "interfaces": [],
            "ipv4-address": "8.8.8.8",
            "meta-info": {
                "creation-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "creator": "admin",
                "last-modifier": "admin",
                "last-modify-time": {
                    "iso-8601": "2020-05-01T21:49+0300",
                    "posix": 1588358973517
                },
                "lock": "unlocked",
                "validation-state": "ok"
            },
            "name": "h_8.8.8.8",
            "nat-settings": {
                "auto-rule": false
            },
            "read-only": false,
            "tags": [],
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

api_query

ฉันขอจองทันทีว่าวิธีนี้ใช้ได้เฉพาะกับการโทรที่มีเอาต์พุตเกี่ยวข้องกับการชดเชยเท่านั้น การอนุมานดังกล่าวเกิดขึ้นเมื่อข้อมูลนั้นมีหรืออาจมีข้อมูลจำนวนมาก ตัวอย่างเช่น นี่อาจเป็นคำขอรายการออบเจ็กต์โฮสต์ที่สร้างขึ้นทั้งหมดบนเซิร์ฟเวอร์การจัดการ สำหรับคำขอดังกล่าว API จะส่งคืนรายการออบเจ็กต์ 50 รายการตามค่าเริ่มต้น (คุณสามารถเพิ่มขีดจำกัดเป็น 500 ออบเจ็กต์ในการตอบกลับได้) และเพื่อไม่ให้ดึงข้อมูลหลายครั้งโดยเปลี่ยนพารามิเตอร์ออฟเซ็ตในคำขอ API จึงมีวิธี api_query ที่ทำงานโดยอัตโนมัติ ตัวอย่างการโทรที่จำเป็นต้องใช้วิธีนี้: show-sessions, show-hosts, show-networks, show-wildcards, show-groups, show-address-ranges, show-simple-gateways, show-simple-clusters, แสดงบทบาทการเข้าถึง, แสดงลูกค้าที่เชื่อถือได้ แสดงแพ็คเกจ. ที่จริงแล้ว เราเห็นคำพหูพจน์ในชื่อของการเรียก API เหล่านี้ ดังนั้นการเรียกเหล่านี้จะจัดการได้ง่ายขึ้น api_query

show_hosts = client.api_query('show-hosts') 

ผลลัพธ์สำหรับคำขอนี้อยู่ต่ำกว่าการตัด:

In [21]: show_hosts                                                             
Out[21]: 
APIResponse({
    "data": [
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "192.168.47.1",
            "name": "h_192.168.47.1",
            "type": "host",
            "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
        },
        {
            "domain": {
                "domain-type": "domain",
                "name": "SMC User",
                "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
            },
            "ipv4-address": "8.8.8.8",
            "name": "h_8.8.8.8",
            "type": "host",
            "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
        }
    ],
    "res_obj": {
        "data": {
            "from": 1,
            "objects": [
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "192.168.47.1",
                    "name": "h_192.168.47.1",
                    "type": "host",
                    "uid": "5d7d7086-d70b-4995-971a-0583b15a2bfc"
                },
                {
                    "domain": {
                        "domain-type": "domain",
                        "name": "SMC User",
                        "uid": "41e821a0-3720-11e3-aa6e-0800200c9fde"
                    },
                    "ipv4-address": "8.8.8.8",
                    "name": "h_8.8.8.8",
                    "type": "host",
                    "uid": "c210af07-1939-49d3-a351-953a9c471d9e"
                }
            ],
            "to": 2,
            "total": 2
        },
        "status_code": 200
    },
    "status_code": 200,
    "success": true
})

การประมวลผลผลลัพธ์ของการเรียก API

หลังจากนี้คุณสามารถใช้ตัวแปรและวิธีการของคลาสได้ APIResponse(ทั้งภายในตัวจัดการบริบทและภายนอก) ในชั้นเรียน APIResponse มีการกำหนดไว้ล่วงหน้า 4 วิธีและ 5 ตัวแปร เราจะอาศัยรายละเอียดที่สำคัญที่สุดเพิ่มเติม

ลดความซับซ้อนของ Check Point API ด้วย Python SDK

ความสำเร็จ

ในการเริ่มต้น เป็นความคิดที่ดีที่จะตรวจสอบให้แน่ใจว่าการเรียก API สำเร็จและส่งกลับผลลัพธ์ มีวิธีการนี้ ความสำเร็จ:

In [49]: api_versions.success                                                   
Out[49]: True

คืนค่าเป็นจริงหากการเรียก API สำเร็จ (รหัสตอบกลับ - 200) และคืนค่าเป็นเท็จหากไม่สำเร็จ (โค้ดตอบกลับอื่น ๆ) สะดวกในการใช้งานทันทีหลังจากการเรียก API เพื่อแสดงข้อมูลที่แตกต่างกันขึ้นอยู่กับโค้ดตอบกลับ

if api_ver.success: 
    print(api_versions.data) 
else: 
    print(api_versions.err_message) 

รหัสสถานะ

ส่งคืนโค้ดตอบกลับหลังจากการเรียก API

In [62]: api_versions.status_code                                               
Out[62]: 400

รหัสตอบกลับที่เป็นไปได้: 200,400,401,403,404,409,500,501.

set_success_status

ในกรณีนี้ อาจจำเป็นต้องเปลี่ยนค่าของสถานะความสำเร็จ ในทางเทคนิคแล้ว คุณสามารถใส่อะไรก็ได้ลงไป แม้แต่เชือกธรรมดาก็ตาม แต่ตัวอย่างที่แท้จริงคือการรีเซ็ตพารามิเตอร์นี้เป็น False ภายใต้เงื่อนไขประกอบบางประการ ด้านล่างนี้ ให้ใส่ใจกับตัวอย่างเมื่อมีงานที่ทำงานบนเซิร์ฟเวอร์การจัดการ แต่เราจะถือว่าคำขอนี้ไม่สำเร็จ (เราจะตั้งค่าตัวแปรความสำเร็จเป็น เท็จแม้ว่าการเรียก API จะสำเร็จและส่งคืนรหัส 200)

for task in task_result.data["tasks"]:
    if task["status"] == "failed" or task["status"] == "partially succeeded":
        task_result.set_success_status(False)
        break

การตอบสนอง()

วิธีการตอบกลับช่วยให้คุณดูพจนานุกรมพร้อมรหัสตอบกลับ (status_code) และเนื้อหาการตอบกลับ (เนื้อหา)

In [94]: api_versions.response()                                                
Out[94]: 
{'status_code': 200,
 'data': {'current-version': '1.6',
  'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}}

ข้อมูล

ให้คุณเห็นเฉพาะเนื้อความของการตอบสนอง (body) โดยไม่มีข้อมูลที่ไม่จำเป็น

In [93]: api_versions.data                                                      
Out[93]: 
{'current-version': '1.6',
 'supported-versions': ['1', '1.1', '1.2', '1.3', '1.4', '1.5', '1.6']}

ข้อความผิดพลาด

ข้อมูลนี้จะใช้ได้เฉพาะเมื่อมีข้อผิดพลาดเกิดขึ้นขณะประมวลผลคำขอ API (รหัสตอบกลับ ไม่ 200) ตัวอย่างเอาต์พุต

In [107]: api_versions.error_message                                            
Out[107]: 'code: generic_err_invalid_parameter_namenmessage: Unrecognized parameter [1]n'

ตัวอย่างที่เป็นประโยชน์

ต่อไปนี้เป็นตัวอย่างที่ใช้การเรียก API ที่เพิ่มใน Management API 1.6

ก่อนอื่น มาดูกันว่าการโทรทำงานอย่างไร เพิ่มโฮสต์ и เพิ่ม-ที่อยู่-ช่วง. สมมติว่าเราจำเป็นต้องสร้างที่อยู่ IP ทั้งหมดของซับเน็ต 192.168.0.0/24 ซึ่งออคเต็ตสุดท้ายคือ 5 เป็นอ็อบเจ็กต์ของประเภทโฮสต์ และเขียนที่อยู่ IP อื่น ๆ ทั้งหมดเป็นอ็อบเจ็กต์ของประเภทช่วงที่อยู่ ในกรณีนี้ ให้แยกที่อยู่ซับเน็ตและที่อยู่การออกอากาศออก

ด้านล่างนี้เป็นสคริปต์ที่ช่วยแก้ปัญหานี้และสร้างอ็อบเจ็กต์ประเภทโฮสต์ 50 รายการและอ็อบเจ็กต์ประเภทช่วงที่อยู่ 51 รายการ ในการแก้ปัญหานี้ จำเป็นต้องมีการเรียก API 101 ครั้ง (ไม่นับการเรียกเผยแพร่ครั้งสุดท้าย) นอกจากนี้ เมื่อใช้โมดูล timeit เราจะคำนวณเวลาที่ใช้ในการรันสคริปต์จนกว่าการเปลี่ยนแปลงจะถูกเผยแพร่

สคริปต์ที่ใช้ add-host และ add-address-range

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

first_ip = 1
last_ip = 4

client_args = APIClientArgs(server="192.168.47.240")

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     for ip in range(5,255,5):
         add_host = client.api_call("add-host", {"name" : f"h_192.168.0.{ip}", "ip-address": f'192.168.0.{ip}'})
     while last_ip < 255:
         add_range = client.api_call("add-address-range", {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"})
         first_ip+=5
         last_ip+=5
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

ในสภาพแวดล้อมแล็บของฉัน สคริปต์นี้ใช้เวลาประมาณ 30 ถึง 50 วินาทีในการดำเนินการ ขึ้นอยู่กับโหลดบนเซิร์ฟเวอร์การจัดการ

ตอนนี้เรามาดูวิธีแก้ปัญหาเดียวกันโดยใช้การเรียก API เพิ่มวัตถุชุดการสนับสนุนที่เพิ่มเข้ามาใน API เวอร์ชัน 1.6 การเรียกนี้ช่วยให้คุณสร้างออบเจ็กต์จำนวนมากพร้อมกันในคำขอ API เดียว นอกจากนี้ สิ่งเหล่านี้อาจเป็นออบเจ็กต์ประเภทต่างๆ (เช่น โฮสต์ ซับเน็ต และช่วงที่อยู่) ดังนั้นงานของเราจึงสามารถแก้ไขได้ภายในกรอบของการเรียก API เพียงครั้งเดียว

สคริปต์โดยใช้ add-objects-batch

import timeit
from cpapi import APIClient, APIClientArgs

start = timeit.default_timer()

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}', "ip-address": f'192.168.0.{ip}'}
    objects_list_ip.append(data)
    
first_ip = 1
last_ip = 4


while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}


with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_objects_batch = client.api_call("add-objects-batch", data_for_batch)
     stop = timeit.default_timer() 
     publish = client.api_call("publish")
     
print(f'Time to execute batch request: {stop - start} seconds')

และการเรียกใช้สคริปต์นี้ในสภาพแวดล้อมห้องปฏิบัติการของฉันจะใช้เวลา 3 ถึง 7 วินาที ขึ้นอยู่กับภาระงานบนเซิร์ฟเวอร์การจัดการ กล่าวคือ โดยเฉลี่ยแล้วบนออบเจ็กต์ API 101 รายการ การเรียกประเภทแบตช์จะทำงานเร็วขึ้น 10 เท่า บนวัตถุจำนวนมาก ความแตกต่างจะยิ่งน่าประทับใจยิ่งขึ้น

ตอนนี้เรามาดูวิธีการทำงานด้วย ชุดวัตถุชุด. เมื่อใช้การเรียก API นี้ เราสามารถเปลี่ยนพารามิเตอร์ใดๆ เป็นจำนวนมากได้ มาตั้งค่าครึ่งแรกของที่อยู่จากตัวอย่างก่อนหน้า (สูงสุด .124 โฮสต์ และช่วงด้วย) ให้เป็นสีเซียนนา และกำหนดสีกากีให้กับครึ่งหลังของที่อยู่

การเปลี่ยนสีของวัตถุที่สร้างขึ้นในตัวอย่างก่อนหน้า

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip_first = []
objects_list_range_first = []
objects_list_ip_second = []
objects_list_range_second = []

for ip in range(5,125,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "sienna"}
    objects_list_ip_first.append(data)
    
for ip in range(125,255,5):
    data = {"name": f'h_192.168.0.{ip}', "color": "khaki"}
    objects_list_ip_second.append(data)
    
first_ip = 1
last_ip = 4
while last_ip < 125:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "sienna"}
    objects_list_range_first.append(data)
    first_ip+=5
    last_ip+=5
    
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "color": "khaki"}
    objects_list_range_second.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch_first  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_first
}, {
    "type" : "address-range",
    "list" : objects_list_range_first
  }]
}

data_for_batch_second  = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip_second
}, {
    "type" : "address-range",
    "list" : objects_list_range_second
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==') 
     set_objects_batch_first = client.api_call("set-objects-batch", data_for_batch_first)
     set_objects_batch_second = client.api_call("set-objects-batch", data_for_batch_second)
     publish = client.api_call("publish")

คุณสามารถลบออบเจ็กต์หลายรายการในการเรียก API ครั้งเดียวได้โดยใช้ ลบวัตถุชุด. ตอนนี้เรามาดูตัวอย่างโค้ดที่จะลบโฮสต์ทั้งหมดที่สร้างขึ้นก่อนหน้านี้ผ่าน เพิ่มวัตถุชุด.

การลบวัตถุโดยใช้ Delete-objects-batch

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

objects_list_ip = []
objects_list_range = []

for ip in range(5,255,5):
    data = {"name": f'h_192.168.0.{ip}'}
    objects_list_ip.append(data)

first_ip = 1
last_ip = 4
while last_ip < 255:
    data = {"name": f"r_192.168.0.{first_ip}-{last_ip}"}
    objects_list_range.append(data)
    first_ip+=5
    last_ip+=5

data_for_batch = {
  "objects" : [ {
    "type" : "host",
    "list" : objects_list_ip
}, {
    "type" : "address-range",
    "list" : objects_list_range
  }]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     delete_objects_batch = client.api_call("delete-objects-batch", data_for_batch)
     publish = client.api_call("publish")

print(delete_objects_batch.data)

ฟังก์ชันทั้งหมดที่ปรากฏในซอฟต์แวร์ Check Point รุ่นใหม่จะได้รับการเรียก API ทันที ดังนั้นใน R80.40 “คุณสมบัติ” เช่น เปลี่ยนกลับเป็นการแก้ไขและ Smart Task ปรากฏขึ้น และการเรียก API ที่เกี่ยวข้องก็ได้รับการจัดเตรียมไว้สำหรับพวกเขาทันที นอกจากนี้ ฟังก์ชันการทำงานทั้งหมดเมื่อย้ายจากคอนโซลแบบเดิมไปยังโหมด Unified Policy ยังได้รับการสนับสนุน API อีกด้วย ตัวอย่างเช่น การอัปเดตที่รอคอยมานานในซอฟต์แวร์เวอร์ชัน R80.40 คือการย้ายนโยบายการตรวจสอบ HTTPS จากโหมดดั้งเดิมไปเป็นโหมดนโยบายแบบรวม และฟังก์ชันนี้ได้รับการเรียก API ทันที นี่คือตัวอย่างโค้ดที่เพิ่มกฎไว้ที่ตำแหน่งบนสุดของนโยบายการตรวจสอบ HTTPS ซึ่งไม่รวมการตรวจสอบ 3 หมวดหมู่ (สุขภาพ การเงิน บริการภาครัฐ) ซึ่งถูกห้ามไม่ให้ตรวจสอบตามกฎหมายในหลายประเทศ

เพิ่มกฎลงในนโยบายการตรวจสอบ HTTPS

from cpapi import APIClient, APIClientArgs

client_args = APIClientArgs(server="192.168.47.240")

data = {
  "layer" : "Default Layer",
  "position" : "top",
  "name" : "Legal Requirements",
  "action": "bypass",
  "site-category": ["Health", "Government / Military", "Financial Services"]
}

with APIClient(client_args) as client: 
     login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
     add_https_rule = client.api_call("add-https-rule", data)
     publish = client.api_call("publish")

การรันสคริปต์ Python บนเซิร์ฟเวอร์การจัดการ Check Point

ทุกอย่างเหมือนกัน README.md มีข้อมูลเกี่ยวกับวิธีการเรียกใช้สคริปต์ Python โดยตรงจากเซิร์ฟเวอร์ควบคุม ซึ่งจะสะดวกเมื่อคุณไม่สามารถเชื่อมต่อกับเซิร์ฟเวอร์ API จากเครื่องอื่นได้ ฉันบันทึกวิดีโอความยาวหกนาทีโดยดูการติดตั้งโมดูล คาปาปิ และคุณสมบัติการรันสคริปต์ Python บนเซิร์ฟเวอร์ควบคุม ตามตัวอย่าง มีการรันสคริปต์ที่ทำให้การกำหนดค่าเกตเวย์ใหม่สำหรับงาน เช่น การตรวจสอบเครือข่าย เป็นแบบอัตโนมัติ การตรวจสอบความปลอดภัย. ในบรรดาคุณสมบัติที่ฉันต้องจัดการ: ฟังก์ชั่นยังไม่ปรากฏใน Python 2.7 อินพุตดังนั้นในการประมวลผลข้อมูลที่ผู้ใช้ป้อนจึงมีการใช้ฟังก์ชัน raw_input. มิฉะนั้นรหัสจะเหมือนกับการเปิดจากเครื่องอื่น ๆ เพียงแต่จะสะดวกกว่าในการใช้งานฟังก์ชั่น เข้าสู่ระบบ_as_rootเพื่อไม่ให้ระบุชื่อผู้ใช้ รหัสผ่าน และที่อยู่ IP ของเซิร์ฟเวอร์การจัดการของคุณเองอีก

สคริปต์สำหรับการตั้งค่าการตรวจสอบความปลอดภัยอย่างรวดเร็ว

from __future__ import print_function
import getpass
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from cpapi import APIClient, APIClientArgs

def main():
    with APIClient() as client:
       # if client.check_fingerprint() is False:
       #     print("Could not get the server's fingerprint - Check connectivity with the server.")
       #     exit(1)
        login_res = client.login_as_root()

        if login_res.success is False:
            print("Login failed:n{}".format(login_res.error_message))
            exit(1)

        gw_name = raw_input("Enter the gateway name:")
        gw_ip = raw_input("Enter the gateway IP address:")
        if sys.stdin.isatty():
            sic = getpass.getpass("Enter one-time password for the gateway(SIC): ")
        else:
            print("Attention! Your password will be shown on the screen!")
            sic = raw_input("Enter one-time password for the gateway(SIC): ")
        version = raw_input("Enter the gateway version(like RXX.YY):")
        add_gw = client.api_call("add-simple-gateway", {'name' : gw_name, 'ipv4-address' : gw_ip, 'one-time-password' : sic, 'version': version.capitalize(), 'application-control' : 'true', 'url-filtering' : 'true', 'ips' : 'true', 'anti-bot' : 'true', 'anti-virus' : 'true', 'threat-emulation' : 'true'})
        if add_gw.success and add_gw.data['sic-state'] != "communicating":
            print("Secure connection with the gateway hasn't established!")
            exit(1)
        elif add_gw.success:
            print("The gateway was added successfully.")
            gw_uid = add_gw.data['uid']
            gw_name = add_gw.data['name']
        else:
            print("Failed to add the gateway - {}".format(add_gw.error_message))
            exit(1)

        change_policy = client.api_call("set-access-layer", {"name" : "Network", "applications-and-url-filtering": "true", "content-awareness": "true"})
        if change_policy.success:
            print("The policy has been changed successfully")
        else:
            print("Failed to change the policy- {}".format(change_policy.error_message))
        change_rule = client.api_call("set-access-rule", {"name" : "Cleanup rule", "layer" : "Network", "action": "Accept", "track": {"type": "Detailed Log", "accounting": "true"}})
        if change_rule.success:
            print("The cleanup rule has been changed successfully")
        else:
            print("Failed to change the cleanup rule- {}".format(change_rule.error_message))

        # publish the result
        publish_res = client.api_call("publish", {})
        if publish_res.success:
            print("The changes were published successfully.")
        else:
                print("Failed to publish the changes - {}".format(install_tp_policy.error_message))

        install_access_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'true',  "threat-prevention" : 'false', "targets" : gw_uid})
        if install_access_policy.success:
            print("The access policy has been installed")
        else:
                print("Failed to install access policy - {}".format(install_tp_policy.error_message))

        install_tp_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'false',  "threat-prevention" : 'true', "targets" : gw_uid})
        if install_tp_policy.success:
            print("The threat prevention policy has been installed")
        else:
            print("Failed to install threat prevention policy - {}".format(install_tp_policy.error_message))
        
        # add passwords and passphrases to dictionary
        with open('additional_pass.conf') as f:
            line_num = 0
            for line in f:
                line_num += 1
                add_password_dictionary = client.api_call("run-script", {"script-name" : "Add passwords and passphrases", "script" : "printf "{}" >> $FWDIR/conf/additional_pass.conf".format(line), "targets" : gw_name})
                if add_password_dictionary.success:
                    print("The password dictionary line {} was added successfully".format(line_num))
                else:
                    print("Failed to add the dictionary - {}".format(add_password_dictionary.error_message))

main()

ไฟล์ตัวอย่างที่มีพจนานุกรมรหัสผ่านเพิ่มเติม_pass.conf
{
"passwords" : ["malware","malicious","infected","Infected"],
"phrases" : ["password","Password","Pass","pass","codigo","key","pwd","пароль","Пароль","Ключ","ключ","шифр","Шифр"] }

ข้อสรุป

บทความนี้จะตรวจสอบเฉพาะความเป็นไปได้พื้นฐานของงานเท่านั้น งูหลาม SDK และโมดูล คาปาปิ(ตามที่คุณอาจจะเดาได้ จริงๆ แล้วสิ่งเหล่านี้เป็นคำพ้องความหมาย) และโดยการศึกษาโค้ดในโมดูลนี้ คุณจะค้นพบความเป็นไปได้ในการทำงานกับโมดูลนี้มากยิ่งขึ้น เป็นไปได้ว่าคุณจะต้องการเสริมด้วยคลาส ฟังก์ชัน เมธอด และตัวแปรของคุณเอง คุณสามารถแบ่งปันงานของคุณและดูสคริปต์อื่น ๆ สำหรับ Check Point ได้ตลอดเวลาในส่วนนี้ โค้ดฮับ ในสังคม เช็คเมทส์ซึ่งรวบรวมทั้งผู้พัฒนาผลิตภัณฑ์และผู้ใช้

ขอให้มีความสุขกับการเขียนโค้ดและขอขอบคุณที่อ่านจนจบ!

ที่มา: will.com

เพิ่มความคิดเห็น