Python code conventions for Cortex XSIAM.
All new integrations and scripts should be written in Python 3.
Follow these Python code conventions for consistency and best practices.
Define imports and disable insecure warning at the top of the file.
import demistomock as demisto from CommonServerPython import * from CommonServerUserPython import * ''' IMPORTS ''' import json import urllib3 # Disable insecure warnings urllib3.disable_warnings()
Define constants in the file below the imports. Do not define global variables in the constants section.
''' CONSTANTS ''' DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
Important
Do NOT name constants as follows:
apiVersion = "v1" url = demisto.params().get("url")
Define the main
function as follows:
Create the
main
function and extract all the integration parameters usingdemisto.params()
in it.Implement the
_command
function for each integration command, for examplesay_hello_command(client, demisto.args())
.Wrap the commands with
try/except
in themain
to properly handle exceptions. Thereturn_error()
function receives error messages and returns error entries back into Cortex XSIAM. It also prints the full error to the Cortex XSIAM logs.For logging, use the
demisto.debug("write some log here")
function.In the
main
function, initialize the client instance and pass that client to_command
functions.
def main(): params = demisto.params() client = Client(params.get('insecure'), params.get('proxy')) command = demisto.command() demisto.info(f'Command being called is {command}') # Switch case try: if demisto.command() == 'fetch-events': events, last_run = fetch_events_command(client) # we submit the indicators in batches send_events_to_xsiam(events=events, vendor='MyVendor', product='MyProduct') else: results = get_events_command(client) return_results(results) except Exception as e: raise Exception(f'Error in {SOURCE_NAME} Integration [{e}]')
Follow these best practices for defining the Client class.
The
Client
class should inherit fromBaseClient
, which is defined inCommonServerPython
.The
Client
class should contain the_http_request
function.The
Client
class should implement the third party service API.The
Client
class should contain all the necessary parameters to establish connection and authentication with the third party API.
class Client(BaseClient):
"""
Client will implement the service API, should not contain Cortex XSIAM logic.
Should do requests and return data
"""
def get_ip_reputation(self, ip: str) -> Dict[str, Any]:
"""Gets the IP reputation using the '/ip' API endpoint
:type ip: ``str``
:param ip: IP address to get the reputation for
:return: dict containing the IP reputation as returned from the API
:rtype: ``Dict[str, Any]``
"""
return self._http_request(
method='GET',
url_suffix=f'/ip',
params={
'ip': ip
}
)
def get_alert(self, alert_id: str) -> Dict[str, Any]:
"""Gets a specific HelloWorld alert by id
:type alert_id: ``str``
:param alert_id: id of the alert to return
:return: dict containing the alert as returned from the API
:rtype: ``Dict[str, Any]``
"""
return self._http_request(
method='GET',
url_suffix=f'/get_alert_details',
params={
'alert_id': alert_id
}
)
Example - Client instance using an API Key
api_key = demisto.params().get('apikey') # get the service API url base_url = urljoin(demisto.params()['url'], '/api/v1') # if your Client class inherits from BaseClient, SSL verification is # handled out of the box by it, just pass ``verify_certificate`` to # the Client constructor verify_certificate = not demisto.params().get('insecure', False) headers = { 'Authorization': f'Bearer {api_key}' } client = Client( base_url=base_url, verify=verify_certificate, headers=headers, proxy=proxy )
Example - Client instance using basic authentication
username = demisto.params().get('credentials', {}).get('identifier') password = demisto.params().get('credentials', {}).get('password') # get the service API url base_url = urljoin(demisto.params()['url'], '/api/v1') # if your Client class inherits from BaseClient, SSL verification is # handled out of the box by it, just pass ``verify_certificate`` to # the Client constructor verify_certificate = not demisto.params().get('insecure', False) client = Client( base_url=base_url, verify=verify_certificate, auth=(username, password), proxy=proxy )
HTTP call retries
sleep
can cause performance issues so do not use it in the code. Instead, use the retry mechanism implemented in the BaseClient with the _http_request
function retries
and backoff_factor
arguments.
Follow these best practices for defining the command functions.
Each integration command should have a corresponding
_command
function.Each
_command
function should use Client class functions.Each
_command
function should be unit testable. This means you should avoid using global functions such asdemisto.results()
,return_error()
, orreturn_results()
.The
_command
function will receive the Client instance and theargs
(demisto.args()
dictionary).The
_command
function will return an instance of the CommandResults class.To return results to the War Room, in the Main use
return_results
(say_hello_command(client, demisto.args()
).
def say_hello_command(client, args): """ Returns Hello {somename} Args: client: HelloWorld client args: all command arguments Returns: Hello {someone} readable_output: This will be presented in Warroom - should be in markdown syntax - human readable outputs: Dictionary/JSON - saved in incident context in order to be used as input for other tasks in the playbook raw_response: Used for debugging/troubleshooting purposes - will be shown only if the command executed with raw-response=true """ name = args.get('name') result = client.say_hello(name) # readable output will be in markdown format - https://www.markdownguide.org/basic-syntax/ readable_output = f'## {result}' outputs = { 'name': name, 'hello': result } results = CommandResults( outputs_prefix='HelloWorld.Result', outputs_key_field='name', outputs=outputs, readable_output=readable_output, raw_response=result ) return results def main(): """ SOME CODE HERE... """ try: client = Client( base_url=server_url, verify=verify_certificate, auth=(username, password), proxy=proxy) """ SOME CODE HERE... """ if demisto.command() == 'helloworld-say-hello': return_results(say_hello_command(client, demisto.args())) # Log exceptions except Exception as e: return_error(f'Failed to execute {demisto.command()} command. Error: {str(e)}')
There are two implementation requirements for reputation commands (!file
, !email
, !domain
, !url
, and !ip
) that are enforced by checks in the Demisto SDK.
The reputation command's argument of the same name must have
default
set to True.The reputation command's argument of the same name must have
isArray
set to True.
For more details on these two command argument properties, see Integration metadata YAML file.
The test-module
executes when users click the Test button in the integration instance settings page.
If the test module returns the string "ok" then the test will be green (success). Any other string will be red.
if demisto.command() == 'test-module': # This is the call made when pressing the integration Test button. result = test_module(client) return_results(result)
def test_module(client): """ Returning 'ok' indicates that the integration works like it suppose to. Connection to the service is successful. Args: client: HelloWorld client Returns: 'ok' if test passed, anything else will fail the test """ result = client.say_hello('DBot') if 'Hello DBot' == result: return 'ok' else: return 'Test failed because ......'
The fetch-events
function initiates a fetch events request to specific external product endpoint(s) using the relevant chosen parameters, and sends the fetched events to the Cortex XSIAM dataset. If the integration instance setting is configured to Fetch events
, then this command is executed at the specified Events Fetch Interval
. By default, it runs every minute to retrieve and import events into Cortex XSIAM.
Follow these best practices for defining the fetch-events function.
Must be unit testable.
Should receive the
last_run
param instead of executing thedemisto.getLastRun()
function.Should return
next_run
back to Main, instead of executingdemisto.setLastRun()
inside thefetch-events
function.Should return incidents back to main instead of executing
demisto.incidents()
inside thefetch-events
function.
def get_events(client, alert_status, args): limit = args.get('limit', 50) from_date = args.get('from_date') events = client.search_events( prev_id=0, alert_status=alert_status, limit=limit, from_date=from_date, ) hr = tableToMarkdown(name='Test Event', t=events) return events, CommandResults(readable_output=hr)
When developing an event collector, set the Parsing Rules within the collector code. The most common parsing rule is the _time
system property which indicates the event time from the remote system. For example, if we use the following events as an example:
{ "id": "1234", "message": "New user added 'root2'", "type": "audit", "op": "add", "result": "success", "host_info": { "host": "prod-01", "os": "Windows" }, "created": "1676764803" }
We see that the created event property is a str
representation of a timestamp (without milliseconds). However, The _time
system property expects the result to be an str
in format %Y-%m-%dT%H:%M:%S.000Z
. So we can can transform it using the timestamp_to_datestring
function from CommonServerPython
.
from datetime import datetime from CommonServerPython import * # ... events: List[Dict[str, Any]] = get_events() for event in events: event["_time"] = timestamp_to_datestring(float(event.get("created")) * 1000) # ...
To verify that the parsing rule has been applied and is working as expected, run an XQL search to compare the _time
and created
fields:
dataset = "MyVendor_MyProduct_raw" | fields _time, created
Follow these best practices for defining exceptions and errors.
Wrap your command block in a "Try-Catch" to avoid unexpected issues.
Raise exceptions in the code where needed, but in the Main catch them and use the
return_error
function. This enables acceptable error messages in the War Room instead of stack trace.If the
return_error
second argument is error, you can pass an Exception object.You can use
demisto.error("some error message")
to log your error.def main(): try: if demisto.command() == 'test-module': test_get_session() return_results('ok') if demisto.command() == 'atd-login': return_results(get_session_command(client, demisto.args())) except Exception as e: return_error(f'Failed to execute {demisto.command()} command. Error: {str(e)}')
Every integration command must be covered with a unit test.
When naming variables, use Snake case, not Pascal case or camel case.
See Context and outputs.
Follow Context Standards when naming indicator outputs
Linking Context
Linking context together prevents a command from overwriting existing data or from creating duplicate entries in the context.
Example to link context:
ec = ({ 'URLScan(val.URL && val.URL == obj.URL)': cont_array, 'URL': url_array, 'IP': ip_array, 'Domain': dom_array })
In this example, val.URL && val.URL == obj.URL
links the results retrieved from this integration with results already in the context where the value of the URL is the same. For more information about linking syntax and Cortex XSIAM see Transform Language (DT).
You can pass information to the logs to assist future debugging.
To post to the logs:
demisto.debug('DEBUG level - This is some information we want in the logs') demisto.info('INFO level - This is some information we want in the logs') demisto.error('ERROR level - This is some information we want in the logs')
You can also use the @logger
decorator in Cortex XSIAM. When the decorator is placed at the top of each function, the logger prints the function name as well as all the argument values to the LOG
.
@logger def get_ip(ip): ip_data = http_request('POST', '/v1/api/ip' + ip) return ip_data
Important
Do not print sensitive data to the log. When an integration is ready to be used as part of a public release (meaning you are done debugging it), always remove print statements that are not absolutely necessary.
Cortex XSIAM does not use epoch time for customer facing results (for example context and human readable). If the API you are working with requires the time format to be in epoch, then convert the date string into epoch as needed. Where possible, use the human readable format of the date %Y-%m-%dT%H:%M:%S
.
time_epoch = 499137720 formatted_time = timestamp_to_datestring(time_epoch, "%Y-%m-%dT%H:%M:%S") print(formatted_time) >>> '1985-10-26T01:22:00'
Note
If the response returned is in epoch, best practice is to convert it to %Y-%m-%dT%H:%M:%S
.
When working on a command that supports pagination (usually has API parameters like page and/or page size) with a maximal page size enforced by the API, best practice is to create a command that supports two different use cases with the following three integer arguments:
page
page size
limit
Use cases
Manual Pagination: The user wants to control the pagination by using the
page
andpage size
arguments, usually as part of a wrapper script for the command. The command passes thepage
andpage size
values on to the API request. If the limit argument is also provided, it is redundant and should be ignored.Automatic Pagination: Useful when the user prefers to work with the total number of results returned from the playbook task rather than implementing a wrapper script that works with pages. In this case, the
limit
argument aggregates results by iterating over the necessary pages from the first page until collecting all the needed results. This implies a pagination loop mechanism is implemented behind the scenes. For example, if the limit value received is 250 and the maximal page size enforced by the API is 100, the command performs 3 API calls (pages 1,2, and 3) to collect the 250 requested results. Note that when a potentially large number of results may be returned and the user wants to perform filters and/or transformers on them, we still recommend creating a wrapper script for the command for better performance.
Recommendations
Page Tokens - If an API supports page tokens, instead of the more common 'limit' and 'offset'/'skip' as query parameters:
The arguments that are implemented are:
limit
,page_size
, andnext_token
.The retrieved
next_token
should be displayed in human readable output and in the context. It is a single node in the context and overwritten with each command run.{ "IntegrationName": { "Object1NextToken": "TOKEN_VALUE", "Object2NextToken": "TOKEN_VALUE", "Objects1": [], "Objects2": [] } }
Standard argument defaults:
limit
is a default of '50' in the YAML.page_size
should be defaulted in the code to '50', if onlypage
was provided.There should be no maximum value for the
limit
argument. This means that users should be able to retrieve as many records as they need in a single command execution.When an integrated API doesn't support pagination parameters at all - then only
limit
will be applied, and implemented internally in the code. An additional argument will be added to allow the user to retrieve all results by overriding the defaultlimit: all_result
=true.If the API supports only 'limit' and 'offset'/'skip' as query parameters, then all 3 standard Cortex XSIAMpagination arguments should be implemented.
When working on integrations that require user credentials (such as username/password and API token/key) best practice is to use the credentials
parameter type.
Using username and password
In the UI:
In the YAML file:
- display: Username name: credentials type: 9 required: true
In the code:
params = demisto.params() username = params.get('credentials', {}).get('identifier') password = params.get('credentials', {}).get('password')
In demistomock.py:
return { "base_url": "...", "credentials": {"identifier": "<username>", "password": "<password>"}, ... }
Using an API token/key
In the UI:
In the YAML file:
- displaypassword: API Token name: credentials type: 9 required: false hiddenusername: true
Using the credentials
parameter type is recommended (even when working with API token/key) because it enables using the Cortex XSIAM credentials vault feature when configuring the integration for the first time.