Write your own plugin
This section explains how to write your own plugin for DeepHunter.
Requirements
Database
Plugin settings should be stored in the database (Connector and ConnectorConf objects).
Python file
The Python file should be stored in the plugins folder, and its name (extension excluded) should match the plugin name in the database (Connector table).
The plugin name should not contain spaces or special characters. Use only lowercase letters.
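The naming rule above can be checked before registering a plugin. The following is a minimal sketch, not part of DeepHunter: `is_valid_plugin_name` and `plugin_name_from_file` are hypothetical helpers, and the rule "only lowercase letters" is assumed to mean the ASCII range a-z, so digits and underscores are rejected as well.

```python
import re
from pathlib import Path

# Assumption: "only lowercase letters" is read literally as a-z,
# so digits, underscores, spaces and uppercase letters are all rejected.
PLUGIN_NAME_PATTERN = re.compile(r"[a-z]+")


def is_valid_plugin_name(name: str) -> bool:
    """Return True if the name contains only lowercase ASCII letters."""
    return PLUGIN_NAME_PATTERN.fullmatch(name) is not None


def plugin_name_from_file(path: str) -> str:
    """Derive the plugin name from the file name, extension excluded."""
    return Path(path).stem


name = plugin_name_from_file("plugins/microsoftsentinel.py")
print(name, is_valid_plugin_name(name))
print("Microsoft Sentinel", is_valid_plugin_name("Microsoft Sentinel"))
```

The first name passes the check, while the second is rejected because of the space and the uppercase letters.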
The Python file should define all mandatory methods.
The methods are listed below (M/O = Mandatory / Optional).
| Method | Description | M/O | Inputs | Outputs |
|---|---|---|---|---|
| `init_globals` | Define global variables and initialize them. This method should be called at the beginning of all other methods. Global variables are defined inside this function to delay their initialization until they are actually needed. This avoids side effects at import time. | M | | |
| | Return the query language used by the connector (e.g., KQL for Microsoft Sentinel, PowerQuery for SentinelOne). This is used by the AI assistant to generate queries. | M | | String containing the query language. |
| `query` | API calls to the remote data lake to query logs. Used by the “campaign” daily cron and the “regenerate stats” script. | M | `analytic`, `from_date`, `to_date`, `debug` | The result of the query (array with 4 fields: endpoint.name, NULL, number of hits, NULL), or “ERROR” if the query failed. |
| `need_to_sync_rule` | Check if the rule needs to be synced with Microsoft Sentinel. This is determined by the SYNC_RULES setting. | M | | Boolean (defined in a global variable; reading the value from a setting in the database is recommended). |
| `create_rule` | Create a rule in the remote data lake. This method is called when the user enables the | O | `analytic` | JSON object containing the response from the remote data lake. |
| `update_rule` | Update a rule in the remote data lake. This method is called when the user updates a threat hunting analytic with | O | `analytic` | JSON object containing the response from the remote data lake. |
| `delete_rule` | Delete a rule in the remote data lake. | O | `analytic` | |
| `get_redirect_analytic_link` | Get the redirect link to run the analytic in the remote data lake. | M | `analytic`, `filter_date`, `endpoint_name` | String containing the redirect link for the analytic. |
| `get_threats` | Get threats from your EDR for a specific hostname and a date. | O | `hostname`, `sincedate` | List of threats (array) or None if not found. |
| `get_redirect_threats_link` | Generate a link to the threats page for a specific endpoint and date. Mandatory if | M/O | `endpoint`, `date` | String containing the redirect link for the threats page. |
| | Get the expiration (in days) of the API token. | O | | Integer (number of days) or None (if failure). |
| `error_is_info` | Check if the query error message is an informational message (INFO) instead of an ERROR. | M | `error` | Boolean indicating whether the error is informational. |
| `get_network_connections` | Get a list of network connections grouped by dest IP (IP address, ports, popularity). | O | `endpoint_name`, `timerange`, `storyline_id` | Array containing the network connections (dest IP, number of events, list of port numbers separated by #, dest IP popularity). |
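Because a plugin that lacks a mandatory method will only fail when that method is called, it can help to check a plugin up front. The sketch below is an illustration, not a DeepHunter feature: `missing_mandatory_methods` is a hypothetical helper, and `MANDATORY_METHODS` lists only the mandatory methods whose names appear in the template, so it should be extended with any other mandatory method. A `SimpleNamespace` stands in for an imported plugin module.

```python
from types import SimpleNamespace

# Assumption: only the mandatory methods named in the template are listed;
# extend this tuple with any other mandatory method of your DeepHunter version.
MANDATORY_METHODS = (
    "init_globals",
    "query",
    "need_to_sync_rule",
    "get_redirect_analytic_link",
    "error_is_info",
)


def missing_mandatory_methods(plugin) -> list:
    """Return the names of mandatory methods the plugin does not define."""
    return [
        name for name in MANDATORY_METHODS
        if not callable(getattr(plugin, name, None))
    ]


# Stand-in for an imported plugin module that implements only two methods.
incomplete_plugin = SimpleNamespace(
    init_globals=lambda: None,
    query=lambda analytic, from_date=None, to_date=None, debug=None: "ERROR",
)

print(missing_mandatory_methods(incomplete_plugin))
```

With a real plugin, the same function could be given the imported module object instead of the stand-in.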
Template
You can use the following template to create your own plugin:
```python
# Imports
import re
from connectors.utils import get_connector_conf, gzip_base64_urlencode, manage_analytic_error
from datetime import datetime, timedelta, timezone
from urllib.parse import quote, unquote

_globals_initialized = False


def init_globals():
    global DEBUG, TENANT_ID, CLIENT_ID, CLIENT_SECRET, SUBSCRIPTION_ID, WORKSPACE_ID, \
        WORKSPACE_NAME, RESOURCE_GROUP, SYNC_RULES, THREATS_URL, QUERY_ERROR_INFO
    global _globals_initialized
    # Read the settings from the database only once, on first use
    if not _globals_initialized:
        DEBUG = False
        TENANT_ID = get_connector_conf('microsoftsentinel', 'TENANT_ID')
        CLIENT_ID = get_connector_conf('microsoftsentinel', 'CLIENT_ID')
        # ....
        # ....
        # ....
        SYNC_RULES = get_connector_conf('microsoftsentinel', 'SYNC_RULES')
        THREATS_URL = get_connector_conf('microsoftsentinel', 'THREATS_URL')
        QUERY_ERROR_INFO = get_connector_conf('microsoftsentinel', 'QUERY_ERROR_INFO')
        _globals_initialized = True


def query(analytic, from_date=None, to_date=None, debug=None):
    """
    Implement the query logic here.
    """
    init_globals()
    # ....
    # .... Return a list of 4 fields:
    # .... (endpoint.name, NULL, number of hits, NULL)
    # .... or "ERROR" if the query failed


def need_to_sync_rule():
    """
    Check if the rule needs to be synced with Microsoft Sentinel.
    This is determined by the SYNC_RULES setting.
    """
    init_globals()
    return SYNC_RULES


def create_rule(analytic):
    """
    Implement this method if you want to create rules in the remote data lake.
    """
    init_globals()
    return False


def update_rule(analytic):
    """
    Implement this method if you want to update rules in the remote data lake.
    """
    init_globals()
    return False


def delete_rule(analytic):
    """
    Implement this method if you want to delete rules from the remote data lake.
    """
    init_globals()
    return False


def get_redirect_analytic_link(analytic, filter_date=None, endpoint_name=None):
    """
    Generate a URL to pre-fill the query in the remote data lake.
    """
    init_globals()
    url = ''
    return url


def get_threats(hostname, sincedate=None):
    """
    Get threats from the remote data lake for a specific hostname, since a given date.
    :param hostname: Hostname of the machine to retrieve threats for.
    :param sincedate: Date in ISO format to filter threats created after this date.
    :return: List of threats (array) or None if not found.
    """
    init_globals()
    # Expected output format example:
    threats = [
        {'threatInfo': {
            'identifiedAt': '2025-05-29T13:36:08.167000Z',
            'threatName': 'Suivie NDF 2024.xlsm',
            'analystVerdict': 'true_positive',
            'confidenceLevel': 'malicious',
            'storyline': '',
        }},
        {'threatInfo': {
            'identifiedAt': '2025-05-29T13:36:08.183000Z',
            'threatName': 'Suivie NDF 2024 (002).xlsm',
            'analystVerdict': 'true_positive',
            'confidenceLevel': 'malicious',
            'storyline': '',
        }},
        {'threatInfo': {
            'identifiedAt': '2025-05-29T13:36:12.198000Z',
            'threatName': 'A2C163C3.xlsm',
            'analystVerdict': 'true_positive',
            'confidenceLevel': 'malicious',
            'storyline': '',
        }}
    ]
    return threats


def get_redirect_threats_link(endpoint, date):
    """
    Generate a link to the threats page for a specific endpoint and date.
    :param endpoint: The endpoint name.
    :param date: The date for which to generate the link, in 'YYYY-MM-DD' format.
    :return: A formatted URL string for the threats page.
    """
    init_globals()
    # do your stuff
    # ...
    # you can use a URL template and replace its variables with the correct values
    return f"https://portal.azure.com/search?host={endpoint}&date={date}"


def error_is_info(error):
    """
    Check if the query error message is an informational message (INFO) instead of an ERROR.
    This is determined with a regular expression provided by the QUERY_ERROR_INFO setting.
    :param error: The error message to check.
    :return: True if the error is an informational message, False otherwise.
    """
    init_globals()
    if QUERY_ERROR_INFO:
        if re.search(QUERY_ERROR_INFO, error):
            return True
    return False


def get_network_connections(endpoint_name, timerange, storyline_id=None):
    """
    Get network connections for a specific storyline ID and endpoint name.
    :param endpoint_name: Name of the endpoint to filter the analytic by.
    :param timerange: Time range (in hours) to filter the analytic by.
    :param storyline_id: Storyline ID to retrieve network connections for (only relevant for SentinelOne).
    :return: List of network connections ([dst_ip, nb_events, dst_ports_separator_hash_sign, nb_hosts_same_dstip]) or None if not found.
    """
    init_globals()
    # Example data, replace with actual API call and data processing
    data = [
        ('23.45.67.89', 1, '#80#49152#', 21),
        ('192.168.10.5', 2, '#443#32000#', 78),
        ('172.20.14.3', 1, '#54000#', 9),
        ('203.0.113.77', 3, '#80#443#', 62),
        ('10.1.2.3', 1, '#25000#', 95),
        ('198.51.100.88', 2, '#10240#', 33),
        ('8.26.56.26', 4, '#32767#', 14),
        ('100.64.1.2', 5, '#40960#', 87),
        ('192.0.2.55', 9, '#55555#', 5),
        ('172.16.0.99', 1, '#60001#', 46)
    ]
    return data
```
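The tuple format returned by `get_network_connections` can be built by grouping raw connection events by destination IP. The following is a minimal sketch under stated assumptions: `group_connections` is a hypothetical helper, the input is assumed to be a list of `(host, dst_ip, dst_port)` tuples (your data lake will return its own shape), and "popularity" is assumed to mean the number of distinct hosts that contacted the same destination IP.

```python
from collections import defaultdict


def group_connections(endpoint_name, events):
    """Group (host, dst_ip, dst_port) events by destination IP.

    Returns tuples (dst_ip, nb_events, '#port#port#', nb_hosts_same_dstip),
    restricted to destination IPs contacted by endpoint_name.
    """
    events_per_ip = defaultdict(int)   # events from the endpoint, per dest IP
    ports_per_ip = defaultdict(set)    # ports used by the endpoint, per dest IP
    hosts_per_ip = defaultdict(set)    # every host seen contacting each dest IP
    for host, dst_ip, dst_port in events:
        hosts_per_ip[dst_ip].add(host)
        if host == endpoint_name:
            events_per_ip[dst_ip] += 1
            ports_per_ip[dst_ip].add(dst_port)

    rows = []
    for dst_ip in sorted(events_per_ip):
        # Ports are joined with '#', with a leading and a trailing '#'
        ports = "#" + "#".join(str(p) for p in sorted(ports_per_ip[dst_ip])) + "#"
        rows.append((dst_ip, events_per_ip[dst_ip], ports, len(hosts_per_ip[dst_ip])))
    return rows


# Hypothetical events: (host, dst_ip, dst_port)
sample_events = [
    ("pc1", "203.0.113.77", 443),
    ("pc1", "203.0.113.77", 80),
    ("pc2", "203.0.113.77", 443),
    ("pc1", "198.51.100.88", 10240),
]
for row in group_connections("pc1", sample_events):
    print(row)
```

Sets are used for ports and hosts so that repeated events do not produce duplicate port entries or inflate the popularity count.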