ชั้นเรียน APIClientArgs รับผิดชอบพารามิเตอร์การเชื่อมต่อกับเซิร์ฟเวอร์ API และคลาส APIClient มีหน้าที่รับผิดชอบในการโต้ตอบกับ API
การกำหนดพารามิเตอร์การเชื่อมต่อ
หากต้องการกำหนดพารามิเตอร์ต่างๆ สำหรับการเชื่อมต่อกับ API คุณต้องสร้างอินสแตนซ์ของคลาส APIClientArgs. โดยหลักการแล้ว พารามิเตอร์จะถูกกำหนดไว้ล่วงหน้า และเมื่อรันสคริปต์บนเซิร์ฟเวอร์ควบคุม ก็ไม่จำเป็นต้องระบุพารามิเตอร์เหล่านั้น
client_args = APIClientArgs()
แต่เมื่อทำงานบนโฮสต์ของบริษัทอื่น คุณจะต้องระบุที่อยู่ IP หรือชื่อโฮสต์ของเซิร์ฟเวอร์ API เป็นอย่างน้อย (หรือที่เรียกว่าเซิร์ฟเวอร์การจัดการ) ในตัวอย่างด้านล่าง เรากำหนดพารามิเตอร์การเชื่อมต่อเซิร์ฟเวอร์และกำหนดที่อยู่ IP ของเซิร์ฟเวอร์การจัดการเป็นสตริง
class APIClientArgs:
"""
This class provides arguments for APIClient configuration.
All the arguments are configured with their default values.
"""
# port is set to None by default, but it gets replaced with 443 if not specified
# context possible values - web_api (default) or gaia_api
def __init__(self, port=None, fingerprint=None, sid=None, server="127.0.0.1", http_debug_level=0,
api_calls=None, debug_file="", proxy_host=None, proxy_port=8080,
api_version=None, unsafe=False, unsafe_auto_accept=False, context="web_api"):
self.port = port
# management server fingerprint
self.fingerprint = fingerprint
# session-id.
self.sid = sid
# management server name or IP-address
self.server = server
# debug level
self.http_debug_level = http_debug_level
# an array with all the api calls (for debug purposes)
self.api_calls = api_calls if api_calls else []
# name of debug file. If left empty, debug data will not be saved to disk.
self.debug_file = debug_file
# HTTP proxy server address (without "http://")
self.proxy_host = proxy_host
# HTTP proxy port
self.proxy_port = proxy_port
# Management server's API version
self.api_version = api_version
# Indicates that the client should not check the server's certificate
self.unsafe = unsafe
# Indicates that the client should automatically accept and save the server's certificate
self.unsafe_auto_accept = unsafe_auto_accept
# The context of using the client - defaults to web_api
self.context = context
ฉันเชื่อว่าอาร์กิวเมนต์ที่สามารถใช้ได้ในกรณีของคลาส APIClientArgs นั้นใช้งานง่ายสำหรับผู้ดูแลระบบ Check Point และไม่ต้องการความคิดเห็นเพิ่มเติม
ตัวจัดการบริบทจะไม่ทำการเรียกล็อกอินไปยังเซิร์ฟเวอร์ API โดยอัตโนมัติ แต่จะทำการเรียกออกจากระบบเมื่อออกจากระบบ หากไม่จำเป็นต้องออกจากระบบด้วยเหตุผลบางประการหลังจากทำงานกับการเรียก API เสร็จแล้ว คุณต้องเริ่มทำงานโดยไม่ต้องใช้ตัวจัดการบริบท:
У APIClient มีมากถึง 3 วิธีในการเข้าสู่เซิร์ฟเวอร์ API และแต่ละวิธีก็เข้าใจความหมาย SID(session-id) ซึ่งใช้โดยอัตโนมัติในการเรียก API แต่ละครั้งในส่วนหัว (ชื่อในส่วนหัวของพารามิเตอร์นี้คือ X-chkp-sid) ดังนั้นจึงไม่จำเป็นต้องประมวลผลพารามิเตอร์นี้เพิ่มเติม
ตัวเลือกที่ใช้คีย์ API (รองรับตั้งแต่เวอร์ชันการจัดการ R80.40/Management API v1.6, "3TsbPJ8ZKjaJGvFyoFqHFA==" นี่คือค่าคีย์ API สำหรับผู้ใช้รายหนึ่งบนเซิร์ฟเวอร์การจัดการที่มีวิธีการอนุญาตคีย์ API):
for task in task_result.data["tasks"]:
if task["status"] == "failed" or task["status"] == "partially succeeded":
task_result.set_success_status(False)
break
ตอนนี้เรามาดูวิธีแก้ปัญหาเดียวกันโดยใช้การเรียก API เพิ่มวัตถุชุดการสนับสนุนที่เพิ่มเข้ามาใน API เวอร์ชัน 1.6 การเรียกนี้ช่วยให้คุณสร้างออบเจ็กต์จำนวนมากพร้อมกันในคำขอ API เดียว นอกจากนี้ สิ่งเหล่านี้อาจเป็นออบเจ็กต์ประเภทต่างๆ (เช่น โฮสต์ ซับเน็ต และช่วงที่อยู่) ดังนั้นงานของเราจึงสามารถแก้ไขได้ภายในกรอบของการเรียก API เพียงครั้งเดียว
สคริปต์โดยใช้ add-objects-batch
import timeit
from cpapi import APIClient, APIClientArgs
start = timeit.default_timer()
client_args = APIClientArgs(server="192.168.47.240")
objects_list_ip = []
objects_list_range = []
for ip in range(5,255,5):
data = {"name": f'h_192.168.0.{ip}', "ip-address": f'192.168.0.{ip}'}
objects_list_ip.append(data)
first_ip = 1
last_ip = 4
while last_ip < 255:
data = {"name": f"r_192.168.0.{first_ip}-{last_ip}", "ip-address-first": f"192.168.0.{first_ip}", "ip-address-last": f"192.168.0.{last_ip}"}
objects_list_range.append(data)
first_ip+=5
last_ip+=5
data_for_batch = {
"objects" : [ {
"type" : "host",
"list" : objects_list_ip
}, {
"type" : "address-range",
"list" : objects_list_range
}]
}
with APIClient(client_args) as client:
login = client.login_with_api_key('3TsbPJ8ZKjaJGvFyoFqHFA==')
add_objects_batch = client.api_call("add-objects-batch", data_for_batch)
stop = timeit.default_timer()
publish = client.api_call("publish")
print(f'Time to execute batch request: {stop - start} seconds')
from __future__ import print_function
import getpass
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from cpapi import APIClient, APIClientArgs
def main():
with APIClient() as client:
# if client.check_fingerprint() is False:
# print("Could not get the server's fingerprint - Check connectivity with the server.")
# exit(1)
login_res = client.login_as_root()
if login_res.success is False:
print("Login failed:n{}".format(login_res.error_message))
exit(1)
gw_name = raw_input("Enter the gateway name:")
gw_ip = raw_input("Enter the gateway IP address:")
if sys.stdin.isatty():
sic = getpass.getpass("Enter one-time password for the gateway(SIC): ")
else:
print("Attention! Your password will be shown on the screen!")
sic = raw_input("Enter one-time password for the gateway(SIC): ")
version = raw_input("Enter the gateway version(like RXX.YY):")
add_gw = client.api_call("add-simple-gateway", {'name' : gw_name, 'ipv4-address' : gw_ip, 'one-time-password' : sic, 'version': version.capitalize(), 'application-control' : 'true', 'url-filtering' : 'true', 'ips' : 'true', 'anti-bot' : 'true', 'anti-virus' : 'true', 'threat-emulation' : 'true'})
if add_gw.success and add_gw.data['sic-state'] != "communicating":
print("Secure connection with the gateway hasn't established!")
exit(1)
elif add_gw.success:
print("The gateway was added successfully.")
gw_uid = add_gw.data['uid']
gw_name = add_gw.data['name']
else:
print("Failed to add the gateway - {}".format(add_gw.error_message))
exit(1)
change_policy = client.api_call("set-access-layer", {"name" : "Network", "applications-and-url-filtering": "true", "content-awareness": "true"})
if change_policy.success:
print("The policy has been changed successfully")
else:
print("Failed to change the policy- {}".format(change_policy.error_message))
change_rule = client.api_call("set-access-rule", {"name" : "Cleanup rule", "layer" : "Network", "action": "Accept", "track": {"type": "Detailed Log", "accounting": "true"}})
if change_rule.success:
print("The cleanup rule has been changed successfully")
else:
print("Failed to change the cleanup rule- {}".format(change_rule.error_message))
# publish the result
publish_res = client.api_call("publish", {})
if publish_res.success:
print("The changes were published successfully.")
else:
print("Failed to publish the changes - {}".format(install_tp_policy.error_message))
install_access_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'true', "threat-prevention" : 'false', "targets" : gw_uid})
if install_access_policy.success:
print("The access policy has been installed")
else:
print("Failed to install access policy - {}".format(install_tp_policy.error_message))
install_tp_policy = client.api_call("install-policy", {"policy-package" : "Standard", "access" : 'false', "threat-prevention" : 'true', "targets" : gw_uid})
if install_tp_policy.success:
print("The threat prevention policy has been installed")
else:
print("Failed to install threat prevention policy - {}".format(install_tp_policy.error_message))
# add passwords and passphrases to dictionary
with open('additional_pass.conf') as f:
line_num = 0
for line in f:
line_num += 1
add_password_dictionary = client.api_call("run-script", {"script-name" : "Add passwords and passphrases", "script" : "printf "{}" >> $FWDIR/conf/additional_pass.conf".format(line), "targets" : gw_name})
if add_password_dictionary.success:
print("The password dictionary line {} was added successfully".format(line_num))
else:
print("Failed to add the dictionary - {}".format(add_password_dictionary.error_message))
main()