mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 01:26:11 -08:00
481 lines
22 KiB
Python
Executable File
481 lines
22 KiB
Python
Executable File
#!/usr/bin/env python
|
||
|
||
"""
|
||
This plugin imports devices and clients from Omada Controller using their OpenAPI.
|
||
|
||
It was inspired by the 'omada_sdn_imp/omada_sdn.py' plugin,
|
||
which relied on the 'tplink_omada_client' library instead of OpenAPI.
|
||
However, I found that approach somewhat unstable, so I decided
|
||
to give it a shot and create a new plugin with the goal of providing
|
||
same, but more reliable results.
|
||
|
||
Please note that this is my first plugin, and I’m not a Python developer.
|
||
Any comments, bug fixes, or contributions are greatly appreciated.
|
||
|
||
Author: https://github.com/xfilo
|
||
"""
|
||
|
||
__author__ = "xfilo"
|
||
__version__ = 0.1 # Initial version
|
||
__version__ = 0.2 # Rephrased error messages, improved logging and code logic
|
||
|
||
import os
|
||
import sys
|
||
import urllib3
|
||
import requests
|
||
import time
|
||
import datetime
|
||
import pytz
|
||
|
||
from datetime import datetime
|
||
from typing import Literal, Any, Dict
|
||
|
||
# Define the installation path and extend the system path for plugin imports
|
||
INSTALL_PATH = "/app"
|
||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||
|
||
from plugin_helper import Plugin_Objects, is_typical_router_ip, is_mac
|
||
from logger import mylog, Logger
|
||
from const import logPath
|
||
from helper import get_setting_value
|
||
import conf
|
||
|
||
# Make sure the TIMEZONE for logging is correct
|
||
conf.tz = pytz.timezone(get_setting_value('TIMEZONE'))
|
||
|
||
# Make sure log level is initialized correctly
|
||
Logger(get_setting_value('LOG_LEVEL'))
|
||
|
||
pluginName = 'OMDSDNOPENAPI'
|
||
|
||
# Define the current path and log file paths
|
||
LOG_PATH = logPath + '/plugins'
|
||
LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log')
|
||
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
|
||
|
||
# Disable insecure request warning
|
||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
|
||
|
||
class OmadaHelper:
|
||
@staticmethod
|
||
def log(message: str, level: Literal["minimal", "verbose", "debug", "trace"] = "minimal") -> None:
|
||
mylog(level, [f"[{pluginName}] [{level[:1].upper()}] {message}"])
|
||
|
||
@staticmethod
|
||
def debug(message: str) -> None:
|
||
return OmadaHelper.log(message, "debug")
|
||
|
||
@staticmethod
|
||
def verbose(message: str) -> None:
|
||
return OmadaHelper.log(message, "verbose")
|
||
|
||
@staticmethod
|
||
def minimal(message: str) -> None:
|
||
return OmadaHelper.log(message, "minimal")
|
||
|
||
@staticmethod
|
||
def response(response_type: str, response_message: str, response_result: Any = None) -> Dict[str, Any]:
|
||
return {"response_type": response_type, "response_message": response_message, "response_result": response_result}
|
||
|
||
@staticmethod
|
||
def timestamp_to_datetime(ms: int, timezone: str) -> Dict[str, Any]:
|
||
"""Returns datetime from millisecond timestamp with required timezone."""
|
||
try:
|
||
if not ms or not isinstance(ms, (str, int)):
|
||
raise ValueError(f"Value '{ms}' is not a valid timestamp")
|
||
|
||
timestamp = ms / 1000
|
||
tz = pytz.timezone("UTC")
|
||
utc_datetime = datetime.fromtimestamp(timestamp, tz=tz)
|
||
target_timezone = pytz.timezone(timezone)
|
||
local_datetime = utc_datetime.astimezone(target_timezone)
|
||
result = local_datetime.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
msg = f"Converted timestamp {ms} to datetime {result} with timezone {timezone}"
|
||
OmadaHelper.debug(msg)
|
||
return OmadaHelper.response("success", msg, result)
|
||
except pytz.UnknownTimeZoneError:
|
||
msg = f"Failed to convert timestamp - unknown timezone: {timezone}"
|
||
OmadaHelper.verbose(msg)
|
||
return OmadaHelper.response("error", msg)
|
||
except Exception as ex:
|
||
msg = f"Failed to convert timestamp - error: {str(ex)}"
|
||
OmadaHelper.verbose(msg)
|
||
return OmadaHelper.response("error", msg)
|
||
|
||
@staticmethod
|
||
def normalize_mac(mac: str) -> Dict[str, Any]:
|
||
"""Returns a normalized version of MAC address."""
|
||
try:
|
||
if not mac or not isinstance(mac, str) or mac is None:
|
||
raise Exception(f"Value '{mac}' is not a valid MAC address")
|
||
|
||
result = mac.lower().replace("-", ":")
|
||
msg = f"Normalized MAC address from {mac} to {result}"
|
||
OmadaHelper.debug(msg)
|
||
return OmadaHelper.response("success", msg, result)
|
||
except Exception as ex:
|
||
msg = f"Failed to normalize MAC address '{mac}' - error: {str(ex)}"
|
||
OmadaHelper.verbose(msg)
|
||
return OmadaHelper.response("error", msg)
|
||
|
||
@staticmethod
|
||
def normalize_data(input_data: list, input_type: str, site_name: str, timezone: str) -> Dict[str, Any]:
|
||
"""Returns a normalized dictionary of input data (clients, devices)."""
|
||
try:
|
||
if not isinstance(input_data, list):
|
||
raise Exception(f"Expected a list, but got '{type(input_data)}'.")
|
||
|
||
OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site: {site_name}")
|
||
default_entry = {
|
||
"mac_address": "",
|
||
"ip_address": "",
|
||
"name": "",
|
||
"last_seen": "",
|
||
"site_name": site_name,
|
||
"parent_node_mac_address": "",
|
||
"parent_node_port": "",
|
||
"parent_node_ssid": "",
|
||
"vlan_id": "",
|
||
}
|
||
|
||
result = []
|
||
for data in input_data:
|
||
|
||
mac = OmadaHelper.normalize_mac(data.get("mac"))
|
||
if not isinstance(mac, dict) or mac.get("response_type") != "success":
|
||
continue
|
||
mac = mac.get("response_result")
|
||
|
||
if not is_mac(mac):
|
||
OmadaHelper.debug(f"Skipping {input_type}, not a MAC address: {mac}")
|
||
continue
|
||
|
||
entry = default_entry.copy()
|
||
entry["mac_address"] = mac
|
||
entry["ip_address"] = data.get("ip", "")
|
||
entry["name"] = data.get("name", "")
|
||
|
||
last_seen = OmadaHelper.timestamp_to_datetime(data.get("lastSeen", 0), timezone)
|
||
entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get("response_type") == "success" else ""
|
||
|
||
if input_type == "device":
|
||
entry["device_type"] = data.get("type")
|
||
if data.get("type", "") != "gateway":
|
||
parent_mac = OmadaHelper.normalize_mac(data.get("uplinkDeviceMac"))
|
||
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
|
||
|
||
if input_type == "client":
|
||
entry["vlan_id"] = data.get("vid")
|
||
entry["device_type"] = data.get("deviceType")
|
||
if data.get("connectDevType", "") == "gateway":
|
||
parent_mac = OmadaHelper.normalize_mac(data.get("gatewayMac"))
|
||
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
|
||
entry["parent_node_port"] = data.get("port", "")
|
||
elif data.get("connectDevType", "") == "switch":
|
||
parent_mac = OmadaHelper.normalize_mac(data.get("switchMac"))
|
||
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
|
||
entry["parent_node_port"] = data.get("port", "")
|
||
elif data.get("connectDevType", "") == "ap":
|
||
parent_mac = OmadaHelper.normalize_mac(data.get("apMac"))
|
||
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
|
||
entry["parent_node_ssid"] = data.get("ssid", "")
|
||
|
||
result.append(entry)
|
||
OmadaHelper.debug(f"Processed {input_type} entry: {entry}")
|
||
|
||
msg = f"Successfully normalized {len(result)} {input_type}(s) from site: {site_name}"
|
||
OmadaHelper.minimal(msg)
|
||
final_result = OmadaHelper.response("success", msg, result)
|
||
except Exception as ex:
|
||
msg = f"Failed normalizing {input_type}(s) from site '{site_name}' - error: {str(ex)}"
|
||
OmadaHelper.verbose(msg)
|
||
final_result = OmadaHelper.response("error", msg)
|
||
|
||
return final_result
|
||
|
||
|
||
class OmadaAPI:
|
||
def __init__(self, options: dict):
|
||
OmadaHelper.debug("Initializing OmadaAPI with provided options")
|
||
|
||
# Define parameters: required, optional, and default values
|
||
params = {
|
||
"host": {"type": str, "required": True},
|
||
"omada_id": {"type": str, "required": True},
|
||
"client_id": {"type": str, "required": True},
|
||
"client_secret": {"type": str, "required": True},
|
||
"verify_ssl": {"type": bool, "required": False, "default": True},
|
||
"page_size": {"type": int, "required": False, "default": 1000},
|
||
"sites": {"type": list, "required": False, "default": []}
|
||
}
|
||
|
||
# Validate and set attributes
|
||
for param_name, param_info in params.items():
|
||
value = options.get(param_name, param_info.get("default"))
|
||
|
||
if param_info["required"] and (value is None or (param_info["type"] == str and not value)):
|
||
raise ValueError(f"{param_name} is required and must be a non-empty {param_info['type'].__name__}")
|
||
|
||
if not isinstance(value, param_info["type"]):
|
||
raise TypeError(f"{param_name} must be of type {param_info['type'].__name__}")
|
||
|
||
setattr(self, param_name, value)
|
||
OmadaHelper.debug(f"Initialized option '{param_name}' with value: {value}")
|
||
|
||
# Other parameters
|
||
self.available_sites_dict = {}
|
||
self.active_sites_dict = {}
|
||
self.access_token = None
|
||
self.refresh_token = None
|
||
|
||
OmadaHelper.verbose("OmadaAPI initialized")
|
||
|
||
def _get_headers(self, include_auth: bool = True) -> dict:
|
||
"""Return request headers."""
|
||
headers = {"Content-type": "application/json"}
|
||
if include_auth == True:
|
||
if not self.access_token:
|
||
OmadaHelper.debug("No access token available for headers")
|
||
else:
|
||
headers["Authorization"] = f"AccessToken={self.access_token}"
|
||
OmadaHelper.debug(f"Generated headers: {headers}")
|
||
return headers
|
||
|
||
def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str, Any]:
|
||
time.sleep(1) # Sleep before making any request so it does not rate-limited
|
||
OmadaHelper.debug(f"{method} request to endpoint: {endpoint}")
|
||
url = f"{getattr(self, 'host')}{endpoint}"
|
||
headers = self._get_headers(kwargs.pop('include_auth', True))
|
||
try:
|
||
response = requests.request(method, url, headers=headers, verify=getattr(self, 'verify_ssl'), **kwargs)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
response_type = "error" if data.get("errorCode", 0) != 0 else "success"
|
||
msg = f"{method} request completed: {endpoint}"
|
||
OmadaHelper.verbose(msg)
|
||
return OmadaHelper.response(response_type, msg, data)
|
||
except requests.exceptions.RequestException as ex:
|
||
OmadaHelper.minimal(f"{method} request failed: {url}")
|
||
OmadaHelper.verbose(f"{method} request error: {str(ex)}")
|
||
return OmadaHelper.response("error", f"{method} request failed to endpoint '{endpoint}' with error: {str(ex)}")
|
||
|
||
def authenticate(self) -> Dict[str, any]:
|
||
"""Make an endpoint request to get access token."""
|
||
OmadaHelper.verbose("Starting authentication process")
|
||
|
||
# Endpoint request
|
||
endpoint = "/openapi/authorize/token?grant_type=client_credentials"
|
||
payload = {
|
||
"omadacId": getattr(self, 'omada_id'),
|
||
"client_id": getattr(self, 'client_id'),
|
||
"client_secret": getattr(self, 'client_secret')
|
||
}
|
||
response = self._make_request("POST", endpoint, json=payload, include_auth=False)
|
||
|
||
# Successful endpoint response
|
||
if response.get("response_type") == "success":
|
||
response_result = response.get("response_result")
|
||
error_code = response_result.get("errorCode")
|
||
access_token = response_result.get("result").get("accessToken")
|
||
refresh_token = response_result.get("result").get("refreshToken")
|
||
if error_code == 0 and access_token and refresh_token:
|
||
self.access_token = access_token
|
||
self.refresh_token = refresh_token
|
||
msg = "Successfully authenticated"
|
||
OmadaHelper.minimal(msg)
|
||
return OmadaHelper.response("success", msg)
|
||
|
||
# Failed authentication
|
||
OmadaHelper.debug(f"Authentication response: {response}")
|
||
return OmadaHelper.response("error", f"Authentication failed - error: {response.get('response_message', 'Not provided')}")
|
||
|
||
def get_clients(self, site_id: str) -> Dict[str, Any]:
|
||
"""Make an endpoint request to get all online clients on a site."""
|
||
OmadaHelper.verbose(f"Retrieving clients for site: {site_id}")
|
||
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites/{site_id}/clients?page=1&pageSize={getattr(self, 'page_size')}"
|
||
return self._make_request("GET", endpoint)
|
||
|
||
def get_devices(self, site_id: str) -> Dict[str, Any]:
|
||
"""Make an endpoint request to get all online devices on a site."""
|
||
OmadaHelper.verbose(f"Retrieving devices for site: {site_id}")
|
||
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites/{site_id}/devices?page=1&pageSize={getattr(self, 'page_size')}"
|
||
return self._make_request("GET", endpoint)
|
||
|
||
def populate_sites(self) -> Dict[str, Any]:
|
||
"""Make an endpoint request to populate all accessible sites."""
|
||
OmadaHelper.verbose("Starting site population process")
|
||
|
||
# Endpoint request
|
||
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites?page=1&pageSize={getattr(self, 'page_size')}"
|
||
response = self._make_request("GET", endpoint)
|
||
|
||
# Successful endpoint response
|
||
if response.get("response_type") == "success":
|
||
response_result = response.get("response_result")
|
||
if response_result.get("errorCode") == 0:
|
||
# All allowed sites for credentials
|
||
all_sites = response_result.get("result", "").get("data", [])
|
||
OmadaHelper.debug(f"Retrieved {len(all_sites)} sites in total")
|
||
|
||
# All available sites
|
||
self.available_sites_dict = {site["siteId"]: site["name"] for site in all_sites}
|
||
OmadaHelper.debug(f"Available sites: {self.available_sites_dict}")
|
||
|
||
# All valid sites from input
|
||
active_sites_by_id = {site["siteId"]: site["name"] for site in all_sites if site["siteId"] in self.requested_sites()}
|
||
active_sites_by_name = {site["siteId"]: site["name"] for site in all_sites if site["name"] in self.requested_sites()}
|
||
self.active_sites_dict = active_sites_by_id | active_sites_by_name
|
||
OmadaHelper.debug(f"Active sites after filtering: {self.active_sites_dict}")
|
||
|
||
# If none of the input sites is valid/accessible, default to the first available site
|
||
if not self.active_sites_dict:
|
||
OmadaHelper.verbose("No valid site requested by configuration options, defaulting to first available site")
|
||
first_available_site = next(iter(self.available_sites_dict.items()), (None, None))
|
||
if first_available_site[0]: # Check if there's an available site
|
||
self.active_sites_dict = {first_available_site[0]: first_available_site[1]}
|
||
OmadaHelper.debug(f"Using first available site: {first_available_site}")
|
||
|
||
# Successful site population
|
||
msg = f"Successfully populated {len(self.active_sites_dict)} site(s)"
|
||
OmadaHelper.minimal(msg)
|
||
return OmadaHelper.response("success", msg)
|
||
|
||
# Failed site population
|
||
OmadaHelper.debug(f"Site population response: {response}")
|
||
return OmadaHelper.response("error", f"Site population failed - error: {response.get('response_message', 'Not provided')}")
|
||
|
||
def requested_sites(self) -> list:
|
||
"""Returns sites requested by user."""
|
||
return getattr(self, 'sites')
|
||
|
||
def available_sites(self) -> dict:
|
||
"""Returns all available sites."""
|
||
return self.available_sites_dict
|
||
|
||
def active_sites(self) -> dict:
|
||
"""Returns the sites the code will use."""
|
||
return self.active_sites_dict
|
||
|
||
|
||
def make_entries(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None:
|
||
if normalized_input_data.get("response_type", "error") != "success":
|
||
OmadaHelper.minimal(f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided')}")
|
||
return
|
||
|
||
response_result = normalized_input_data.get("response_result", {})
|
||
for entry in response_result:
|
||
OmadaHelper.verbose(f"Making entry for: {entry['mac_address']}")
|
||
|
||
parent_node = entry["parent_node_mac_address"]
|
||
if len(parent_node) == 0 and entry["device_type"] == "gateway" and is_typical_router_ip(entry["ip_address"]):
|
||
parent_node = "Internet"
|
||
|
||
device_type = entry["device_type"].lower()
|
||
if device_type == "iphone":
|
||
device_type = "iPhone"
|
||
elif device_type == "pc":
|
||
device_type = "PC"
|
||
else:
|
||
device_type = device_type.capitalize()
|
||
|
||
plugin_objects.add_object(
|
||
primaryId=entry["mac_address"],
|
||
secondaryId=entry["ip_address"],
|
||
watched1=entry["name"],
|
||
watched2=parent_node,
|
||
watched3=entry["parent_node_port"],
|
||
watched4=entry["parent_node_ssid"],
|
||
extra=device_type,
|
||
foreignKey=entry["mac_address"],
|
||
helpVal1=entry["last_seen"],
|
||
helpVal2=entry["site_name"],
|
||
helpVal3=entry["vlan_id"],
|
||
helpVal4="null"
|
||
)
|
||
|
||
|
||
def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects:
|
||
omada_api = OmadaAPI(OPTIONS)
|
||
|
||
auth_result = omada_api.authenticate()
|
||
if auth_result["response_type"] == "error":
|
||
OmadaHelper.minimal("Authentication failed, aborting data collection")
|
||
OmadaHelper.debug(f"{auth_result['response_message']}")
|
||
return plugin_objects
|
||
|
||
sites_result = omada_api.populate_sites()
|
||
if sites_result["response_type"] == "error":
|
||
OmadaHelper.minimal("Site population failed, aborting data collection")
|
||
OmadaHelper.debug(f"{sites_result['response_message']}")
|
||
return plugin_objects
|
||
|
||
requested_sites = omada_api.requested_sites()
|
||
available_sites = omada_api.available_sites()
|
||
active_sites = omada_api.active_sites()
|
||
|
||
OmadaHelper.verbose(f"Requested sites: {requested_sites}")
|
||
OmadaHelper.verbose(f"Available sites: {available_sites}")
|
||
OmadaHelper.verbose(f"Active sites: {active_sites}")
|
||
|
||
OmadaHelper.minimal("Starting data collection process")
|
||
|
||
for site_id, site_name in active_sites.items():
|
||
OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})")
|
||
|
||
devices_response = omada_api.get_devices(site_id)
|
||
if devices_response["response_type"] != "success":
|
||
OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}")
|
||
else:
|
||
devices = devices_response["response_result"].get("result").get("data", [])
|
||
OmadaHelper.debug(f"Retrieved {len(devices)} device(s) from site: {site_name}")
|
||
devices = OmadaHelper.normalize_data(devices, "device", site_name, TIMEZONE)
|
||
make_entries(plugin_objects, devices)
|
||
|
||
clients_response = omada_api.get_clients(site_id)
|
||
if clients_response["response_type"] != "success":
|
||
OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}")
|
||
else:
|
||
clients = clients_response["response_result"].get("result").get("data", [])
|
||
OmadaHelper.debug(f"Retrieved {len(clients)} client(s) from site: {site_name}")
|
||
clients = OmadaHelper.normalize_data(clients, "client", site_name, TIMEZONE)
|
||
make_entries(plugin_objects, clients)
|
||
|
||
OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})")
|
||
|
||
OmadaHelper.minimal("Completed data collection process")
|
||
return plugin_objects
|
||
|
||
|
||
def main():
|
||
start_time = time.time()
|
||
OmadaHelper.minimal("Starting execution")
|
||
|
||
# Initialize the Plugin object output file
|
||
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||
|
||
# Retrieve options
|
||
global OPTIONS, TIMEZONE
|
||
TIMEZONE = get_setting_value("TIMEZONE")
|
||
OPTIONS = {
|
||
"host": get_setting_value(f"{pluginName}_host").strip(),
|
||
"client_id": get_setting_value(f"{pluginName}_client_id").strip(),
|
||
"client_secret": get_setting_value(f"{pluginName}_client_secret").strip(),
|
||
"omada_id": get_setting_value(f"{pluginName}_omada_id").strip(),
|
||
"sites": get_setting_value(f"{pluginName}_sites"),
|
||
"verify_ssl": get_setting_value(f"{pluginName}_verify_ssl")
|
||
}
|
||
OmadaHelper.verbose("Configuration options loaded")
|
||
|
||
# Retrieve entries
|
||
plugin_objects = get_entries(plugin_objects)
|
||
plugin_objects.write_result_file()
|
||
|
||
# Finish
|
||
OmadaHelper.minimal(f"Execution completed in {time.time() - start_time:.2f}s, found {len(plugin_objects)} devices and clients")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|